root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 860, 40.1 kB (checked in by ppalmers, 13 years ago)

clean up synchronization in streamprocessor

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_nb_buffers( 0 )
45     , m_period( 0 )
46     , m_audio_datatype( eADT_Float )
47     , m_nominal_framerate ( 0 )
48     , m_xruns(0)
49     , m_nbperiods(0)
50 {
51     addOption(Util::OptionContainer::Option("slaveMode",false));
52 }
53
54 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
55     : m_is_slave( false )
56     , m_SyncSource(NULL)
57     , m_xrun_happened( false )
58     , m_nb_buffers(nb_buffers)
59     , m_period(period)
60     , m_audio_datatype( eADT_Float )
61     , m_nominal_framerate ( framerate )
62     , m_xruns(0)
63     , m_nbperiods(0)
64 {
65     addOption(Util::OptionContainer::Option("slaveMode",false));
66 }
67
68 StreamProcessorManager::~StreamProcessorManager() {
69 }
70
71 /**
72  * Registers \ref processor with this manager.
73  *
74  * also registers it with the isohandlermanager
75  *
76  * be sure to call isohandlermanager->init() first!
77  * and be sure that the processors are also ->init()'ed
78  *
79  * @param processor
80  * @return true if successfull
81  */
82 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
83 {
84     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
85     assert(processor);
86     if (processor->getType() == StreamProcessor::ePT_Receive) {
87         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
88         m_ReceiveProcessors.push_back(processor);
89         return true;
90     }
91
92     if (processor->getType() == StreamProcessor::ePT_Transmit) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_TransmitProcessors.push_back(processor);
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99     return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105     assert(processor);
106
107     if (processor->getType()==StreamProcessor::ePT_Receive) {
108
109         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110               it != m_ReceiveProcessors.end();
111               ++it )
112         {
113             if ( *it == processor ) {
114                 if (*it == m_SyncSource) {
115                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
116                     m_SyncSource = NULL;
117                 }
118                 m_ReceiveProcessors.erase(it);
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 if (*it == m_SyncSource) {
131                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
132                     m_SyncSource = NULL;
133                 }
134                 m_TransmitProcessors.erase(it);
135                 return true;
136             }
137         }
138     }
139
140     debugFatal("Processor (%p) not found!\n",processor);
141     return false; //not found
142 }
143
144 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
145     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
146     m_SyncSource=s;
147     return true;
148 }
149
150 bool StreamProcessorManager::prepare() {
151
152     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
153
154     m_is_slave=false;
155     if(!getOption("slaveMode", m_is_slave)) {
156         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
157     }
158
159     // if no sync source is set, select one here
160     if(m_SyncSource == NULL) {
161        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
162     }
163
164     // FIXME: put into separate method
165     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
166           it != m_ReceiveProcessors.end();
167           ++it )
168     {
169         if(m_SyncSource == NULL) {
170             debugWarning(" => Sync Source is %p.\n", *it);
171             m_SyncSource = *it;
172         }
173     }
174     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
175           it != m_TransmitProcessors.end();
176           ++it )
177     {
178         if(m_SyncSource == NULL) {
179             debugWarning(" => Sync Source is %p.\n", *it);
180             m_SyncSource = *it;
181         }
182     }
183
184     // now do the actual preparation of the SP's
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187         it != m_ReceiveProcessors.end();
188         ++it ) {
189
190         if(!(*it)->setOption("slaveMode", m_is_slave)) {
191             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
192         }
193
194         if(!(*it)->prepare()) {
195             debugFatal(  " could not prepare (%p)...\n",(*it));
196             return false;
197         }
198     }
199     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
200     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
201         it != m_TransmitProcessors.end();
202         ++it ) {
203         if(!(*it)->setOption("slaveMode", m_is_slave)) {
204             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
205         }
206         if(!(*it)->prepare()) {
207             debugFatal( " could not prepare (%p)...\n",(*it));
208             return false;
209         }
210     }
211
212     // if there are no stream processors registered,
213     // fail
214     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
215         debugFatal("No stream processors registered, can't do anything usefull\n");
216         return false;
217     }
218     return true;
219 }
220
221 bool StreamProcessorManager::startDryRunning() {
222     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
223     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
224     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
225             it != m_TransmitProcessors.end();
226             ++it ) {
227         if (!(*it)->isDryRunning()) {
228             if(!(*it)->scheduleStartDryRunning(-1)) {
229                 debugError("Could not put SP %p into the dry-running state\n", *it);
230                 return false;
231             }
232         } else {
233             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
234         }
235     }
236     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
237             it != m_ReceiveProcessors.end();
238             ++it ) {
239         if (!(*it)->isDryRunning()) {
240             if(!(*it)->scheduleStartDryRunning(-1)) {
241                 debugError("Could not put SP %p into the dry-running state\n", *it);
242                 return false;
243             }
244         } else {
245             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
246         }
247     }
248     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
249     // wait for the syncsource to start running.
250     // that will block the waitForPeriod call until everyone has started (theoretically)
251     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
252     bool all_dry_running = false;
253     while (!all_dry_running && cnt) {
254         all_dry_running = true;
255         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
256                 it != m_ReceiveProcessors.end();
257                 ++it ) {
258             all_dry_running &= (*it)->isDryRunning();
259         }
260         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
261                 it != m_TransmitProcessors.end();
262                 ++it ) {
263             all_dry_running &= (*it)->isDryRunning();
264         }
265
266         SleepRelativeUsec(125);
267         cnt--;
268     }
269     if(cnt==0) {
270         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
271         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
272                 it != m_ReceiveProcessors.end();
273                 ++it ) {
274             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
275                 (*it)->getTypeString(), *it, (*it)->getStateString());
276         }
277         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
278                 it != m_TransmitProcessors.end();
279                 ++it ) {
280             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
281                 (*it)->getTypeString(), *it, (*it)->getStateString());
282         }
283         return false;
284     }
285     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
286     return true;
287 }
288
289 bool StreamProcessorManager::syncStartAll() {
290     if(m_SyncSource == NULL) return false;
291     // figure out when to get the SP's running.
292     // the xmit SP's should also know the base timestamp
293     // streams should be aligned here
294
295     // now find out how long we have to delay the wait operation such that
296     // the received frames will all be presented to the SP
297     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
298     int max_of_min_delay = 0;
299     int min_delay = 0;
300     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301             it != m_ReceiveProcessors.end();
302             ++it ) {
303         min_delay = (*it)->getMaxFrameLatency();
304         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
305     }
306
307     // add some processing margin. This only shifts the time
308     // at which the buffer is transfer()'ed. This makes things somewhat
309     // more robust. It should be noted though that shifting the transfer
310     // time to a later time instant also causes the xmit buffer fill to be
311     // lower on average.
312     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
314         max_of_min_delay,
315         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
316         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
317         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
318     m_SyncSource->setSyncDelay(max_of_min_delay);
319
320     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
321     //        have aquired lock
322     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
323     //sleep(2); // FIXME: be smarter here
324
325     // make sure that we are dry-running long enough for the
326     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
327     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
328    
329     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
330     nb_sync_runs /= 1000;
331     nb_sync_runs /= getPeriodSize();
332
333     int64_t time_till_next_period;
334     while(nb_sync_runs--) { // or while not sync-ed?
335         // check if we were woken up too soon
336         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
337         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
338         if(time_till_next_period > 0) {
339             // wait for the period
340             SleepRelativeUsec(time_till_next_period);
341         }
342     }
343
344     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
345     // FIXME: in the SPM it would be nice to have system time instead of
346     //        1394 time
347
348     // we now should have decent sync info on the sync source
349     // determine a point in time where the system should start
350     // figure out where we are now
351     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
352     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
353         time_of_first_sample,
354         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
355         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
356         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
357
358     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
359     // this is the time window we have to setup all SP's such that they
360     // can start wet-running correctly.
361     time_of_first_sample = addTicks(time_of_first_sample,
362                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
363
364     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
365         time_of_first_sample,
366         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
367         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
368         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
369
370     // we should start wet-running the transmit SP's some cycles in advance
371     // such that we know it is wet-running when it should output its first sample
372     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
373                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
374
375     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
376                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
377     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
378         time_to_start_xmit,
379         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
380         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
381         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
382     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
383         time_to_start_recv,
384         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
385         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
386         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
387
388     // at this point the buffer head timestamp of the transmit buffers can be set
389     // this is the presentation time of the first sample in the buffer
390     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
391           it != m_TransmitProcessors.end();
392           ++it ) {
393         (*it)->setBufferHeadTimestamp(time_of_first_sample);
394     }
395
396     // STEP X: switch SP's over to the running state
397     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
398           it != m_ReceiveProcessors.end();
399           ++it ) {
400         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
401             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
402             return false;
403         }
404     }
405     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
406           it != m_TransmitProcessors.end();
407           ++it ) {
408         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
409             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
410             return false;
411         }
412     }
413     // wait for the syncsource to start running.
414     // that will block the waitForPeriod call until everyone has started (theoretically)
415     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
416     while (!m_SyncSource->isRunning() && cnt) {
417         SleepRelativeUsec(125);
418         cnt--;
419     }
420     if(cnt==0) {
421         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
422         return false;
423     }
424
425     if(!alignReceivedStreams()) {
426         debugError("Could not align streams...\n");
427         return false;
428     }
429
430     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
431     return true;
432 }
433
434 bool
435 StreamProcessorManager::alignReceivedStreams()
436 {
437     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
438     unsigned int nb_sync_runs;
439     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
440     int64_t diff_between_streams[nb_rcv_sp];
441     int64_t diff;
442
443     unsigned int i;
444
445     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
446     periods_per_align_try /= 1000;
447     periods_per_align_try /= getPeriodSize();
448     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
449
450     bool aligned = false;
451     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
452     while (!aligned && cnt--) {
453         nb_sync_runs = periods_per_align_try;
454         while(nb_sync_runs) {
455             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
456             if(!waitForPeriod()) {
457                 debugWarning("xrun while aligning streams...\n");
458                 return false;
459             };
460
461             i = 0;
462             for ( i = 0; i < nb_rcv_sp; i++) {
463                 StreamProcessor *s = m_ReceiveProcessors.at(i);
464                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
465                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
466                     m_SyncSource, s, diff);
467                 if ( nb_sync_runs == periods_per_align_try ) {
468                     diff_between_streams[i] = diff;
469                 } else {
470                     diff_between_streams[i] += diff;
471                 }
472             }
473             if(!transferSilence()) {
474                 debugError("Could not transfer silence\n");
475                 return false;
476             }
477             nb_sync_runs--;
478         }
479         // calculate the average offsets
480         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
481         int diff_between_streams_frames[nb_rcv_sp];
482         aligned = true;
483         for ( i = 0; i < nb_rcv_sp; i++) {
484             StreamProcessor *s = m_ReceiveProcessors.at(i);
485
486             diff_between_streams[i] /= periods_per_align_try;
487             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
488             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
489                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
490
491             aligned &= (diff_between_streams_frames[i] == 0);
492
493             // reposition the stream
494             if(!s->shiftStream(diff_between_streams_frames[i])) {
495                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
496                 return false;
497             }
498         }
499         if (!aligned) {
500             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
501         }
502     }
503     if (cnt == 0) {
504         debugError("Align failed\n");
505         return false;
506     }
507     return true;
508 }
509
510 bool StreamProcessorManager::start() {
511     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
512
513     // start all SP's synchonized
514     bool start_result = false;
515     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
516         // put all SP's into dry-running state
517         if (!startDryRunning()) {
518             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
519             start_result = false;
520             continue;
521         }
522
523         start_result = syncStartAll();
524         if(start_result) {
525             break;
526         } else {
527             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
528         }
529     }
530     if (!start_result) {
531         debugFatal("Could not syncStartAll...\n");
532         return false;
533     }
534
535     return true;
536 }
537
538 bool StreamProcessorManager::stop() {
539     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
540
541     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
542     // switch SP's over to the dry-running state
543     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
544           it != m_ReceiveProcessors.end();
545           ++it ) {
546         if((*it)->isRunning()) {
547             if(!(*it)->scheduleStopRunning(-1)) {
548                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
549                 return false;
550             }
551         }
552     }
553     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
554           it != m_TransmitProcessors.end();
555           ++it ) {
556         if((*it)->isRunning()) {
557             if(!(*it)->scheduleStopRunning(-1)) {
558                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
559                 return false;
560             }
561         }
562     }
563     // wait for the SP's to get into the dry-running/stopped state
564     int cnt = 8000;
565     bool ready = false;
566     while (!ready && cnt) {
567         ready = true;
568         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
569             it != m_ReceiveProcessors.end();
570             ++it ) {
571             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
572         }
573         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
574             it != m_TransmitProcessors.end();
575             ++it ) {
576             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
577         }
578         SleepRelativeUsec(125);
579         cnt--;
580     }
581     if(cnt==0) {
582         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
583         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
584             it != m_ReceiveProcessors.end();
585             ++it ) {
586             (*it)->dumpInfo();
587         }
588         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
589             it != m_TransmitProcessors.end();
590             ++it ) {
591             (*it)->dumpInfo();
592         }
593         return false;
594     }
595
596     // switch SP's over to the stopped state
597     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
598           it != m_ReceiveProcessors.end();
599           ++it ) {
600         if(!(*it)->scheduleStopDryRunning(-1)) {
601             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
602             return false;
603         }
604     }
605     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
606           it != m_TransmitProcessors.end();
607           ++it ) {
608         if(!(*it)->scheduleStopDryRunning(-1)) {
609             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
610             return false;
611         }
612     }
613     // wait for the SP's to get into the stopped state
614     cnt = 8000;
615     ready = false;
616     while (!ready && cnt) {
617         ready = true;
618         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
619             it != m_ReceiveProcessors.end();
620             ++it ) {
621             ready &= (*it)->isStopped();
622         }
623         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
624             it != m_TransmitProcessors.end();
625             ++it ) {
626             ready &= (*it)->isStopped();
627         }
628         SleepRelativeUsec(125);
629         cnt--;
630     }
631     if(cnt==0) {
632         debugWarning(" Timeout waiting for the SP's to stop\n");
633         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
634             it != m_ReceiveProcessors.end();
635             ++it ) {
636             (*it)->dumpInfo();
637         }
638         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
639             it != m_TransmitProcessors.end();
640             ++it ) {
641             (*it)->dumpInfo();
642         }
643         return false;
644     }
645     return true;
646 }
647
648 /**
649  * Called upon Xrun events. This brings all StreamProcessors back
650  * into their starting state, and then carries on streaming. This should
651  * have the same effect as restarting the whole thing.
652  *
653  * @return true if successful, false otherwise
654  */
655 bool StreamProcessorManager::handleXrun() {
656
657     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
658
659     dumpInfo();
660
661     /*
662      * Reset means:
663      * 1) Disabling the SP's, so that they don't process any packets
664      *    note: the isomanager does keep on delivering/requesting them
665      * 2) Bringing all buffers & streamprocessors into a know state
666      *    - Clear all capture buffers
667      *    - Put nb_periods*period_size of null frames into the playback buffers
668      * 3) Re-enable the SP's
669      */
670
671     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
672     // start all SP's synchonized
673     bool start_result = false;
674     for (int ntries; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
675         // put all SP's into dry-running state
676         if (!startDryRunning()) {
677             debugShowBackLog();
678             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
679             start_result = false;
680             continue;
681         }
682
683         start_result = syncStartAll();
684         if(start_result) {
685             break;
686         } else {
687             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
688         }
689     }
690     if (!start_result) {
691         debugFatal("Could not syncStartAll...\n");
692         return false;
693     }
694     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
695
696     return true;
697 }
698
699 /**
700  * @brief Waits until the next period of samples is ready
701  *
702  * This function does not return until a full period of samples is (or should be)
703  * ready to be transferred.
704  *
705  * @return true if the period is ready, false if an xrun occurred
706  */
707 bool StreamProcessorManager::waitForPeriod() {
708     if(m_SyncSource == NULL) return false;
709     bool xrun_occurred = false;
710     bool period_not_ready = true;
711
712     while(period_not_ready) {
713         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill());
714         bool result;
715         if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) {
716             result = m_SyncSource->waitForConsumePeriod();
717         } else {
718             result = m_SyncSource->waitForProducePeriod();
719         }
720 //         if(!result) {
721 //             debugError("Error waiting for signal\n");
722 //             return false;
723 //         }
724
725         // HACK: this should be solved more elegantly
726         period_not_ready = false;
727         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
728             it != m_ReceiveProcessors.end();
729             ++it ) {
730             bool this_sp_period_ready = (*it)->canConsumePeriod();
731             if (!this_sp_period_ready) {
732                 period_not_ready = true;
733             }
734         }
735         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
736             it != m_TransmitProcessors.end();
737             ++it ) {
738             bool this_sp_period_ready = (*it)->canProducePeriod();
739             if (!this_sp_period_ready) {
740                 period_not_ready = true;
741             }
742         }
743         // check for underruns on the ISO side,
744         // those should make us bail out of the wait loop
745         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
746             it != m_ReceiveProcessors.end();
747             ++it ) {
748             // a xrun has occurred on the Iso side
749             xrun_occurred |= (*it)->xrunOccurred();
750         }
751         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
752             it != m_TransmitProcessors.end();
753             ++it ) {
754             // a xrun has occurred on the Iso side
755             xrun_occurred |= (*it)->xrunOccurred();
756         }
757         if(xrun_occurred) break;
758         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
759     }
760
761     // we save the 'ideal' time of the transfer at this point,
762     // because we can have interleaved read - process - write
763     // cycles making that we modify a receiving stream's buffer
764     // before we get to writing.
765     // NOTE: before waitForPeriod() is called again, both the transmit
766     //       and the receive processors should have done their transfer.
767     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
768     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
769         m_time_of_transfer);
770
771     // this is to notify the client of the delay that we introduced by waiting
772     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
773     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
774
775 #ifdef DEBUG
776     int rcv_bf=0, xmt_bf=0;
777     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
778         it != m_ReceiveProcessors.end();
779         ++it ) {
780         rcv_bf = (*it)->getBufferFill();
781     }
782     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
783         it != m_TransmitProcessors.end();
784         ++it ) {
785         xmt_bf = (*it)->getBufferFill();
786     }
787     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
788         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
789
790     // check if xruns occurred on the Iso side.
791     // also check if xruns will occur should we transfer() now
792     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
793           it != m_ReceiveProcessors.end();
794           ++it ) {
795
796         if ((*it)->xrunOccurred()) {
797             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
798             (*it)->dumpInfo();
799         }
800         if (!((*it)->canClientTransferFrames(m_period))) {
801             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
802             (*it)->dumpInfo();
803         }
804     }
805     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
806           it != m_TransmitProcessors.end();
807           ++it ) {
808         if ((*it)->xrunOccurred()) {
809             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
810         }
811         if (!((*it)->canClientTransferFrames(m_period))) {
812             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
813         }
814     }
815 #endif
816
817     m_nbperiods++;
818     // now we can signal the client that we are (should be) ready
819     return !xrun_occurred;
820 }
821
822 /**
823  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
824  *
825  * Transfers one period of frames from the client side to the Iso side and vice versa.
826  *
827  * @return true if successful, false otherwise (indicates xrun).
828  */
829 bool StreamProcessorManager::transfer() {
830     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
831     bool retval=true;
832     retval &= transfer(StreamProcessor::ePT_Receive);
833     retval &= transfer(StreamProcessor::ePT_Transmit);
834     return retval;
835 }
836
837 /**
838  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
839  *
840  * Transfers one period of frames from the client side to the Iso side or vice versa.
841  *
842  * @param t The processor type to tranfer for (receive or transmit)
843  * @return true if successful, false otherwise (indicates xrun).
844  */
845 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
846     if(m_SyncSource == NULL) return false;
847     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
848         t, m_time_of_transfer,
849         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
850         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
851         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
852
853     bool retval = true;
854     // a static cast could make sure that there is no performance
855     // penalty for the virtual functions (to be checked)
856     if (t==StreamProcessor::ePT_Receive) {
857         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
858                 it != m_ReceiveProcessors.end();
859                 ++it ) {
860             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
861                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
862                             m_period, m_time_of_transfer,*it);
863                 retval &= false; // buffer underrun
864             }
865         }
866     } else {
867         // FIXME: in the SPM it would be nice to have system time instead of
868         //        1394 time
869         float rate = m_SyncSource->getTicksPerFrame();
870         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
871
872         // the data we are putting into the buffer is intended to be transmitted
873         // one ringbuffer size after it has been received
874         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
875
876         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
877                 it != m_TransmitProcessors.end();
878                 ++it ) {
879             // FIXME: in the SPM it would be nice to have system time instead of
880             //        1394 time
881             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
882                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
883                         m_period, transmit_timestamp, *it);
884                 retval &= false; // buffer underrun
885             }
886         }
887     }
888     return retval;
889 }
890
891 /**
892  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
893  *
894  * Transfers one period of silence to the Iso side for transmit SP's
895  * or dump one period of frames for receive SP's
896  *
897  * @return true if successful, false otherwise (indicates xrun).
898  */
899 bool StreamProcessorManager::transferSilence() {
900     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
901     bool retval=true;
902     retval &= transferSilence(StreamProcessor::ePT_Receive);
903     retval &= transferSilence(StreamProcessor::ePT_Transmit);
904     return retval;
905 }
906
907 /**
908  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
909  *
910  * Transfers one period of silence to the Iso side for transmit SP's
911  * or dump one period of frames for receive SP's
912  *
913  * @param t The processor type to tranfer for (receive or transmit)
914  * @return true if successful, false otherwise (indicates xrun).
915  */
916 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
917     if(m_SyncSource == NULL) return false;
918     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
919         t, m_time_of_transfer,
920         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
921         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
922         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
923
924     bool retval = true;
925     // a static cast could make sure that there is no performance
926     // penalty for the virtual functions (to be checked)
927     if (t==StreamProcessor::ePT_Receive) {
928         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
929                 it != m_ReceiveProcessors.end();
930                 ++it ) {
931             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
932                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
933                             m_period, m_time_of_transfer,*it);
934                 retval &= false; // buffer underrun
935             }
936         }
937     } else {
938         // FIXME: in the SPM it would be nice to have system time instead of
939         //        1394 time
940         float rate = m_SyncSource->getTicksPerFrame();
941         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
942
943         // the data we are putting into the buffer is intended to be transmitted
944         // one ringbuffer size after it has been received
945         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
946
947         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
948                 it != m_TransmitProcessors.end();
949                 ++it ) {
950             // FIXME: in the SPM it would be nice to have system time instead of
951             //        1394 time
952             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
953                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
954                         m_period, transmit_timestamp, *it);
955                 retval &= false; // buffer underrun
956             }
957         }
958     }
959     return retval;
960 }
961
962 void StreamProcessorManager::dumpInfo() {
963     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
964     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
965     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
966     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
967
968     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
969     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
970         it != m_ReceiveProcessors.end();
971         ++it ) {
972         (*it)->dumpInfo();
973     }
974
975     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
976     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
977         it != m_TransmitProcessors.end();
978         ++it ) {
979         (*it)->dumpInfo();
980     }
981
982     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
983
984 }
985
986 void StreamProcessorManager::setVerboseLevel(int l) {
987     setDebugLevel(l);
988
989     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
990     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
991         it != m_ReceiveProcessors.end();
992         ++it ) {
993         (*it)->setVerboseLevel(l);
994     }
995
996     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
997     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
998         it != m_TransmitProcessors.end();
999         ++it ) {
1000         (*it)->setVerboseLevel(l);
1001     }
1002 }
1003
1004
1005 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1006     int count=0;
1007
1008     if (direction == Port::E_Capture) {
1009         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1010             it != m_ReceiveProcessors.end();
1011             ++it ) {
1012             count += (*it)->getPortCount(type);
1013         }
1014     } else {
1015         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1016             it != m_TransmitProcessors.end();
1017             ++it ) {
1018             count += (*it)->getPortCount(type);
1019         }
1020     }
1021     return count;
1022 }
1023
1024 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1025     int count=0;
1026
1027     if (direction == Port::E_Capture) {
1028         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1029             it != m_ReceiveProcessors.end();
1030             ++it ) {
1031             count += (*it)->getPortCount();
1032         }
1033     } else {
1034         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1035             it != m_TransmitProcessors.end();
1036             ++it ) {
1037             count += (*it)->getPortCount();
1038         }
1039     }
1040     return count;
1041 }
1042
1043 // TODO: implement a port map here, instead of the loop
1044
1045 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1046     int count=0;
1047     int prevcount=0;
1048
1049     if (direction == Port::E_Capture) {
1050         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1051             it != m_ReceiveProcessors.end();
1052             ++it ) {
1053             count += (*it)->getPortCount();
1054             if (count > idx) {
1055                 return (*it)->getPortAtIdx(idx-prevcount);
1056             }
1057             prevcount=count;
1058         }
1059     } else {
1060         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1061             it != m_TransmitProcessors.end();
1062             ++it ) {
1063             count += (*it)->getPortCount();
1064             if (count > idx) {
1065                 return (*it)->getPortAtIdx(idx-prevcount);
1066             }
1067             prevcount=count;
1068         }
1069     }
1070     return NULL;
1071 }
1072
1073 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1074     m_thread_realtime=rt;
1075     m_thread_priority=priority;
1076     return true;
1077 }
1078
1079
1080 } // end of namespace
Note: See TracBrowser for help on using the browser.