root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 841, 39.6 kB (checked in by ppalmers, 16 years ago)

fix single ISO thread operation (1394 stack seems to be thread-unsafe)

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_nb_buffers( 0 )
45     , m_period( 0 )
46     , m_audio_datatype( eADT_Float )
47     , m_nominal_framerate ( 0 )
48     , m_xruns(0)
49     , m_nbperiods(0)
50 {
51     addOption(Util::OptionContainer::Option("slaveMode",false));
52 }
53
54 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
55     : m_is_slave( false )
56     , m_SyncSource(NULL)
57     , m_xrun_happened( false )
58     , m_nb_buffers(nb_buffers)
59     , m_period(period)
60     , m_audio_datatype( eADT_Float )
61     , m_nominal_framerate ( framerate )
62     , m_xruns(0)
63     , m_nbperiods(0)
64 {
65     addOption(Util::OptionContainer::Option("slaveMode",false));
66 }
67
68 StreamProcessorManager::~StreamProcessorManager() {
69 }
70
71 /**
72  * Registers \ref processor with this manager.
73  *
74  * also registers it with the isohandlermanager
75  *
76  * be sure to call isohandlermanager->init() first!
77  * and be sure that the processors are also ->init()'ed
78  *
79  * @param processor
80  * @return true if successfull
81  */
82 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
83 {
84     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
85     assert(processor);
86     if (processor->getType() == StreamProcessor::ePT_Receive) {
87         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
88         m_ReceiveProcessors.push_back(processor);
89         return true;
90     }
91
92     if (processor->getType() == StreamProcessor::ePT_Transmit) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_TransmitProcessors.push_back(processor);
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99     return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105     assert(processor);
106
107     if (processor->getType()==StreamProcessor::ePT_Receive) {
108
109         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110               it != m_ReceiveProcessors.end();
111               ++it )
112         {
113             if ( *it == processor ) {
114                 if (*it == m_SyncSource) {
115                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
116                     m_SyncSource = NULL;
117                 }
118                 m_ReceiveProcessors.erase(it);
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 if (*it == m_SyncSource) {
131                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
132                     m_SyncSource = NULL;
133                 }
134                 m_TransmitProcessors.erase(it);
135                 return true;
136             }
137         }
138     }
139
140     debugFatal("Processor (%p) not found!\n",processor);
141     return false; //not found
142 }
143
144 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
145     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
146     m_SyncSource=s;
147     return true;
148 }
149
150 bool StreamProcessorManager::prepare() {
151
152     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
153
154     m_is_slave=false;
155     if(!getOption("slaveMode", m_is_slave)) {
156         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
157     }
158
159     // if no sync source is set, select one here
160     if(m_SyncSource == NULL) {
161        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
162     }
163
164     // FIXME: put into separate method
165     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
166           it != m_ReceiveProcessors.end();
167           ++it )
168     {
169         if(m_SyncSource == NULL) {
170             debugWarning(" => Sync Source is %p.\n", *it);
171             m_SyncSource = *it;
172         }
173     }
174     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
175           it != m_TransmitProcessors.end();
176           ++it )
177     {
178         if(m_SyncSource == NULL) {
179             debugWarning(" => Sync Source is %p.\n", *it);
180             m_SyncSource = *it;
181         }
182     }
183
184     // now do the actual preparation of the SP's
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187         it != m_ReceiveProcessors.end();
188         ++it ) {
189
190         if(!(*it)->setOption("slaveMode", m_is_slave)) {
191             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
192         }
193
194         if(!(*it)->prepare()) {
195             debugFatal(  " could not prepare (%p)...\n",(*it));
196             return false;
197         }
198     }
199     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
200     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
201         it != m_TransmitProcessors.end();
202         ++it ) {
203         if(!(*it)->setOption("slaveMode", m_is_slave)) {
204             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
205         }
206         if(!(*it)->prepare()) {
207             debugFatal( " could not prepare (%p)...\n",(*it));
208             return false;
209         }
210     }
211
212     // if there are no stream processors registered,
213     // fail
214     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
215         debugFatal("No stream processors registered, can't do anything usefull\n");
216         return false;
217     }
218     return true;
219 }
220
221 bool StreamProcessorManager::startDryRunning() {
222     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
223     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
224     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
225             it != m_TransmitProcessors.end();
226             ++it ) {
227         if (!(*it)->isDryRunning()) {
228             if(!(*it)->scheduleStartDryRunning(-1)) {
229                 debugError("Could not put SP %p into the dry-running state\n", *it);
230                 return false;
231             }
232         } else {
233             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
234         }
235     }
236     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
237             it != m_ReceiveProcessors.end();
238             ++it ) {
239         if (!(*it)->isDryRunning()) {
240             if(!(*it)->scheduleStartDryRunning(-1)) {
241                 debugError("Could not put SP %p into the dry-running state\n", *it);
242                 return false;
243             }
244         } else {
245             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
246         }
247     }
248     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
249     // wait for the syncsource to start running.
250     // that will block the waitForPeriod call until everyone has started (theoretically)
251     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
252     bool all_dry_running = false;
253     while (!all_dry_running && cnt) {
254         all_dry_running = true;
255         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
256                 it != m_ReceiveProcessors.end();
257                 ++it ) {
258             all_dry_running &= (*it)->isDryRunning();
259         }
260         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
261                 it != m_TransmitProcessors.end();
262                 ++it ) {
263             all_dry_running &= (*it)->isDryRunning();
264         }
265
266         SleepRelativeUsec(125);
267         cnt--;
268     }
269     if(cnt==0) {
270         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
271         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
272                 it != m_ReceiveProcessors.end();
273                 ++it ) {
274             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
275                 (*it)->getTypeString(), *it, (*it)->getStateString());
276         }
277         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
278                 it != m_TransmitProcessors.end();
279                 ++it ) {
280             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
281                 (*it)->getTypeString(), *it, (*it)->getStateString());
282         }
283         return false;
284     }
285     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
286     return true;
287 }
288
289 bool StreamProcessorManager::syncStartAll() {
290     if(m_SyncSource == NULL) return false;
291     // figure out when to get the SP's running.
292     // the xmit SP's should also know the base timestamp
293     // streams should be aligned here
294
295     // now find out how long we have to delay the wait operation such that
296     // the received frames will all be presented to the SP
297     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
298     int max_of_min_delay = 0;
299     int min_delay = 0;
300     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301             it != m_ReceiveProcessors.end();
302             ++it ) {
303         min_delay = (*it)->getMaxFrameLatency();
304         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
305     }
306
307     // add some processing margin. This only shifts the time
308     // at which the buffer is transfer()'ed. This makes things somewhat
309     // more robust. It should be noted though that shifting the transfer
310     // time to a later time instant also causes the xmit buffer fill to be
311     // lower on average.
312     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
314         max_of_min_delay,
315         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
316         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
317         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
318     m_SyncSource->setSyncDelay(max_of_min_delay);
319
320     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
321     //        have aquired lock
322     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
323     //sleep(2); // FIXME: be smarter here
324
325     // make sure that we are dry-running long enough for the
326     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
327     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
328    
329     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
330     nb_sync_runs /= 1000;
331     nb_sync_runs /= getPeriodSize();
332
333     int64_t time_till_next_period;
334     while(nb_sync_runs--) { // or while not sync-ed?
335         // check if we were woken up too soon
336         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
337         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
338         if(time_till_next_period > 0) {
339             // wait for the period
340             SleepRelativeUsec(time_till_next_period);
341         }
342     }
343
344     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
345     // FIXME: in the SPM it would be nice to have system time instead of
346     //        1394 time
347
348     // we now should have decent sync info on the sync source
349     // determine a point in time where the system should start
350     // figure out where we are now
351     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
352     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
353         time_of_first_sample,
354         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
355         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
356         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
357
358     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
359     // this is the time window we have to setup all SP's such that they
360     // can start wet-running correctly.
361     time_of_first_sample = addTicks(time_of_first_sample,
362                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
363
364     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
365         time_of_first_sample,
366         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
367         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
368         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
369
370     // we should start wet-running the transmit SP's some cycles in advance
371     // such that we know it is wet-running when it should output its first sample
372     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
373                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
374
375     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
376                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
377     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
378         time_to_start_xmit,
379         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
380         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
381         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
382     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
383         time_to_start_recv,
384         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
385         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
386         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
387
388     // at this point the buffer head timestamp of the transmit buffers can be set
389     // this is the presentation time of the first sample in the buffer
390     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
391           it != m_TransmitProcessors.end();
392           ++it ) {
393         (*it)->setBufferHeadTimestamp(time_of_first_sample);
394     }
395
396     // STEP X: switch SP's over to the running state
397     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
398           it != m_ReceiveProcessors.end();
399           ++it ) {
400         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
401             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
402             return false;
403         }
404     }
405     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
406           it != m_TransmitProcessors.end();
407           ++it ) {
408         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
409             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
410             return false;
411         }
412     }
413     // wait for the syncsource to start running.
414     // that will block the waitForPeriod call until everyone has started (theoretically)
415     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
416     while (!m_SyncSource->isRunning() && cnt) {
417         SleepRelativeUsec(125);
418         cnt--;
419     }
420     if(cnt==0) {
421         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
422         return false;
423     }
424
425     if(!alignReceivedStreams()) {
426         debugError("Could not align streams...\n");
427         return false;
428     }
429
430     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
431     return true;
432 }
433
434 bool
435 StreamProcessorManager::alignReceivedStreams()
436 {
437     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
438     unsigned int nb_sync_runs;
439     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
440     int64_t diff_between_streams[nb_rcv_sp];
441     int64_t diff;
442
443     unsigned int i;
444
445     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
446     periods_per_align_try /= 1000;
447     periods_per_align_try /= getPeriodSize();
448     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
449
450     bool aligned = false;
451     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
452     while (!aligned && cnt--) {
453         nb_sync_runs = periods_per_align_try;
454         while(nb_sync_runs) {
455             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
456             if(!waitForPeriod()) {
457                 debugWarning("xrun while aligning streams...\n");
458                 return false;
459             };
460
461             i = 0;
462             for ( i = 0; i < nb_rcv_sp; i++) {
463                 StreamProcessor *s = m_ReceiveProcessors.at(i);
464                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
465                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
466                     m_SyncSource, s, diff);
467                 if ( nb_sync_runs == periods_per_align_try ) {
468                     diff_between_streams[i] = diff;
469                 } else {
470                     diff_between_streams[i] += diff;
471                 }
472             }
473             if(!transferSilence()) {
474                 debugError("Could not transfer silence\n");
475                 return false;
476             }
477             nb_sync_runs--;
478         }
479         // calculate the average offsets
480         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
481         int diff_between_streams_frames[nb_rcv_sp];
482         aligned = true;
483         for ( i = 0; i < nb_rcv_sp; i++) {
484             StreamProcessor *s = m_ReceiveProcessors.at(i);
485
486             diff_between_streams[i] /= periods_per_align_try;
487             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
488             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
489                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
490
491             aligned &= (diff_between_streams_frames[i] == 0);
492
493             // reposition the stream
494             if(!s->shiftStream(diff_between_streams_frames[i])) {
495                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
496                 return false;
497             }
498         }
499         if (!aligned) {
500             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
501         }
502     }
503     if (cnt == 0) {
504         debugError("Align failed\n");
505         return false;
506     }
507     return true;
508 }
509
510 bool StreamProcessorManager::start() {
511     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
512
513     // start all SP's synchonized
514     bool start_result = false;
515     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
516         // put all SP's into dry-running state
517         if (!startDryRunning()) {
518             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
519             start_result = false;
520             continue;
521         }
522
523         start_result = syncStartAll();
524         if(start_result) {
525             break;
526         } else {
527             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
528         }
529     }
530     if (!start_result) {
531         debugFatal("Could not syncStartAll...\n");
532         return false;
533     }
534
535     return true;
536 }
537
538 bool StreamProcessorManager::stop() {
539     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
540
541     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
542     // switch SP's over to the dry-running state
543     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
544           it != m_ReceiveProcessors.end();
545           ++it ) {
546         if((*it)->isRunning()) {
547             if(!(*it)->scheduleStopRunning(-1)) {
548                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
549                 return false;
550             }
551         }
552     }
553     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
554           it != m_TransmitProcessors.end();
555           ++it ) {
556         if((*it)->isRunning()) {
557             if(!(*it)->scheduleStopRunning(-1)) {
558                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
559                 return false;
560             }
561         }
562     }
563     // wait for the SP's to get into the dry-running/stopped state
564     int cnt = 2000;
565     bool ready = false;
566     while (!ready && cnt) {
567         ready = true;
568         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
569             it != m_ReceiveProcessors.end();
570             ++it ) {
571             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
572         }
573         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
574             it != m_TransmitProcessors.end();
575             ++it ) {
576             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
577         }
578         SleepRelativeUsec(125);
579         cnt--;
580     }
581     if(cnt==0) {
582         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
583         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
584             it != m_ReceiveProcessors.end();
585             ++it ) {
586             (*it)->dumpInfo();
587         }
588         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
589             it != m_TransmitProcessors.end();
590             ++it ) {
591             (*it)->dumpInfo();
592         }
593         return false;
594     }
595
596     // switch SP's over to the stopped state
597     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
598           it != m_ReceiveProcessors.end();
599           ++it ) {
600         if(!(*it)->scheduleStopDryRunning(-1)) {
601             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
602             return false;
603         }
604     }
605     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
606           it != m_TransmitProcessors.end();
607           ++it ) {
608         if(!(*it)->scheduleStopDryRunning(-1)) {
609             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
610             return false;
611         }
612     }
613     // wait for the SP's to get into the stopped state
614     cnt = 2000;
615     ready = false;
616     while (!ready && cnt) {
617         ready = true;
618         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
619             it != m_ReceiveProcessors.end();
620             ++it ) {
621             ready &= (*it)->isStopped();
622         }
623         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
624             it != m_TransmitProcessors.end();
625             ++it ) {
626             ready &= (*it)->isStopped();
627         }
628         SleepRelativeUsec(125);
629         cnt--;
630     }
631     if(cnt==0) {
632         debugWarning(" Timeout waiting for the SP's to stop\n");
633         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
634             it != m_ReceiveProcessors.end();
635             ++it ) {
636             (*it)->dumpInfo();
637         }
638         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
639             it != m_TransmitProcessors.end();
640             ++it ) {
641             (*it)->dumpInfo();
642         }
643         return false;
644     }
645     return true;
646 }
647
648 /**
649  * Called upon Xrun events. This brings all StreamProcessors back
650  * into their starting state, and then carries on streaming. This should
651  * have the same effect as restarting the whole thing.
652  *
653  * @return true if successful, false otherwise
654  */
655 bool StreamProcessorManager::handleXrun() {
656
657     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
658
659     dumpInfo();
660
661     /*
662      * Reset means:
663      * 1) Disabling the SP's, so that they don't process any packets
664      *    note: the isomanager does keep on delivering/requesting them
665      * 2) Bringing all buffers & streamprocessors into a know state
666      *    - Clear all capture buffers
667      *    - Put nb_periods*period_size of null frames into the playback buffers
668      * 3) Re-enable the SP's
669      */
670
671     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
672     // start all SP's synchonized
673     bool start_result = false;
674     for (int ntries; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
675         // put all SP's into dry-running state
676         if (!startDryRunning()) {
677             debugShowBackLog();
678             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
679             start_result = false;
680             continue;
681         }
682
683         start_result = syncStartAll();
684         if(start_result) {
685             break;
686         } else {
687             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
688         }
689     }
690     if (!start_result) {
691         debugFatal("Could not syncStartAll...\n");
692         return false;
693     }
694     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
695
696     return true;
697 }
698
699 /**
700  * @brief Waits until the next period of samples is ready
701  *
702  * This function does not return until a full period of samples is (or should be)
703  * ready to be transferred.
704  *
705  * @return true if the period is ready, false if an xrun occurred
706  */
707 bool StreamProcessorManager::waitForPeriod() {
708     if(m_SyncSource == NULL) return false;
709     bool xrun_occurred = false;
710     bool period_not_ready = true;
711
712     while(period_not_ready) {
713         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill());
714         if(!m_SyncSource->waitForSignal()) {
715             debugError("Error waiting for signal\n");
716             return false;
717         }
718
719         // HACK: this should be solved more elegantly
720         period_not_ready = false;
721         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
722             it != m_ReceiveProcessors.end();
723             ++it ) {
724             bool this_sp_period_ready = (*it)->canClientTransferFrames(m_period);
725             if (!this_sp_period_ready) {
726                 period_not_ready = true;
727             }
728         }
729         // check for underruns on the ISO side,
730         // those should make us bail out of the wait loop
731         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
732             it != m_ReceiveProcessors.end();
733             ++it ) {
734             // a xrun has occurred on the Iso side
735             xrun_occurred |= (*it)->xrunOccurred();
736         }
737         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
738             it != m_TransmitProcessors.end();
739             ++it ) {
740             // a xrun has occurred on the Iso side
741             xrun_occurred |= (*it)->xrunOccurred();
742         }
743         if(xrun_occurred) break;
744         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
745     }
746
747     // we save the 'ideal' time of the transfer at this point,
748     // because we can have interleaved read - process - write
749     // cycles making that we modify a receiving stream's buffer
750     // before we get to writing.
751     // NOTE: before waitForPeriod() is called again, both the transmit
752     //       and the receive processors should have done their transfer.
753     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
754     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
755         m_time_of_transfer);
756
757     // this is to notify the client of the delay that we introduced by waiting
758     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
759     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
760
761 #ifdef DEBUG
762     int rcv_bf=0, xmt_bf=0;
763     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
764         it != m_ReceiveProcessors.end();
765         ++it ) {
766         rcv_bf = (*it)->getBufferFill();
767     }
768     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
769         it != m_TransmitProcessors.end();
770         ++it ) {
771         xmt_bf = (*it)->getBufferFill();
772     }
773     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
774         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
775
776     // check if xruns occurred on the Iso side.
777     // also check if xruns will occur should we transfer() now
778     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
779           it != m_ReceiveProcessors.end();
780           ++it ) {
781
782         if ((*it)->xrunOccurred()) {
783             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
784             (*it)->dumpInfo();
785         }
786         if (!((*it)->canClientTransferFrames(m_period))) {
787             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
788             (*it)->dumpInfo();
789         }
790     }
791     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
792           it != m_TransmitProcessors.end();
793           ++it ) {
794         if ((*it)->xrunOccurred()) {
795             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
796         }
797         if (!((*it)->canClientTransferFrames(m_period))) {
798             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
799         }
800     }
801 #endif
802
803     m_nbperiods++;
804     // now we can signal the client that we are (should be) ready
805     return !xrun_occurred;
806 }
807
808 /**
809  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
810  *
811  * Transfers one period of frames from the client side to the Iso side and vice versa.
812  *
813  * @return true if successful, false otherwise (indicates xrun).
814  */
815 bool StreamProcessorManager::transfer() {
816     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
817     bool retval=true;
818     retval &= transfer(StreamProcessor::ePT_Receive);
819     retval &= transfer(StreamProcessor::ePT_Transmit);
820     return retval;
821 }
822
823 /**
824  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
825  *
826  * Transfers one period of frames from the client side to the Iso side or vice versa.
827  *
828  * @param t The processor type to tranfer for (receive or transmit)
829  * @return true if successful, false otherwise (indicates xrun).
830  */
831 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
832     if(m_SyncSource == NULL) return false;
833     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
834         t, m_time_of_transfer,
835         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
836         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
837         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
838
839     bool retval = true;
840     // a static cast could make sure that there is no performance
841     // penalty for the virtual functions (to be checked)
842     if (t==StreamProcessor::ePT_Receive) {
843         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
844                 it != m_ReceiveProcessors.end();
845                 ++it ) {
846             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
847                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
848                             m_period, m_time_of_transfer,*it);
849                 retval &= false; // buffer underrun
850             }
851         }
852     } else {
853         // FIXME: in the SPM it would be nice to have system time instead of
854         //        1394 time
855         float rate = m_SyncSource->getTicksPerFrame();
856         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
857
858         // the data we are putting into the buffer is intended to be transmitted
859         // one ringbuffer size after it has been received
860         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
861
862         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
863                 it != m_TransmitProcessors.end();
864                 ++it ) {
865             // FIXME: in the SPM it would be nice to have system time instead of
866             //        1394 time
867             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
868                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
869                         m_period, transmit_timestamp, *it);
870                 retval &= false; // buffer underrun
871             }
872         }
873     }
874     return retval;
875 }
876
877 /**
878  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
879  *
880  * Transfers one period of silence to the Iso side for transmit SP's
881  * or dump one period of frames for receive SP's
882  *
883  * @return true if successful, false otherwise (indicates xrun).
884  */
885 bool StreamProcessorManager::transferSilence() {
886     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
887     bool retval=true;
888     retval &= transferSilence(StreamProcessor::ePT_Receive);
889     retval &= transferSilence(StreamProcessor::ePT_Transmit);
890     return retval;
891 }
892
893 /**
894  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
895  *
896  * Transfers one period of silence to the Iso side for transmit SP's
897  * or dump one period of frames for receive SP's
898  *
899  * @param t The processor type to tranfer for (receive or transmit)
900  * @return true if successful, false otherwise (indicates xrun).
901  */
902 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
903     if(m_SyncSource == NULL) return false;
904     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
905         t, m_time_of_transfer,
906         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
907         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
908         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
909
910     bool retval = true;
911     // a static cast could make sure that there is no performance
912     // penalty for the virtual functions (to be checked)
913     if (t==StreamProcessor::ePT_Receive) {
914         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
915                 it != m_ReceiveProcessors.end();
916                 ++it ) {
917             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
918                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
919                             m_period, m_time_of_transfer,*it);
920                 retval &= false; // buffer underrun
921             }
922         }
923     } else {
924         // FIXME: in the SPM it would be nice to have system time instead of
925         //        1394 time
926         float rate = m_SyncSource->getTicksPerFrame();
927         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
928
929         // the data we are putting into the buffer is intended to be transmitted
930         // one ringbuffer size after it has been received
931         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
932
933         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
934                 it != m_TransmitProcessors.end();
935                 ++it ) {
936             // FIXME: in the SPM it would be nice to have system time instead of
937             //        1394 time
938             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
939                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
940                         m_period, transmit_timestamp, *it);
941                 retval &= false; // buffer underrun
942             }
943         }
944     }
945     return retval;
946 }
947
948 void StreamProcessorManager::dumpInfo() {
949     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
950     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
951     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
952     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
953
954     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
955     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
956         it != m_ReceiveProcessors.end();
957         ++it ) {
958         (*it)->dumpInfo();
959     }
960
961     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
962     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
963         it != m_TransmitProcessors.end();
964         ++it ) {
965         (*it)->dumpInfo();
966     }
967
968     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
969
970 }
971
972 void StreamProcessorManager::setVerboseLevel(int l) {
973     setDebugLevel(l);
974
975     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
976     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
977         it != m_ReceiveProcessors.end();
978         ++it ) {
979         (*it)->setVerboseLevel(l);
980     }
981
982     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
983     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
984         it != m_TransmitProcessors.end();
985         ++it ) {
986         (*it)->setVerboseLevel(l);
987     }
988 }
989
990
991 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
992     int count=0;
993
994     if (direction == Port::E_Capture) {
995         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
996             it != m_ReceiveProcessors.end();
997             ++it ) {
998             count += (*it)->getPortCount(type);
999         }
1000     } else {
1001         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1002             it != m_TransmitProcessors.end();
1003             ++it ) {
1004             count += (*it)->getPortCount(type);
1005         }
1006     }
1007     return count;
1008 }
1009
1010 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1011     int count=0;
1012
1013     if (direction == Port::E_Capture) {
1014         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1015             it != m_ReceiveProcessors.end();
1016             ++it ) {
1017             count += (*it)->getPortCount();
1018         }
1019     } else {
1020         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1021             it != m_TransmitProcessors.end();
1022             ++it ) {
1023             count += (*it)->getPortCount();
1024         }
1025     }
1026     return count;
1027 }
1028
1029 // TODO: implement a port map here, instead of the loop
1030
1031 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1032     int count=0;
1033     int prevcount=0;
1034
1035     if (direction == Port::E_Capture) {
1036         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1037             it != m_ReceiveProcessors.end();
1038             ++it ) {
1039             count += (*it)->getPortCount();
1040             if (count > idx) {
1041                 return (*it)->getPortAtIdx(idx-prevcount);
1042             }
1043             prevcount=count;
1044         }
1045     } else {
1046         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1047             it != m_TransmitProcessors.end();
1048             ++it ) {
1049             count += (*it)->getPortCount();
1050             if (count > idx) {
1051                 return (*it)->getPortAtIdx(idx-prevcount);
1052             }
1053             prevcount=count;
1054         }
1055     }
1056     return NULL;
1057 }
1058
1059 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1060     m_thread_realtime=rt;
1061     m_thread_priority=priority;
1062     return true;
1063 }
1064
1065
1066 } // end of namespace
Note: See TracBrowser for help on using the browser.