root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 783, 39.5 kB (checked in by ppalmers, 15 years ago)

cleanup time/wait/sleep code

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "libieee1394/cycletimer.h"
28
29 #include "libutil/Time.h"
30
31 #include <errno.h>
32 #include <assert.h>
33 #include <math.h>
34
35 #define RUNNING_TIMEOUT_MSEC 4000
36 #define PREPARE_TIMEOUT_MSEC 4000
37 #define ENABLE_TIMEOUT_MSEC 4000
38
39 // allows to add some processing margin. This shifts the time
40 // at which the buffer is transfer()'ed, making things somewhat
41 // more robust. It should be noted though that shifting the transfer
42 // time to a later time instant also causes the xmit buffer fill to be
43 // lower on average.
44 #define FFADO_SIGNAL_DELAY_TICKS (3072*1)
45
46 namespace Streaming {
47
48 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
49
50 StreamProcessorManager::StreamProcessorManager()
51     : m_is_slave( false )
52     , m_SyncSource(NULL)
53     , m_nb_buffers( 0 )
54     , m_period( 0 )
55     , m_nominal_framerate ( 0 )
56     , m_xrun_happened( false )
57     , m_xruns(0)
58     , m_nbperiods(0)
59 {
60     addOption(Util::OptionContainer::Option("slaveMode",false));
61 }
62
63 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
64     : m_is_slave( false )
65     , m_SyncSource(NULL)
66     , m_nb_buffers(nb_buffers)
67     , m_period(period)
68     , m_nominal_framerate ( framerate )
69     , m_xruns(0)
70     , m_xrun_happened( false )
71     , m_nbperiods(0)
72 {
73     addOption(Util::OptionContainer::Option("slaveMode",false));
74 }
75
76 StreamProcessorManager::~StreamProcessorManager() {
77 }
78
79 /**
80  * Registers \ref processor with this manager.
81  *
82  * also registers it with the isohandlermanager
83  *
84  * be sure to call isohandlermanager->init() first!
85  * and be sure that the processors are also ->init()'ed
86  *
87  * @param processor
88  * @return true if successfull
89  */
90 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
91 {
92     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
93     assert(processor);
94     if (processor->getType() == StreamProcessor::ePT_Receive) {
95         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
96         m_ReceiveProcessors.push_back(processor);
97         return true;
98     }
99
100     if (processor->getType() == StreamProcessor::ePT_Transmit) {
101         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
102         m_TransmitProcessors.push_back(processor);
103         return true;
104     }
105
106     debugFatal("Unsupported processor type!\n");
107     return false;
108 }
109
110 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
111 {
112     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
113     assert(processor);
114
115     if (processor->getType()==StreamProcessor::ePT_Receive) {
116
117         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
118               it != m_ReceiveProcessors.end();
119               ++it )
120         {
121             if ( *it == processor ) {
122                 m_ReceiveProcessors.erase(it);
123                 return true;
124             }
125         }
126     }
127
128     if (processor->getType()==StreamProcessor::ePT_Transmit) {
129         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
130               it != m_TransmitProcessors.end();
131               ++it )
132         {
133             if ( *it == processor ) {
134                 m_TransmitProcessors.erase(it);
135                 return true;
136             }
137         }
138     }
139
140     debugFatal("Processor (%p) not found!\n",processor);
141     return false; //not found
142 }
143
144 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
145     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
146     m_SyncSource=s;
147     return true;
148 }
149
150 bool StreamProcessorManager::prepare() {
151
152     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
153
154     m_is_slave=false;
155     if(!getOption("slaveMode", m_is_slave)) {
156         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
157     }
158
159     // if no sync source is set, select one here
160     if(m_SyncSource == NULL) {
161        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
162     }
163
164     // FIXME: put into separate method
165     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
166           it != m_ReceiveProcessors.end();
167           ++it )
168     {
169         if(m_SyncSource == NULL) {
170             debugWarning(" => Sync Source is %p.\n", *it);
171             m_SyncSource = *it;
172         }
173     }
174     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
175           it != m_TransmitProcessors.end();
176           ++it )
177     {
178         if(m_SyncSource == NULL) {
179             debugWarning(" => Sync Source is %p.\n", *it);
180             m_SyncSource = *it;
181         }
182     }
183
184     // now do the actual preparation of the SP's
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187         it != m_ReceiveProcessors.end();
188         ++it ) {
189
190         if(!(*it)->setOption("slaveMode", m_is_slave)) {
191             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
192         }
193
194         if(!(*it)->prepare()) {
195             debugFatal(  " could not prepare (%p)...\n",(*it));
196             return false;
197         }
198     }
199     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
200     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
201         it != m_TransmitProcessors.end();
202         ++it ) {
203         if(!(*it)->setOption("slaveMode", m_is_slave)) {
204             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
205         }
206         if(!(*it)->prepare()) {
207             debugFatal( " could not prepare (%p)...\n",(*it));
208             return false;
209         }
210     }
211
212     // if there are no stream processors registered,
213     // fail
214     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
215         debugFatal("No stream processors registered, can't do anything usefull\n");
216         return false;
217     }
218     return true;
219 }
220
221 bool StreamProcessorManager::startDryRunning() {
222     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
223     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
224     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
225             it != m_ReceiveProcessors.end();
226             ++it ) {
227         if (!(*it)->isDryRunning()) {
228             if(!(*it)->scheduleStartDryRunning(-1)) {
229                 debugError("Could not put SP %p into the dry-running state\n", *it);
230                 return false;
231             }
232         } else {
233             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
234         }
235     }
236     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
237             it != m_TransmitProcessors.end();
238             ++it ) {
239         if (!(*it)->isDryRunning()) {
240             if(!(*it)->scheduleStartDryRunning(-1)) {
241                 debugError("Could not put SP %p into the dry-running state\n", *it);
242                 return false;
243             }
244         } else {
245             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
246         }
247     }
248     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
249     // wait for the syncsource to start running.
250     // that will block the waitForPeriod call until everyone has started (theoretically)
251     #define CYCLES_FOR_DRYRUN 40000
252     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
253     bool all_dry_running = false;
254     while (!all_dry_running && cnt) {
255         all_dry_running = true;
256         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
257                 it != m_ReceiveProcessors.end();
258                 ++it ) {
259             all_dry_running &= (*it)->isDryRunning();
260         }
261         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
262                 it != m_TransmitProcessors.end();
263                 ++it ) {
264             all_dry_running &= (*it)->isDryRunning();
265         }
266
267         SleepRelativeUsec(125);
268         cnt--;
269     }
270     if(cnt==0) {
271         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
272         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
273                 it != m_ReceiveProcessors.end();
274                 ++it ) {
275             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
276                 (*it)->getTypeString(), *it, (*it)->getStateString());
277         }
278         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
279                 it != m_TransmitProcessors.end();
280                 ++it ) {
281             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
282                 (*it)->getTypeString(), *it, (*it)->getStateString());
283         }
284         return false;
285     }
286     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
287     return true;
288 }
289
290 bool StreamProcessorManager::syncStartAll() {
291     // figure out when to get the SP's running.
292     // the xmit SP's should also know the base timestamp
293     // streams should be aligned here
294
295     // now find out how long we have to delay the wait operation such that
296     // the received frames will all be presented to the SP
297     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
298     int max_of_min_delay = 0;
299     int min_delay = 0;
300     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301             it != m_ReceiveProcessors.end();
302             ++it ) {
303         min_delay = (*it)->getMaxFrameLatency();
304         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
305     }
306
307     // add some processing margin. This only shifts the time
308     // at which the buffer is transfer()'ed. This makes things somewhat
309     // more robust. It should be noted though that shifting the transfer
310     // time to a later time instant also causes the xmit buffer fill to be
311     // lower on average.
312     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
314         max_of_min_delay,
315         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
316         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
317         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
318     m_SyncSource->setSyncDelay(max_of_min_delay);
319
320     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
321     //        have aquired lock
322     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
323     //sleep(2); // FIXME: be smarter here
324
325     // make sure that we are dry-running long enough for the
326     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
327     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
328     int nb_sync_runs=20;
329     int64_t time_till_next_period;
330     while(nb_sync_runs--) { // or while not sync-ed?
331         // check if we were woken up too soon
332         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
333         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
334         if(time_till_next_period > 0) {
335             // wait for the period
336             SleepRelativeUsec(time_till_next_period);
337         }
338     }
339
340     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
341     // FIXME: in the SPM it would be nice to have system time instead of
342     //        1394 time
343
344     // we now should have decent sync info on the sync source
345     // determine a point in time where the system should start
346     // figure out where we are now
347     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
348     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
349         time_of_first_sample,
350         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
351         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
352         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
353
354     #define CYCLES_FOR_STARTUP 2000
355     // start wet-running in CYCLES_FOR_STARTUP cycles
356     // this is the time window we have to setup all SP's such that they
357     // can start wet-running correctly.
358     time_of_first_sample = addTicks(time_of_first_sample,
359                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
360
361     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
362         time_of_first_sample,
363         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
364         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
365         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
366
367     // we should start wet-running the transmit SP's some cycles in advance
368     // such that we know it is wet-running when it should output its first sample
369     #define PRESTART_CYCLES_FOR_XMIT 20
370     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
371                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
372
373     #define PRESTART_CYCLES_FOR_RECV 0
374     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
375                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
376     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
377         time_to_start_xmit,
378         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
379         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
380         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
381     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
382         time_to_start_recv,
383         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
384         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
385         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
386
387     // at this point the buffer head timestamp of the transmit buffers can be set
388     // this is the presentation time of the first sample in the buffer
389     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
390           it != m_TransmitProcessors.end();
391           ++it ) {
392         (*it)->setBufferHeadTimestamp(time_of_first_sample);
393     }
394
395     // STEP X: switch SP's over to the running state
396     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
397           it != m_ReceiveProcessors.end();
398           ++it ) {
399         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
400             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
401             return false;
402         }
403     }
404     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
405           it != m_TransmitProcessors.end();
406           ++it ) {
407         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
408             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
409             return false;
410         }
411     }
412     // wait for the syncsource to start running.
413     // that will block the waitForPeriod call until everyone has started (theoretically)
414     int cnt = CYCLES_FOR_STARTUP * 20; // by then it should have started
415     while (!m_SyncSource->isRunning() && cnt) {
416         SleepRelativeUsec(125);
417         cnt--;
418     }
419     if(cnt==0) {
420         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
421         return false;
422     }
423
424     // now align the received streams
425     if(!alignReceivedStreams()) {
426         debugError("Could not align streams\n");
427         return false;
428     }
429     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
430     return true;
431 }
432
433 bool
434 StreamProcessorManager::alignReceivedStreams()
435 {
436     #define ALIGN_AVERAGE_TIME_MSEC 200
437     #define NB_ALIGN_TRIES 40
438     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
439     unsigned int nb_sync_runs;
440     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
441     int64_t diff_between_streams[nb_rcv_sp];
442     int64_t diff;
443
444     unsigned int i;
445
446     unsigned int periods_per_align_try = (ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
447     periods_per_align_try /= 1000;
448     periods_per_align_try /= getPeriodSize();
449     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
450
451     bool aligned = false;
452     int cnt = NB_ALIGN_TRIES;
453     while (!aligned && cnt--) {
454         nb_sync_runs = periods_per_align_try;
455         while(nb_sync_runs) {
456             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
457             waitForPeriod();
458
459             i = 0;
460             for ( i = 0; i < nb_rcv_sp; i++) {
461                 StreamProcessor *s = m_ReceiveProcessors.at(i);
462                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
463                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
464                     m_SyncSource, s, diff);
465                 if ( nb_sync_runs == periods_per_align_try ) {
466                     diff_between_streams[i] = diff;
467                 } else {
468                     diff_between_streams[i] += diff;
469                 }
470             }
471             if(!transferSilence()) {
472                 debugError("Could not transfer silence\n");
473                 return false;
474             }
475             nb_sync_runs--;
476         }
477         // calculate the average offsets
478         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
479         int diff_between_streams_frames[nb_rcv_sp];
480         aligned = true;
481         for ( i = 0; i < nb_rcv_sp; i++) {
482             StreamProcessor *s = m_ReceiveProcessors.at(i);
483
484             diff_between_streams[i] /= periods_per_align_try;
485             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
486             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
487                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
488
489             aligned &= (diff_between_streams_frames[i] == 0);
490
491             // reposition the stream
492             if(!s->shiftStream(diff_between_streams_frames[i])) {
493                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
494                 return false;
495             }
496         }
497         if (!aligned) {
498             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
499         }
500     }
501     if (cnt == 0) {
502         debugError("Align failed\n");
503         return false;
504     }
505     return true;
506 }
507
508 bool StreamProcessorManager::start() {
509     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
510
511     // put all SP's into dry-running state
512     if (!startDryRunning()) {
513         debugFatal("Could not put SP's in dry-running state\n");
514         return false;
515     }
516
517     // start all SP's synchonized
518     if (!syncStartAll()) {
519         debugFatal("Could not syncStartAll...\n");
520         return false;
521     }
522     return true;
523 }
524
525 bool StreamProcessorManager::stop() {
526     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
527
528     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
529     // switch SP's over to the dry-running state
530     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
531           it != m_ReceiveProcessors.end();
532           ++it ) {
533         if(!(*it)->scheduleStopRunning(-1)) {
534             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
535             return false;
536         }
537     }
538     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
539           it != m_TransmitProcessors.end();
540           ++it ) {
541         if(!(*it)->scheduleStopRunning(-1)) {
542             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
543             return false;
544         }
545     }
546     // wait for the SP's to get into the dry-running state
547     int cnt = 200;
548     bool ready = false;
549     while (!ready && cnt) {
550         ready = true;
551         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
552             it != m_ReceiveProcessors.end();
553             ++it ) {
554             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
555         }
556         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
557             it != m_TransmitProcessors.end();
558             ++it ) {
559             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
560         }
561         SleepRelativeUsec(125);
562         cnt--;
563     }
564     if(cnt==0) {
565         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
566         return false;
567     }
568
569     // switch SP's over to the stopped state
570     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
571           it != m_ReceiveProcessors.end();
572           ++it ) {
573         if(!(*it)->scheduleStopDryRunning(-1)) {
574             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
575             return false;
576         }
577     }
578     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
579           it != m_TransmitProcessors.end();
580           ++it ) {
581         if(!(*it)->scheduleStopDryRunning(-1)) {
582             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
583             return false;
584         }
585     }
586     // wait for the SP's to get into the running state
587     cnt = 200;
588     ready = false;
589     while (!ready && cnt) {
590         ready = true;
591         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
592             it != m_ReceiveProcessors.end();
593             ++it ) {
594             ready &= (*it)->isStopped();
595         }
596         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
597             it != m_TransmitProcessors.end();
598             ++it ) {
599             ready &= (*it)->isStopped();
600         }
601         SleepRelativeUsec(125);
602         cnt--;
603     }
604     if(cnt==0) {
605         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
606         return false;
607     }
608     return true;
609 }
610
611 /**
612  * Called upon Xrun events. This brings all StreamProcessors back
613  * into their starting state, and then carries on streaming. This should
614  * have the same effect as restarting the whole thing.
615  *
616  * @return true if successful, false otherwise
617  */
618 bool StreamProcessorManager::handleXrun() {
619
620     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
621
622     dumpInfo();
623
624     /*
625      * Reset means:
626      * 1) Disabling the SP's, so that they don't process any packets
627      *    note: the isomanager does keep on delivering/requesting them
628      * 2) Bringing all buffers & streamprocessors into a know state
629      *    - Clear all capture buffers
630      *    - Put nb_periods*period_size of null frames into the playback buffers
631      * 3) Re-enable the SP's
632      */
633
634     // put all SP's back into dry-running state
635     if (!startDryRunning()) {
636         debugFatal("Could not put SP's in dry-running state\n");
637         return false;
638     }
639
640     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
641     // start all SP's synchonized
642     if (!syncStartAll()) {
643         debugFatal("Could not syncStartAll...\n");
644         return false;
645     }
646
647     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
648
649     return true;
650 }
651
652 /**
653  * @brief Waits until the next period of samples is ready
654  *
655  * This function does not return until a full period of samples is (or should be)
656  * ready to be transferred.
657  *
658  * @return true if the period is ready, false if an xrun occurred
659  */
660 bool StreamProcessorManager::waitForPeriod() {
661     int time_till_next_period;
662     bool xrun_occurred = false;
663
664     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
665
666     assert(m_SyncSource);
667
668     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
669
670     while(time_till_next_period > 0) {
671         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
672
673         // wait for the period
674         SleepRelativeUsec(time_till_next_period);
675
676         // check for underruns on the ISO side,
677         // those should make us bail out of the wait loop
678         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
679             it != m_ReceiveProcessors.end();
680             ++it ) {
681             // a xrun has occurred on the Iso side
682             xrun_occurred |= (*it)->xrunOccurred();
683         }
684         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
685             it != m_TransmitProcessors.end();
686             ++it ) {
687             // a xrun has occurred on the Iso side
688             xrun_occurred |= (*it)->xrunOccurred();
689         }
690         if(xrun_occurred) break;
691
692         // check if we were waked up too soon
693         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
694     }
695
696     // we save the 'ideal' time of the transfer at this point,
697     // because we can have interleaved read - process - write
698     // cycles making that we modify a receiving stream's buffer
699     // before we get to writing.
700     // NOTE: before waitForPeriod() is called again, both the transmit
701     //       and the receive processors should have done their transfer.
702     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
703     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
704         m_time_of_transfer);
705
706     // normally we can transfer frames at this time, but in some cases this is not true
707     // e.g. when there are not enough frames in the receive buffer.
708     // however this doesn't have to be a problem, since we can wait some more until we
709     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
710     // to transmit, or if the receive buffer overflows. These conditions are signaled by
711     // the iso threads
712     // check if xruns occurred on the Iso side.
713     // also check if xruns will occur should we transfer() now
714     #ifdef DEBUG
715     int waited = 0;
716     #endif
717     bool ready_for_transfer = false;
718     bool ready;
719     xrun_occurred = false;
720     while (!ready_for_transfer && !xrun_occurred) {
721         // FIXME: can deadlock when the iso handlers die (e.g. unplug the device)
722         ready_for_transfer = true;
723         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
724             it != m_ReceiveProcessors.end();
725             ++it ) {
726             ready = ((*it)->canClientTransferFrames(m_period));
727             ready_for_transfer &= ready;
728             if (!ready) (*it)->flush();
729             xrun_occurred |= (*it)->xrunOccurred();
730         }
731         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
732             it != m_TransmitProcessors.end();
733             ++it ) {
734             ready = ((*it)->canClientTransferFrames(m_period));
735             ready_for_transfer &= ready;
736             if (!ready) (*it)->flush();
737             xrun_occurred |= (*it)->xrunOccurred();
738         }
739         if (!ready_for_transfer) {
740            
741             SleepRelativeUsec(125); // MAGIC: one cycle sleep...
742
743             // in order to avoid this in the future, we increase the sync delay of the sync source SP
744             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
745             m_SyncSource->setSyncDelay(d);
746
747             #ifdef DEBUG
748             waited++;
749             #endif
750         }
751     } // we are either ready or an xrun occurred
752
753     // in order to avoid a runaway value of the sync delay, we gradually decrease
754     // it. It will be increased by a 'too early' event (cfr some lines higher)
755     // hence we'll be at a good point on average.
756     int d = m_SyncSource->getSyncDelay() - 1;
757     if (d >= 0) m_SyncSource->setSyncDelay(d);
758
759
760     #ifdef DEBUG
761     if(waited > 0) {
762         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
763     }
764     #endif
765
766     // this is to notify the client of the delay that we introduced by waiting
767     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
768     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
769
770 #ifdef DEBUG
771     int rcv_bf=0, xmt_bf=0;
772     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
773         it != m_ReceiveProcessors.end();
774         ++it ) {
775         rcv_bf = (*it)->getBufferFill();
776     }
777     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
778         it != m_TransmitProcessors.end();
779         ++it ) {
780         xmt_bf = (*it)->getBufferFill();
781     }
782     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
783         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
784
785     // check if xruns occurred on the Iso side.
786     // also check if xruns will occur should we transfer() now
787     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
788           it != m_ReceiveProcessors.end();
789           ++it ) {
790
791         if ((*it)->xrunOccurred()) {
792             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
793             (*it)->dumpInfo();
794         }
795         if (!((*it)->canClientTransferFrames(m_period))) {
796             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
797             (*it)->dumpInfo();
798         }
799     }
800     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
801           it != m_TransmitProcessors.end();
802           ++it ) {
803         if ((*it)->xrunOccurred()) {
804             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
805         }
806         if (!((*it)->canClientTransferFrames(m_period))) {
807             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
808         }
809     }
810 #endif
811
812     m_nbperiods++;
813     // now we can signal the client that we are (should be) ready
814     return !xrun_occurred;
815 }
816
817 /**
818  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
819  *
820  * Transfers one period of frames from the client side to the Iso side and vice versa.
821  *
822  * @return true if successful, false otherwise (indicates xrun).
823  */
824 bool StreamProcessorManager::transfer() {
825     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
826     bool retval=true;
827     retval &= transfer(StreamProcessor::ePT_Receive);
828     retval &= transfer(StreamProcessor::ePT_Transmit);
829     return retval;
830 }
831
832 /**
833  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
834  *
835  * Transfers one period of frames from the client side to the Iso side or vice versa.
836  *
837  * @param t The processor type to tranfer for (receive or transmit)
838  * @return true if successful, false otherwise (indicates xrun).
839  */
840 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
841     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
842         t, m_time_of_transfer,
843         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
844         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
845         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
846
847     bool retval = true;
848     // a static cast could make sure that there is no performance
849     // penalty for the virtual functions (to be checked)
850     if (t==StreamProcessor::ePT_Receive) {
851         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
852                 it != m_ReceiveProcessors.end();
853                 ++it ) {
854             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
855                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
856                             m_period, m_time_of_transfer,*it);
857                 retval &= false; // buffer underrun
858             }
859         }
860     } else {
861         // FIXME: in the SPM it would be nice to have system time instead of
862         //        1394 time
863         float rate = m_SyncSource->getTicksPerFrame();
864         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
865
866         // the data we are putting into the buffer is intended to be transmitted
867         // one ringbuffer size after it has been received
868         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
869
870         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
871                 it != m_TransmitProcessors.end();
872                 ++it ) {
873             // FIXME: in the SPM it would be nice to have system time instead of
874             //        1394 time
875             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
876                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
877                         m_period, transmit_timestamp, *it);
878                 retval &= false; // buffer underrun
879             }
880         }
881     }
882     return retval;
883 }
884
885 /**
886  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
887  *
888  * Transfers one period of silence to the Iso side for transmit SP's
889  * or dump one period of frames for receive SP's
890  *
891  * @return true if successful, false otherwise (indicates xrun).
892  */
893 bool StreamProcessorManager::transferSilence() {
894     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
895     bool retval=true;
896     retval &= transferSilence(StreamProcessor::ePT_Receive);
897     retval &= transferSilence(StreamProcessor::ePT_Transmit);
898     return retval;
899 }
900
901 /**
902  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
903  *
904  * Transfers one period of silence to the Iso side for transmit SP's
905  * or dump one period of frames for receive SP's
906  *
907  * @param t The processor type to tranfer for (receive or transmit)
908  * @return true if successful, false otherwise (indicates xrun).
909  */
910 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
911     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
912         t, m_time_of_transfer,
913         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
914         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
915         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
916
917     bool retval = true;
918     // a static cast could make sure that there is no performance
919     // penalty for the virtual functions (to be checked)
920     if (t==StreamProcessor::ePT_Receive) {
921         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
922                 it != m_ReceiveProcessors.end();
923                 ++it ) {
924             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
925                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
926                             m_period, m_time_of_transfer,*it);
927                 retval &= false; // buffer underrun
928             }
929         }
930     } else {
931         // FIXME: in the SPM it would be nice to have system time instead of
932         //        1394 time
933         float rate = m_SyncSource->getTicksPerFrame();
934         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
935
936         // the data we are putting into the buffer is intended to be transmitted
937         // one ringbuffer size after it has been received
938         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
939
940         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
941                 it != m_TransmitProcessors.end();
942                 ++it ) {
943             // FIXME: in the SPM it would be nice to have system time instead of
944             //        1394 time
945             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
946                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
947                         m_period, transmit_timestamp, *it);
948                 retval &= false; // buffer underrun
949             }
950         }
951     }
952     return retval;
953 }
954
955 void StreamProcessorManager::dumpInfo() {
956     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
957     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
958     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
959
960     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
961     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
962         it != m_ReceiveProcessors.end();
963         ++it ) {
964         (*it)->dumpInfo();
965     }
966
967     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
968     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
969         it != m_TransmitProcessors.end();
970         ++it ) {
971         (*it)->dumpInfo();
972     }
973
974     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
975
976 }
977
978 void StreamProcessorManager::setVerboseLevel(int l) {
979     setDebugLevel(l);
980
981     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
982     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
983         it != m_ReceiveProcessors.end();
984         ++it ) {
985         (*it)->setVerboseLevel(l);
986     }
987
988     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
989     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
990         it != m_TransmitProcessors.end();
991         ++it ) {
992         (*it)->setVerboseLevel(l);
993     }
994 }
995
996
997 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
998     int count=0;
999
1000     if (direction == Port::E_Capture) {
1001         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1002             it != m_ReceiveProcessors.end();
1003             ++it ) {
1004             count += (*it)->getPortCount(type);
1005         }
1006     } else {
1007         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1008             it != m_TransmitProcessors.end();
1009             ++it ) {
1010             count += (*it)->getPortCount(type);
1011         }
1012     }
1013     return count;
1014 }
1015
1016 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1017     int count=0;
1018
1019     if (direction == Port::E_Capture) {
1020         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1021             it != m_ReceiveProcessors.end();
1022             ++it ) {
1023             count += (*it)->getPortCount();
1024         }
1025     } else {
1026         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1027             it != m_TransmitProcessors.end();
1028             ++it ) {
1029             count += (*it)->getPortCount();
1030         }
1031     }
1032     return count;
1033 }
1034
1035 // TODO: implement a port map here, instead of the loop
1036
1037 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1038     int count=0;
1039     int prevcount=0;
1040
1041     if (direction == Port::E_Capture) {
1042         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1043             it != m_ReceiveProcessors.end();
1044             ++it ) {
1045             count += (*it)->getPortCount();
1046             if (count > idx) {
1047                 return (*it)->getPortAtIdx(idx-prevcount);
1048             }
1049             prevcount=count;
1050         }
1051     } else {
1052         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1053             it != m_TransmitProcessors.end();
1054             ++it ) {
1055             count += (*it)->getPortCount();
1056             if (count > idx) {
1057                 return (*it)->getPortAtIdx(idx-prevcount);
1058             }
1059             prevcount=count;
1060         }
1061     }
1062     return NULL;
1063 }
1064
1065 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1066     m_thread_realtime=rt;
1067     m_thread_priority=priority;
1068     return true;
1069 }
1070
1071
1072 } // end of namespace
Note: See TracBrowser for help on using the browser.