root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 864, 40.2 kB (checked in by ppalmers, 15 years ago)

update license to GPLv2 or GPLv3 instead of GPLv2 or any later version. Update copyrights to reflect the new year

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_nb_buffers( 0 )
45     , m_period( 0 )
46     , m_audio_datatype( eADT_Float )
47     , m_nominal_framerate ( 0 )
48     , m_xruns(0)
49     , m_nbperiods(0)
50 {
51     addOption(Util::OptionContainer::Option("slaveMode",false));
52 }
53
54 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
55     : m_is_slave( false )
56     , m_SyncSource(NULL)
57     , m_xrun_happened( false )
58     , m_nb_buffers(nb_buffers)
59     , m_period(period)
60     , m_audio_datatype( eADT_Float )
61     , m_nominal_framerate ( framerate )
62     , m_xruns(0)
63     , m_nbperiods(0)
64 {
65     addOption(Util::OptionContainer::Option("slaveMode",false));
66 }
67
68 StreamProcessorManager::~StreamProcessorManager() {
69 }
70
71 /**
72  * Registers \ref processor with this manager.
73  *
74  * also registers it with the isohandlermanager
75  *
76  * be sure to call isohandlermanager->init() first!
77  * and be sure that the processors are also ->init()'ed
78  *
79  * @param processor
80  * @return true if successfull
81  */
82 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
83 {
84     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
85     assert(processor);
86     if (processor->getType() == StreamProcessor::ePT_Receive) {
87         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
88         m_ReceiveProcessors.push_back(processor);
89         return true;
90     }
91
92     if (processor->getType() == StreamProcessor::ePT_Transmit) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_TransmitProcessors.push_back(processor);
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99     return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105     assert(processor);
106
107     if (processor->getType()==StreamProcessor::ePT_Receive) {
108
109         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110               it != m_ReceiveProcessors.end();
111               ++it )
112         {
113             if ( *it == processor ) {
114                 if (*it == m_SyncSource) {
115                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
116                     m_SyncSource = NULL;
117                 }
118                 m_ReceiveProcessors.erase(it);
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 if (*it == m_SyncSource) {
131                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
132                     m_SyncSource = NULL;
133                 }
134                 m_TransmitProcessors.erase(it);
135                 return true;
136             }
137         }
138     }
139
140     debugFatal("Processor (%p) not found!\n",processor);
141     return false; //not found
142 }
143
144 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
145     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
146     m_SyncSource=s;
147     return true;
148 }
149
150 bool StreamProcessorManager::prepare() {
151
152     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
153
154     m_is_slave=false;
155     if(!getOption("slaveMode", m_is_slave)) {
156         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
157     }
158
159     // if no sync source is set, select one here
160     if(m_SyncSource == NULL) {
161        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
162     }
163
164     // FIXME: put into separate method
165     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
166           it != m_ReceiveProcessors.end();
167           ++it )
168     {
169         if(m_SyncSource == NULL) {
170             debugWarning(" => Sync Source is %p.\n", *it);
171             m_SyncSource = *it;
172         }
173     }
174     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
175           it != m_TransmitProcessors.end();
176           ++it )
177     {
178         if(m_SyncSource == NULL) {
179             debugWarning(" => Sync Source is %p.\n", *it);
180             m_SyncSource = *it;
181         }
182     }
183
184     // now do the actual preparation of the SP's
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187         it != m_ReceiveProcessors.end();
188         ++it ) {
189
190         if(!(*it)->setOption("slaveMode", m_is_slave)) {
191             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
192         }
193
194         if(!(*it)->prepare()) {
195             debugFatal(  " could not prepare (%p)...\n",(*it));
196             return false;
197         }
198     }
199     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
200     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
201         it != m_TransmitProcessors.end();
202         ++it ) {
203         if(!(*it)->setOption("slaveMode", m_is_slave)) {
204             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
205         }
206         if(!(*it)->prepare()) {
207             debugFatal( " could not prepare (%p)...\n",(*it));
208             return false;
209         }
210     }
211
212     // if there are no stream processors registered,
213     // fail
214     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
215         debugFatal("No stream processors registered, can't do anything usefull\n");
216         return false;
217     }
218     return true;
219 }
220
221 bool StreamProcessorManager::startDryRunning() {
222     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
223     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
224     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
225             it != m_TransmitProcessors.end();
226             ++it ) {
227         if (!(*it)->isDryRunning()) {
228             if(!(*it)->scheduleStartDryRunning(-1)) {
229                 debugError("Could not put SP %p into the dry-running state\n", *it);
230                 return false;
231             }
232         } else {
233             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
234         }
235     }
236     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
237             it != m_ReceiveProcessors.end();
238             ++it ) {
239         if (!(*it)->isDryRunning()) {
240             if(!(*it)->scheduleStartDryRunning(-1)) {
241                 debugError("Could not put SP %p into the dry-running state\n", *it);
242                 return false;
243             }
244         } else {
245             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
246         }
247     }
248     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
249     // wait for the syncsource to start running.
250     // that will block the waitForPeriod call until everyone has started (theoretically)
251     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
252     bool all_dry_running = false;
253     while (!all_dry_running && cnt) {
254         all_dry_running = true;
255         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
256                 it != m_ReceiveProcessors.end();
257                 ++it ) {
258             all_dry_running &= (*it)->isDryRunning();
259         }
260         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
261                 it != m_TransmitProcessors.end();
262                 ++it ) {
263             all_dry_running &= (*it)->isDryRunning();
264         }
265
266         SleepRelativeUsec(125);
267         cnt--;
268     }
269     if(cnt==0) {
270         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
271         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
272                 it != m_ReceiveProcessors.end();
273                 ++it ) {
274             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
275                 (*it)->getTypeString(), *it, (*it)->getStateString());
276         }
277         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
278                 it != m_TransmitProcessors.end();
279                 ++it ) {
280             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
281                 (*it)->getTypeString(), *it, (*it)->getStateString());
282         }
283         return false;
284     }
285     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
286     return true;
287 }
288
289 bool StreamProcessorManager::syncStartAll() {
290     if(m_SyncSource == NULL) return false;
291     // figure out when to get the SP's running.
292     // the xmit SP's should also know the base timestamp
293     // streams should be aligned here
294
295     // now find out how long we have to delay the wait operation such that
296     // the received frames will all be presented to the SP
297     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
298     int max_of_min_delay = 0;
299     int min_delay = 0;
300     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301             it != m_ReceiveProcessors.end();
302             ++it ) {
303         min_delay = (*it)->getMaxFrameLatency();
304         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
305     }
306
307     // add some processing margin. This only shifts the time
308     // at which the buffer is transfer()'ed. This makes things somewhat
309     // more robust. It should be noted though that shifting the transfer
310     // time to a later time instant also causes the xmit buffer fill to be
311     // lower on average.
312     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
314         max_of_min_delay,
315         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
316         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
317         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
318     m_SyncSource->setSyncDelay(max_of_min_delay);
319
320     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
321     //        have aquired lock
322     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
323     //sleep(2); // FIXME: be smarter here
324
325     // make sure that we are dry-running long enough for the
326     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
327     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
328    
329     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
330     nb_sync_runs /= 1000;
331     nb_sync_runs /= getPeriodSize();
332
333     int64_t time_till_next_period;
334     while(nb_sync_runs--) { // or while not sync-ed?
335         // check if we were woken up too soon
336         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
337         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
338         if(time_till_next_period > 0) {
339             // wait for the period
340             SleepRelativeUsec(time_till_next_period);
341         }
342     }
343
344     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
345     // FIXME: in the SPM it would be nice to have system time instead of
346     //        1394 time
347
348     // we now should have decent sync info on the sync source
349     // determine a point in time where the system should start
350     // figure out where we are now
351     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
352     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
353         time_of_first_sample,
354         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
355         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
356         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
357
358     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
359     // this is the time window we have to setup all SP's such that they
360     // can start wet-running correctly.
361     time_of_first_sample = addTicks(time_of_first_sample,
362                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
363
364     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
365         time_of_first_sample,
366         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
367         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
368         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
369
370     // we should start wet-running the transmit SP's some cycles in advance
371     // such that we know it is wet-running when it should output its first sample
372     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
373                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
374
375     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
376                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
377     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
378         time_to_start_xmit,
379         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
380         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
381         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
382     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
383         time_to_start_recv,
384         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
385         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
386         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
387
388     // at this point the buffer head timestamp of the transmit buffers can be set
389     // this is the presentation time of the first sample in the buffer
390     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
391           it != m_TransmitProcessors.end();
392           ++it ) {
393         (*it)->setBufferHeadTimestamp(time_of_first_sample);
394     }
395
396     // STEP X: switch SP's over to the running state
397     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
398           it != m_ReceiveProcessors.end();
399           ++it ) {
400         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
401             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
402             return false;
403         }
404     }
405     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
406           it != m_TransmitProcessors.end();
407           ++it ) {
408         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
409             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
410             return false;
411         }
412     }
413     // wait for the syncsource to start running.
414     // that will block the waitForPeriod call until everyone has started (theoretically)
415     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
416     while (!m_SyncSource->isRunning() && cnt) {
417         SleepRelativeUsec(125);
418         cnt--;
419     }
420     if(cnt==0) {
421         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
422         return false;
423     }
424
425     if(!alignReceivedStreams()) {
426         debugError("Could not align streams...\n");
427         return false;
428     }
429
430     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
431     return true;
432 }
433
434 bool
435 StreamProcessorManager::alignReceivedStreams()
436 {
437     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
438     unsigned int nb_sync_runs;
439     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
440     int64_t diff_between_streams[nb_rcv_sp];
441     int64_t diff;
442
443     unsigned int i;
444
445     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
446     periods_per_align_try /= 1000;
447     periods_per_align_try /= getPeriodSize();
448     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
449
450     bool aligned = false;
451     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
452     while (!aligned && cnt--) {
453         nb_sync_runs = periods_per_align_try;
454         while(nb_sync_runs) {
455             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
456             if(!waitForPeriod()) {
457                 debugWarning("xrun while aligning streams...\n");
458                 return false;
459             };
460
461             i = 0;
462             for ( i = 0; i < nb_rcv_sp; i++) {
463                 StreamProcessor *s = m_ReceiveProcessors.at(i);
464                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
465                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
466                     m_SyncSource, s, diff);
467                 if ( nb_sync_runs == periods_per_align_try ) {
468                     diff_between_streams[i] = diff;
469                 } else {
470                     diff_between_streams[i] += diff;
471                 }
472             }
473             if(!transferSilence()) {
474                 debugError("Could not transfer silence\n");
475                 return false;
476             }
477             nb_sync_runs--;
478         }
479         // calculate the average offsets
480         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
481         int diff_between_streams_frames[nb_rcv_sp];
482         aligned = true;
483         for ( i = 0; i < nb_rcv_sp; i++) {
484             StreamProcessor *s = m_ReceiveProcessors.at(i);
485
486             diff_between_streams[i] /= periods_per_align_try;
487             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
488             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
489                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
490
491             aligned &= (diff_between_streams_frames[i] == 0);
492
493             // reposition the stream
494             if(!s->shiftStream(diff_between_streams_frames[i])) {
495                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
496                 return false;
497             }
498         }
499         if (!aligned) {
500             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
501         }
502     }
503     if (cnt == 0) {
504         debugError("Align failed\n");
505         return false;
506     }
507     return true;
508 }
509
510 bool StreamProcessorManager::start() {
511     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
512
513     // start all SP's synchonized
514     bool start_result = false;
515     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
516         // put all SP's into dry-running state
517         if (!startDryRunning()) {
518             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
519             start_result = false;
520             continue;
521         }
522
523         start_result = syncStartAll();
524         if(start_result) {
525             break;
526         } else {
527             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
528         }
529     }
530     if (!start_result) {
531         debugFatal("Could not syncStartAll...\n");
532         return false;
533     }
534
535     return true;
536 }
537
538 bool StreamProcessorManager::stop() {
539     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
540
541     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
542     // switch SP's over to the dry-running state
543     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
544           it != m_ReceiveProcessors.end();
545           ++it ) {
546         if((*it)->isRunning()) {
547             if(!(*it)->scheduleStopRunning(-1)) {
548                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
549                 return false;
550             }
551         }
552     }
553     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
554           it != m_TransmitProcessors.end();
555           ++it ) {
556         if((*it)->isRunning()) {
557             if(!(*it)->scheduleStopRunning(-1)) {
558                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
559                 return false;
560             }
561         }
562     }
563     // wait for the SP's to get into the dry-running/stopped state
564     int cnt = 8000;
565     bool ready = false;
566     while (!ready && cnt) {
567         ready = true;
568         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
569             it != m_ReceiveProcessors.end();
570             ++it ) {
571             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
572         }
573         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
574             it != m_TransmitProcessors.end();
575             ++it ) {
576             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
577         }
578         SleepRelativeUsec(125);
579         cnt--;
580     }
581     if(cnt==0) {
582         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
583         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
584             it != m_ReceiveProcessors.end();
585             ++it ) {
586             (*it)->dumpInfo();
587         }
588         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
589             it != m_TransmitProcessors.end();
590             ++it ) {
591             (*it)->dumpInfo();
592         }
593         return false;
594     }
595
596     // switch SP's over to the stopped state
597     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
598           it != m_ReceiveProcessors.end();
599           ++it ) {
600         if(!(*it)->scheduleStopDryRunning(-1)) {
601             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
602             return false;
603         }
604     }
605     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
606           it != m_TransmitProcessors.end();
607           ++it ) {
608         if(!(*it)->scheduleStopDryRunning(-1)) {
609             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
610             return false;
611         }
612     }
613     // wait for the SP's to get into the stopped state
614     cnt = 8000;
615     ready = false;
616     while (!ready && cnt) {
617         ready = true;
618         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
619             it != m_ReceiveProcessors.end();
620             ++it ) {
621             ready &= (*it)->isStopped();
622         }
623         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
624             it != m_TransmitProcessors.end();
625             ++it ) {
626             ready &= (*it)->isStopped();
627         }
628         SleepRelativeUsec(125);
629         cnt--;
630     }
631     if(cnt==0) {
632         debugWarning(" Timeout waiting for the SP's to stop\n");
633         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
634             it != m_ReceiveProcessors.end();
635             ++it ) {
636             (*it)->dumpInfo();
637         }
638         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
639             it != m_TransmitProcessors.end();
640             ++it ) {
641             (*it)->dumpInfo();
642         }
643         return false;
644     }
645     return true;
646 }
647
648 /**
649  * Called upon Xrun events. This brings all StreamProcessors back
650  * into their starting state, and then carries on streaming. This should
651  * have the same effect as restarting the whole thing.
652  *
653  * @return true if successful, false otherwise
654  */
655 bool StreamProcessorManager::handleXrun() {
656
657     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
658
659     dumpInfo();
660
661     /*
662      * Reset means:
663      * 1) Disabling the SP's, so that they don't process any packets
664      *    note: the isomanager does keep on delivering/requesting them
665      * 2) Bringing all buffers & streamprocessors into a know state
666      *    - Clear all capture buffers
667      *    - Put nb_periods*period_size of null frames into the playback buffers
668      * 3) Re-enable the SP's
669      */
670
671     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
672     // start all SP's synchonized
673     bool start_result = false;
674     for (int ntries; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
675         // put all SP's into dry-running state
676         if (!startDryRunning()) {
677             debugShowBackLog();
678             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
679             start_result = false;
680             continue;
681         }
682
683         start_result = syncStartAll();
684         if(start_result) {
685             break;
686         } else {
687             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
688         }
689     }
690     if (!start_result) {
691         debugFatal("Could not syncStartAll...\n");
692         return false;
693     }
694     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
695
696     return true;
697 }
698
699 /**
700  * @brief Waits until the next period of samples is ready
701  *
702  * This function does not return until a full period of samples is (or should be)
703  * ready to be transferred.
704  *
705  * @return true if the period is ready, false if an xrun occurred
706  */
707 bool StreamProcessorManager::waitForPeriod() {
708     if(m_SyncSource == NULL) return false;
709     bool xrun_occurred = false;
710     bool period_not_ready = true;
711
712     while(period_not_ready) {
713         debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill());
714         bool result;
715         if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) {
716             result = m_SyncSource->waitForConsumePeriod();
717         } else {
718             result = m_SyncSource->waitForProducePeriod();
719         }
720 //         if(!result) {
721 //             debugError("Error waiting for signal\n");
722 //             return false;
723 //         }
724
725         // HACK: this should be solved more elegantly
726         period_not_ready = false;
727         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
728             it != m_ReceiveProcessors.end();
729             ++it ) {
730             bool this_sp_period_ready = (*it)->canConsumePeriod();
731             if (!this_sp_period_ready) {
732                 period_not_ready = true;
733             }
734         }
735         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
736             it != m_TransmitProcessors.end();
737             ++it ) {
738             bool this_sp_period_ready = (*it)->canProducePeriod();
739             if (!this_sp_period_ready) {
740                 period_not_ready = true;
741             }
742         }
743         // check for underruns on the ISO side,
744         // those should make us bail out of the wait loop
745         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
746             it != m_ReceiveProcessors.end();
747             ++it ) {
748             // a xrun has occurred on the Iso side
749             xrun_occurred |= (*it)->xrunOccurred();
750         }
751         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
752             it != m_TransmitProcessors.end();
753             ++it ) {
754             // a xrun has occurred on the Iso side
755             xrun_occurred |= (*it)->xrunOccurred();
756         }
757         if(xrun_occurred) break;
758         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
759     }
760
761     if(xrun_occurred) {
762         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
763     }
764
765     // we save the 'ideal' time of the transfer at this point,
766     // because we can have interleaved read - process - write
767     // cycles making that we modify a receiving stream's buffer
768     // before we get to writing.
769     // NOTE: before waitForPeriod() is called again, both the transmit
770     //       and the receive processors should have done their transfer.
771     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
772     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
773         m_time_of_transfer);
774
775     // this is to notify the client of the delay that we introduced by waiting
776     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
777     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
778
779 #ifdef DEBUG
780     int rcv_bf=0, xmt_bf=0;
781     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
782         it != m_ReceiveProcessors.end();
783         ++it ) {
784         rcv_bf = (*it)->getBufferFill();
785     }
786     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
787         it != m_TransmitProcessors.end();
788         ++it ) {
789         xmt_bf = (*it)->getBufferFill();
790     }
791     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
792         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
793
794     // check if xruns occurred on the Iso side.
795     // also check if xruns will occur should we transfer() now
796     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
797           it != m_ReceiveProcessors.end();
798           ++it ) {
799
800         if ((*it)->xrunOccurred()) {
801             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
802             (*it)->dumpInfo();
803         }
804         if (!((*it)->canClientTransferFrames(m_period))) {
805             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
806             (*it)->dumpInfo();
807         }
808     }
809     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
810           it != m_TransmitProcessors.end();
811           ++it ) {
812         if ((*it)->xrunOccurred()) {
813             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
814         }
815         if (!((*it)->canClientTransferFrames(m_period))) {
816             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
817         }
818     }
819 #endif
820
821     m_nbperiods++;
822     // now we can signal the client that we are (should be) ready
823     return !xrun_occurred;
824 }
825
826 /**
827  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
828  *
829  * Transfers one period of frames from the client side to the Iso side and vice versa.
830  *
831  * @return true if successful, false otherwise (indicates xrun).
832  */
833 bool StreamProcessorManager::transfer() {
834     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
835     bool retval=true;
836     retval &= transfer(StreamProcessor::ePT_Receive);
837     retval &= transfer(StreamProcessor::ePT_Transmit);
838     return retval;
839 }
840
841 /**
842  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
843  *
844  * Transfers one period of frames from the client side to the Iso side or vice versa.
845  *
846  * @param t The processor type to tranfer for (receive or transmit)
847  * @return true if successful, false otherwise (indicates xrun).
848  */
849 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
850     if(m_SyncSource == NULL) return false;
851     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
852         t, m_time_of_transfer,
853         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
854         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
855         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
856
857     bool retval = true;
858     // a static cast could make sure that there is no performance
859     // penalty for the virtual functions (to be checked)
860     if (t==StreamProcessor::ePT_Receive) {
861         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
862                 it != m_ReceiveProcessors.end();
863                 ++it ) {
864             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
865                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
866                             m_period, m_time_of_transfer,*it);
867                 retval &= false; // buffer underrun
868             }
869         }
870     } else {
871         // FIXME: in the SPM it would be nice to have system time instead of
872         //        1394 time
873         float rate = m_SyncSource->getTicksPerFrame();
874         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
875
876         // the data we are putting into the buffer is intended to be transmitted
877         // one ringbuffer size after it has been received
878         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
879
880         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
881                 it != m_TransmitProcessors.end();
882                 ++it ) {
883             // FIXME: in the SPM it would be nice to have system time instead of
884             //        1394 time
885             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
886                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
887                         m_period, transmit_timestamp, *it);
888                 retval &= false; // buffer underrun
889             }
890         }
891     }
892     return retval;
893 }
894
895 /**
896  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
897  *
898  * Transfers one period of silence to the Iso side for transmit SP's
899  * or dump one period of frames for receive SP's
900  *
901  * @return true if successful, false otherwise (indicates xrun).
902  */
903 bool StreamProcessorManager::transferSilence() {
904     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
905     bool retval=true;
906     retval &= transferSilence(StreamProcessor::ePT_Receive);
907     retval &= transferSilence(StreamProcessor::ePT_Transmit);
908     return retval;
909 }
910
911 /**
912  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
913  *
914  * Transfers one period of silence to the Iso side for transmit SP's
915  * or dump one period of frames for receive SP's
916  *
917  * @param t The processor type to tranfer for (receive or transmit)
918  * @return true if successful, false otherwise (indicates xrun).
919  */
920 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
921     if(m_SyncSource == NULL) return false;
922     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
923         t, m_time_of_transfer,
924         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
925         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
926         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
927
928     bool retval = true;
929     // a static cast could make sure that there is no performance
930     // penalty for the virtual functions (to be checked)
931     if (t==StreamProcessor::ePT_Receive) {
932         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
933                 it != m_ReceiveProcessors.end();
934                 ++it ) {
935             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
936                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
937                             m_period, m_time_of_transfer,*it);
938                 retval &= false; // buffer underrun
939             }
940         }
941     } else {
942         // FIXME: in the SPM it would be nice to have system time instead of
943         //        1394 time
944         float rate = m_SyncSource->getTicksPerFrame();
945         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
946
947         // the data we are putting into the buffer is intended to be transmitted
948         // one ringbuffer size after it has been received
949         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
950
951         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
952                 it != m_TransmitProcessors.end();
953                 ++it ) {
954             // FIXME: in the SPM it would be nice to have system time instead of
955             //        1394 time
956             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
957                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
958                         m_period, transmit_timestamp, *it);
959                 retval &= false; // buffer underrun
960             }
961         }
962     }
963     return retval;
964 }
965
966 void StreamProcessorManager::dumpInfo() {
967     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
968     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
969     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
970     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
971
972     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
973     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
974         it != m_ReceiveProcessors.end();
975         ++it ) {
976         (*it)->dumpInfo();
977     }
978
979     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
980     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
981         it != m_TransmitProcessors.end();
982         ++it ) {
983         (*it)->dumpInfo();
984     }
985
986     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
987
988 }
989
990 void StreamProcessorManager::setVerboseLevel(int l) {
991     setDebugLevel(l);
992
993     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
994     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
995         it != m_ReceiveProcessors.end();
996         ++it ) {
997         (*it)->setVerboseLevel(l);
998     }
999
1000     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1001     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1002         it != m_TransmitProcessors.end();
1003         ++it ) {
1004         (*it)->setVerboseLevel(l);
1005     }
1006 }
1007
1008
1009 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1010     int count=0;
1011
1012     if (direction == Port::E_Capture) {
1013         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1014             it != m_ReceiveProcessors.end();
1015             ++it ) {
1016             count += (*it)->getPortCount(type);
1017         }
1018     } else {
1019         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1020             it != m_TransmitProcessors.end();
1021             ++it ) {
1022             count += (*it)->getPortCount(type);
1023         }
1024     }
1025     return count;
1026 }
1027
1028 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1029     int count=0;
1030
1031     if (direction == Port::E_Capture) {
1032         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1033             it != m_ReceiveProcessors.end();
1034             ++it ) {
1035             count += (*it)->getPortCount();
1036         }
1037     } else {
1038         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1039             it != m_TransmitProcessors.end();
1040             ++it ) {
1041             count += (*it)->getPortCount();
1042         }
1043     }
1044     return count;
1045 }
1046
1047 // TODO: implement a port map here, instead of the loop
1048
1049 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1050     int count=0;
1051     int prevcount=0;
1052
1053     if (direction == Port::E_Capture) {
1054         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1055             it != m_ReceiveProcessors.end();
1056             ++it ) {
1057             count += (*it)->getPortCount();
1058             if (count > idx) {
1059                 return (*it)->getPortAtIdx(idx-prevcount);
1060             }
1061             prevcount=count;
1062         }
1063     } else {
1064         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1065             it != m_TransmitProcessors.end();
1066             ++it ) {
1067             count += (*it)->getPortCount();
1068             if (count > idx) {
1069                 return (*it)->getPortAtIdx(idx-prevcount);
1070             }
1071             prevcount=count;
1072         }
1073     }
1074     return NULL;
1075 }
1076
1077 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1078     m_thread_realtime=rt;
1079     m_thread_priority=priority;
1080     return true;
1081 }
1082
1083
1084 } // end of namespace
Note: See TracBrowser for help on using the browser.