root/branches/api-cleanup/src/libstreaming/StreamProcessorManager.cpp

Revision 821, 39.0 kB (checked in by ppalmers, 16 years ago)

make xrun hanling more robust

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_nb_buffers( 0 )
45     , m_period( 0 )
46     , m_audio_datatype( eADT_Float )
47     , m_nominal_framerate ( 0 )
48     , m_xruns(0)
49     , m_nbperiods(0)
50 {
51     addOption(Util::OptionContainer::Option("slaveMode",false));
52 }
53
54 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
55     : m_is_slave( false )
56     , m_SyncSource(NULL)
57     , m_xrun_happened( false )
58     , m_nb_buffers(nb_buffers)
59     , m_period(period)
60     , m_audio_datatype( eADT_Float )
61     , m_nominal_framerate ( framerate )
62     , m_xruns(0)
63     , m_nbperiods(0)
64 {
65     addOption(Util::OptionContainer::Option("slaveMode",false));
66 }
67
68 StreamProcessorManager::~StreamProcessorManager() {
69 }
70
71 /**
72  * Registers \ref processor with this manager.
73  *
74  * also registers it with the isohandlermanager
75  *
76  * be sure to call isohandlermanager->init() first!
77  * and be sure that the processors are also ->init()'ed
78  *
79  * @param processor
80  * @return true if successfull
81  */
82 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
83 {
84     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
85     assert(processor);
86     if (processor->getType() == StreamProcessor::ePT_Receive) {
87         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
88         m_ReceiveProcessors.push_back(processor);
89         return true;
90     }
91
92     if (processor->getType() == StreamProcessor::ePT_Transmit) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_TransmitProcessors.push_back(processor);
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99     return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105     assert(processor);
106
107     if (processor->getType()==StreamProcessor::ePT_Receive) {
108
109         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110               it != m_ReceiveProcessors.end();
111               ++it )
112         {
113             if ( *it == processor ) {
114                 if (*it == m_SyncSource) {
115                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
116                     m_SyncSource = NULL;
117                 }
118                 m_ReceiveProcessors.erase(it);
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 if (*it == m_SyncSource) {
131                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
132                     m_SyncSource = NULL;
133                 }
134                 m_TransmitProcessors.erase(it);
135                 return true;
136             }
137         }
138     }
139
140     debugFatal("Processor (%p) not found!\n",processor);
141     return false; //not found
142 }
143
144 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
145     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
146     m_SyncSource=s;
147     return true;
148 }
149
150 bool StreamProcessorManager::prepare() {
151
152     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
153
154     m_is_slave=false;
155     if(!getOption("slaveMode", m_is_slave)) {
156         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
157     }
158
159     // if no sync source is set, select one here
160     if(m_SyncSource == NULL) {
161        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
162     }
163
164     // FIXME: put into separate method
165     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
166           it != m_ReceiveProcessors.end();
167           ++it )
168     {
169         if(m_SyncSource == NULL) {
170             debugWarning(" => Sync Source is %p.\n", *it);
171             m_SyncSource = *it;
172         }
173     }
174     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
175           it != m_TransmitProcessors.end();
176           ++it )
177     {
178         if(m_SyncSource == NULL) {
179             debugWarning(" => Sync Source is %p.\n", *it);
180             m_SyncSource = *it;
181         }
182     }
183
184     // now do the actual preparation of the SP's
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187         it != m_ReceiveProcessors.end();
188         ++it ) {
189
190         if(!(*it)->setOption("slaveMode", m_is_slave)) {
191             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
192         }
193
194         if(!(*it)->prepare()) {
195             debugFatal(  " could not prepare (%p)...\n",(*it));
196             return false;
197         }
198     }
199     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
200     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
201         it != m_TransmitProcessors.end();
202         ++it ) {
203         if(!(*it)->setOption("slaveMode", m_is_slave)) {
204             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
205         }
206         if(!(*it)->prepare()) {
207             debugFatal( " could not prepare (%p)...\n",(*it));
208             return false;
209         }
210     }
211
212     // if there are no stream processors registered,
213     // fail
214     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
215         debugFatal("No stream processors registered, can't do anything usefull\n");
216         return false;
217     }
218     return true;
219 }
220
221 bool StreamProcessorManager::startDryRunning() {
222     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
223     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
224     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
225             it != m_ReceiveProcessors.end();
226             ++it ) {
227         if (!(*it)->isDryRunning()) {
228             if(!(*it)->scheduleStartDryRunning(-1)) {
229                 debugError("Could not put SP %p into the dry-running state\n", *it);
230                 return false;
231             }
232         } else {
233             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
234         }
235     }
236     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
237             it != m_TransmitProcessors.end();
238             ++it ) {
239         if (!(*it)->isDryRunning()) {
240             if(!(*it)->scheduleStartDryRunning(-1)) {
241                 debugError("Could not put SP %p into the dry-running state\n", *it);
242                 return false;
243             }
244         } else {
245             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
246         }
247     }
248     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
249     // wait for the syncsource to start running.
250     // that will block the waitForPeriod call until everyone has started (theoretically)
251     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
252     bool all_dry_running = false;
253     while (!all_dry_running && cnt) {
254         all_dry_running = true;
255         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
256                 it != m_ReceiveProcessors.end();
257                 ++it ) {
258             all_dry_running &= (*it)->isDryRunning();
259         }
260         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
261                 it != m_TransmitProcessors.end();
262                 ++it ) {
263             all_dry_running &= (*it)->isDryRunning();
264         }
265
266         SleepRelativeUsec(125);
267         cnt--;
268     }
269     if(cnt==0) {
270         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
271         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
272                 it != m_ReceiveProcessors.end();
273                 ++it ) {
274             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
275                 (*it)->getTypeString(), *it, (*it)->getStateString());
276         }
277         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
278                 it != m_TransmitProcessors.end();
279                 ++it ) {
280             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
281                 (*it)->getTypeString(), *it, (*it)->getStateString());
282         }
283         return false;
284     }
285     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
286     return true;
287 }
288
289 bool StreamProcessorManager::syncStartAll() {
290     if(m_SyncSource == NULL) return false;
291     // figure out when to get the SP's running.
292     // the xmit SP's should also know the base timestamp
293     // streams should be aligned here
294
295     // now find out how long we have to delay the wait operation such that
296     // the received frames will all be presented to the SP
297     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
298     int max_of_min_delay = 0;
299     int min_delay = 0;
300     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301             it != m_ReceiveProcessors.end();
302             ++it ) {
303         min_delay = (*it)->getMaxFrameLatency();
304         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
305     }
306
307     // add some processing margin. This only shifts the time
308     // at which the buffer is transfer()'ed. This makes things somewhat
309     // more robust. It should be noted though that shifting the transfer
310     // time to a later time instant also causes the xmit buffer fill to be
311     // lower on average.
312     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
314         max_of_min_delay,
315         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
316         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
317         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
318     m_SyncSource->setSyncDelay(max_of_min_delay);
319
320     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
321     //        have aquired lock
322     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
323     //sleep(2); // FIXME: be smarter here
324
325     // make sure that we are dry-running long enough for the
326     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
327     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
328     int nb_sync_runs=20;
329     int64_t time_till_next_period;
330     while(nb_sync_runs--) { // or while not sync-ed?
331         // check if we were woken up too soon
332         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
333         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
334         if(time_till_next_period > 0) {
335             // wait for the period
336             SleepRelativeUsec(time_till_next_period);
337         }
338     }
339
340     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
341     // FIXME: in the SPM it would be nice to have system time instead of
342     //        1394 time
343
344     // we now should have decent sync info on the sync source
345     // determine a point in time where the system should start
346     // figure out where we are now
347     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
348     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
349         time_of_first_sample,
350         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
351         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
352         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
353
354     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
355     // this is the time window we have to setup all SP's such that they
356     // can start wet-running correctly.
357     time_of_first_sample = addTicks(time_of_first_sample,
358                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
359
360     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
361         time_of_first_sample,
362         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
363         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
364         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
365
366     // we should start wet-running the transmit SP's some cycles in advance
367     // such that we know it is wet-running when it should output its first sample
368     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
369                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
370
371     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
372                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
373     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
374         time_to_start_xmit,
375         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
376         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
377         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
378     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
379         time_to_start_recv,
380         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
381         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
382         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
383
384     // at this point the buffer head timestamp of the transmit buffers can be set
385     // this is the presentation time of the first sample in the buffer
386     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
387           it != m_TransmitProcessors.end();
388           ++it ) {
389         (*it)->setBufferHeadTimestamp(time_of_first_sample);
390     }
391
392     // STEP X: switch SP's over to the running state
393     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
394           it != m_ReceiveProcessors.end();
395           ++it ) {
396         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
397             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
398             return false;
399         }
400     }
401     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
402           it != m_TransmitProcessors.end();
403           ++it ) {
404         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
405             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
406             return false;
407         }
408     }
409     // wait for the syncsource to start running.
410     // that will block the waitForPeriod call until everyone has started (theoretically)
411     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
412     while (!m_SyncSource->isRunning() && cnt) {
413         SleepRelativeUsec(125);
414         cnt--;
415     }
416     if(cnt==0) {
417         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
418         return false;
419     }
420
421     if(!alignReceivedStreams()) {
422         debugError("Could not align streams...\n");
423         return false;
424     }
425
426     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
427     return true;
428 }
429
430 bool
431 StreamProcessorManager::alignReceivedStreams()
432 {
433     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
434     unsigned int nb_sync_runs;
435     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
436     int64_t diff_between_streams[nb_rcv_sp];
437     int64_t diff;
438
439     unsigned int i;
440
441     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
442     periods_per_align_try /= 1000;
443     periods_per_align_try /= getPeriodSize();
444     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
445
446     bool aligned = false;
447     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
448     while (!aligned && cnt--) {
449         nb_sync_runs = periods_per_align_try;
450         while(nb_sync_runs) {
451             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
452             if(!waitForPeriod()) {
453                 debugWarning("xrun while aligning streams...\n");
454                 return false;
455             };
456
457             i = 0;
458             for ( i = 0; i < nb_rcv_sp; i++) {
459                 StreamProcessor *s = m_ReceiveProcessors.at(i);
460                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
461                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
462                     m_SyncSource, s, diff);
463                 if ( nb_sync_runs == periods_per_align_try ) {
464                     diff_between_streams[i] = diff;
465                 } else {
466                     diff_between_streams[i] += diff;
467                 }
468             }
469             if(!transferSilence()) {
470                 debugError("Could not transfer silence\n");
471                 return false;
472             }
473             nb_sync_runs--;
474         }
475         // calculate the average offsets
476         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
477         int diff_between_streams_frames[nb_rcv_sp];
478         aligned = true;
479         for ( i = 0; i < nb_rcv_sp; i++) {
480             StreamProcessor *s = m_ReceiveProcessors.at(i);
481
482             diff_between_streams[i] /= periods_per_align_try;
483             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
484             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
485                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
486
487             aligned &= (diff_between_streams_frames[i] == 0);
488
489             // reposition the stream
490             if(!s->shiftStream(diff_between_streams_frames[i])) {
491                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
492                 return false;
493             }
494         }
495         if (!aligned) {
496             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
497         }
498     }
499     if (cnt == 0) {
500         debugError("Align failed\n");
501         return false;
502     }
503     return true;
504 }
505
506 bool StreamProcessorManager::start() {
507     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
508
509     // start all SP's synchonized
510     bool start_result = false;
511     for (int ntries; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
512         // put all SP's into dry-running state
513         if (!startDryRunning()) {
514             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
515             start_result = false;
516             continue;
517         }
518
519         start_result = syncStartAll();
520         if(start_result) {
521             break;
522         } else {
523             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
524         }
525     }
526     if (!start_result) {
527         debugFatal("Could not syncStartAll...\n");
528         return false;
529     }
530
531     return true;
532 }
533
534 bool StreamProcessorManager::stop() {
535     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
536
537     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
538     // switch SP's over to the dry-running state
539     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
540           it != m_ReceiveProcessors.end();
541           ++it ) {
542         if((*it)->isRunning()) {
543             if(!(*it)->scheduleStopRunning(-1)) {
544                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
545                 return false;
546             }
547         }
548     }
549     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
550           it != m_TransmitProcessors.end();
551           ++it ) {
552         if((*it)->isRunning()) {
553             if(!(*it)->scheduleStopRunning(-1)) {
554                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
555                 return false;
556             }
557         }
558     }
559     // wait for the SP's to get into the dry-running state
560     int cnt = 2000;
561     bool ready = false;
562     while (!ready && cnt) {
563         ready = true;
564         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
565             it != m_ReceiveProcessors.end();
566             ++it ) {
567             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
568         }
569         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
570             it != m_TransmitProcessors.end();
571             ++it ) {
572             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
573         }
574         SleepRelativeUsec(125);
575         cnt--;
576     }
577     if(cnt==0) {
578         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
579         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
580             it != m_ReceiveProcessors.end();
581             ++it ) {
582             (*it)->dumpInfo();
583         }
584         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
585             it != m_TransmitProcessors.end();
586             ++it ) {
587             (*it)->dumpInfo();
588         }
589         return false;
590     }
591
592     // switch SP's over to the stopped state
593     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
594           it != m_ReceiveProcessors.end();
595           ++it ) {
596         if(!(*it)->scheduleStopDryRunning(-1)) {
597             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
598             return false;
599         }
600     }
601     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
602           it != m_TransmitProcessors.end();
603           ++it ) {
604         if(!(*it)->scheduleStopDryRunning(-1)) {
605             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
606             return false;
607         }
608     }
609     // wait for the SP's to get into the running state
610     cnt = 200;
611     ready = false;
612     while (!ready && cnt) {
613         ready = true;
614         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
615             it != m_ReceiveProcessors.end();
616             ++it ) {
617             ready &= (*it)->isStopped();
618         }
619         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
620             it != m_TransmitProcessors.end();
621             ++it ) {
622             ready &= (*it)->isStopped();
623         }
624         SleepRelativeUsec(125);
625         cnt--;
626     }
627     if(cnt==0) {
628         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
629         return false;
630     }
631     return true;
632 }
633
634 /**
635  * Called upon Xrun events. This brings all StreamProcessors back
636  * into their starting state, and then carries on streaming. This should
637  * have the same effect as restarting the whole thing.
638  *
639  * @return true if successful, false otherwise
640  */
641 bool StreamProcessorManager::handleXrun() {
642
643     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
644
645     dumpInfo();
646
647     /*
648      * Reset means:
649      * 1) Disabling the SP's, so that they don't process any packets
650      *    note: the isomanager does keep on delivering/requesting them
651      * 2) Bringing all buffers & streamprocessors into a know state
652      *    - Clear all capture buffers
653      *    - Put nb_periods*period_size of null frames into the playback buffers
654      * 3) Re-enable the SP's
655      */
656
657     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
658     // start all SP's synchonized
659     bool start_result = false;
660     for (int ntries; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
661         // put all SP's into dry-running state
662         if (!startDryRunning()) {
663             debugShowBackLog();
664             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
665             start_result = false;
666             continue;
667         }
668
669         start_result = syncStartAll();
670         if(start_result) {
671             break;
672         } else {
673             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
674         }
675     }
676     if (!start_result) {
677         debugFatal("Could not syncStartAll...\n");
678         return false;
679     }
680     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
681
682     return true;
683 }
684
685 /**
686  * @brief Waits until the next period of samples is ready
687  *
688  * This function does not return until a full period of samples is (or should be)
689  * ready to be transferred.
690  *
691  * @return true if the period is ready, false if an xrun occurred
692  */
693 bool StreamProcessorManager::waitForPeriod() {
694     if(m_SyncSource == NULL) return false;
695     bool xrun_occurred = false;
696     bool period_not_ready = true;
697
698     while(period_not_ready) {
699         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill());
700         if(!m_SyncSource->waitForSignal()) {
701             debugError("Error waiting for signal\n");
702             return false;
703         }
704
705         unsigned int bufferfill = m_SyncSource->getBufferFill();
706         period_not_ready = bufferfill < m_period;
707
708 #ifdef DEBUG
709         if(period_not_ready) {
710             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "period is not ready (bufferfill: %u)\n", bufferfill);
711         } else {
712             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "period is ready (bufferfill: %u)\n", bufferfill);
713         }
714 #endif
715
716         // check for underruns on the ISO side,
717         // those should make us bail out of the wait loop
718         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
719             it != m_ReceiveProcessors.end();
720             ++it ) {
721             // a xrun has occurred on the Iso side
722             xrun_occurred |= (*it)->xrunOccurred();
723         }
724         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
725             it != m_TransmitProcessors.end();
726             ++it ) {
727             // a xrun has occurred on the Iso side
728             xrun_occurred |= (*it)->xrunOccurred();
729         }
730         if(xrun_occurred) break;
731         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
732     }
733
734     // we save the 'ideal' time of the transfer at this point,
735     // because we can have interleaved read - process - write
736     // cycles making that we modify a receiving stream's buffer
737     // before we get to writing.
738     // NOTE: before waitForPeriod() is called again, both the transmit
739     //       and the receive processors should have done their transfer.
740     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
741     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
742         m_time_of_transfer);
743
744     // this is to notify the client of the delay that we introduced by waiting
745     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
746     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
747
748 #ifdef DEBUG
749     int rcv_bf=0, xmt_bf=0;
750     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
751         it != m_ReceiveProcessors.end();
752         ++it ) {
753         rcv_bf = (*it)->getBufferFill();
754     }
755     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
756         it != m_TransmitProcessors.end();
757         ++it ) {
758         xmt_bf = (*it)->getBufferFill();
759     }
760     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
761         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
762
763     // check if xruns occurred on the Iso side.
764     // also check if xruns will occur should we transfer() now
765     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
766           it != m_ReceiveProcessors.end();
767           ++it ) {
768
769         if ((*it)->xrunOccurred()) {
770             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
771             (*it)->dumpInfo();
772         }
773         if (!((*it)->canClientTransferFrames(m_period))) {
774             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
775             (*it)->dumpInfo();
776         }
777     }
778     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
779           it != m_TransmitProcessors.end();
780           ++it ) {
781         if ((*it)->xrunOccurred()) {
782             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
783         }
784         if (!((*it)->canClientTransferFrames(m_period))) {
785             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
786         }
787     }
788 #endif
789
790     m_nbperiods++;
791     // now we can signal the client that we are (should be) ready
792     return !xrun_occurred;
793 }
794
795 /**
796  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
797  *
798  * Transfers one period of frames from the client side to the Iso side and vice versa.
799  *
800  * @return true if successful, false otherwise (indicates xrun).
801  */
802 bool StreamProcessorManager::transfer() {
803     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
804     bool retval=true;
805     retval &= transfer(StreamProcessor::ePT_Receive);
806     retval &= transfer(StreamProcessor::ePT_Transmit);
807     return retval;
808 }
809
810 /**
811  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
812  *
813  * Transfers one period of frames from the client side to the Iso side or vice versa.
814  *
815  * @param t The processor type to tranfer for (receive or transmit)
816  * @return true if successful, false otherwise (indicates xrun).
817  */
818 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
819     if(m_SyncSource == NULL) return false;
820     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
821         t, m_time_of_transfer,
822         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
823         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
824         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
825
826     bool retval = true;
827     // a static cast could make sure that there is no performance
828     // penalty for the virtual functions (to be checked)
829     if (t==StreamProcessor::ePT_Receive) {
830         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
831                 it != m_ReceiveProcessors.end();
832                 ++it ) {
833             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
834                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
835                             m_period, m_time_of_transfer,*it);
836                 retval &= false; // buffer underrun
837             }
838         }
839     } else {
840         // FIXME: in the SPM it would be nice to have system time instead of
841         //        1394 time
842         float rate = m_SyncSource->getTicksPerFrame();
843         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
844
845         // the data we are putting into the buffer is intended to be transmitted
846         // one ringbuffer size after it has been received
847         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
848
849         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
850                 it != m_TransmitProcessors.end();
851                 ++it ) {
852             // FIXME: in the SPM it would be nice to have system time instead of
853             //        1394 time
854             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
855                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
856                         m_period, transmit_timestamp, *it);
857                 retval &= false; // buffer underrun
858             }
859         }
860     }
861     return retval;
862 }
863
864 /**
865  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
866  *
867  * Transfers one period of silence to the Iso side for transmit SP's
868  * or dump one period of frames for receive SP's
869  *
870  * @return true if successful, false otherwise (indicates xrun).
871  */
872 bool StreamProcessorManager::transferSilence() {
873     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
874     bool retval=true;
875     retval &= transferSilence(StreamProcessor::ePT_Receive);
876     retval &= transferSilence(StreamProcessor::ePT_Transmit);
877     return retval;
878 }
879
880 /**
881  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
882  *
883  * Transfers one period of silence to the Iso side for transmit SP's
884  * or dump one period of frames for receive SP's
885  *
886  * @param t The processor type to tranfer for (receive or transmit)
887  * @return true if successful, false otherwise (indicates xrun).
888  */
889 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
890     if(m_SyncSource == NULL) return false;
891     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
892         t, m_time_of_transfer,
893         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
894         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
895         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
896
897     bool retval = true;
898     // a static cast could make sure that there is no performance
899     // penalty for the virtual functions (to be checked)
900     if (t==StreamProcessor::ePT_Receive) {
901         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
902                 it != m_ReceiveProcessors.end();
903                 ++it ) {
904             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
905                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
906                             m_period, m_time_of_transfer,*it);
907                 retval &= false; // buffer underrun
908             }
909         }
910     } else {
911         // FIXME: in the SPM it would be nice to have system time instead of
912         //        1394 time
913         float rate = m_SyncSource->getTicksPerFrame();
914         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
915
916         // the data we are putting into the buffer is intended to be transmitted
917         // one ringbuffer size after it has been received
918         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
919
920         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
921                 it != m_TransmitProcessors.end();
922                 ++it ) {
923             // FIXME: in the SPM it would be nice to have system time instead of
924             //        1394 time
925             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
926                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
927                         m_period, transmit_timestamp, *it);
928                 retval &= false; // buffer underrun
929             }
930         }
931     }
932     return retval;
933 }
934
935 void StreamProcessorManager::dumpInfo() {
936     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
937     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
938     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
939     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
940
941     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
942     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
943         it != m_ReceiveProcessors.end();
944         ++it ) {
945         (*it)->dumpInfo();
946     }
947
948     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
949     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
950         it != m_TransmitProcessors.end();
951         ++it ) {
952         (*it)->dumpInfo();
953     }
954
955     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
956
957 }
958
959 void StreamProcessorManager::setVerboseLevel(int l) {
960     setDebugLevel(l);
961
962     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
963     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
964         it != m_ReceiveProcessors.end();
965         ++it ) {
966         (*it)->setVerboseLevel(l);
967     }
968
969     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
970     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
971         it != m_TransmitProcessors.end();
972         ++it ) {
973         (*it)->setVerboseLevel(l);
974     }
975 }
976
977
978 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
979     int count=0;
980
981     if (direction == Port::E_Capture) {
982         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
983             it != m_ReceiveProcessors.end();
984             ++it ) {
985             count += (*it)->getPortCount(type);
986         }
987     } else {
988         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
989             it != m_TransmitProcessors.end();
990             ++it ) {
991             count += (*it)->getPortCount(type);
992         }
993     }
994     return count;
995 }
996
997 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
998     int count=0;
999
1000     if (direction == Port::E_Capture) {
1001         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1002             it != m_ReceiveProcessors.end();
1003             ++it ) {
1004             count += (*it)->getPortCount();
1005         }
1006     } else {
1007         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1008             it != m_TransmitProcessors.end();
1009             ++it ) {
1010             count += (*it)->getPortCount();
1011         }
1012     }
1013     return count;
1014 }
1015
1016 // TODO: implement a port map here, instead of the loop
1017
1018 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1019     int count=0;
1020     int prevcount=0;
1021
1022     if (direction == Port::E_Capture) {
1023         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1024             it != m_ReceiveProcessors.end();
1025             ++it ) {
1026             count += (*it)->getPortCount();
1027             if (count > idx) {
1028                 return (*it)->getPortAtIdx(idx-prevcount);
1029             }
1030             prevcount=count;
1031         }
1032     } else {
1033         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1034             it != m_TransmitProcessors.end();
1035             ++it ) {
1036             count += (*it)->getPortCount();
1037             if (count > idx) {
1038                 return (*it)->getPortAtIdx(idx-prevcount);
1039             }
1040             prevcount=count;
1041         }
1042     }
1043     return NULL;
1044 }
1045
1046 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1047     m_thread_realtime=rt;
1048     m_thread_priority=priority;
1049     return true;
1050 }
1051
1052
1053 } // end of namespace
Note: See TracBrowser for help on using the browser.