root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 1005, 47.8 kB (checked in by ppalmers, 13 years ago)

Improve thread synchronisation. Switch back to separate threads for transmit and
receive since it is not possible to statically schedule things properly. One
of the threads (i.e. the client thread) is out of our control, hence its
execution can't be controlled. Using separate threads and correct priorities
will shift this problem to the OS. Note that the priority of the packet
receive thread should be lower than the client thread (such that the client
thread is woken ASAP), and the priority of the transmit thread should be
higher than the client thread (such that packets are queued ASAP).
Extra benefit: multiple cores are used.

Some other startup improvements.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_activity_wait_timeout_usec( 1000*1000 )
45     , m_nb_buffers( 0 )
46     , m_period( 0 )
47     , m_audio_datatype( eADT_Float )
48     , m_nominal_framerate ( 0 )
49     , m_xruns(0)
50     , m_shutdown_needed(false)
51     , m_nbperiods(0)
52 {
53     addOption(Util::OptionContainer::Option("slaveMode",false));
54     sem_init(&m_activity_semaphore, 0, 0);
55 }
56
57 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
58     : m_is_slave( false )
59     , m_SyncSource(NULL)
60     , m_xrun_happened( false )
61     , m_activity_wait_timeout_usec( 1000*1000 )
62     , m_nb_buffers(nb_buffers)
63     , m_period(period)
64     , m_audio_datatype( eADT_Float )
65     , m_nominal_framerate ( framerate )
66     , m_xruns(0)
67     , m_shutdown_needed(false)
68     , m_nbperiods(0)
69 {
70     addOption(Util::OptionContainer::Option("slaveMode",false));
71     sem_init(&m_activity_semaphore, 0, 0);
72 }
73
74 StreamProcessorManager::~StreamProcessorManager() {
75     sem_post(&m_activity_semaphore);
76     sem_destroy(&m_activity_semaphore);
77 }
78
79 void
80 StreamProcessorManager::handleBusReset()
81 {
82     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset...\n", this);
83
84     // FIXME: we request shutdown for now.
85     m_shutdown_needed=true;
86
87     // note that all receive streams are gone once a device is unplugged
88
89     // synchronize with the wait lock
90     m_WaitLock.Lock();
91
92     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
93     // cause all SP's to bail out
94     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
95           it != m_ReceiveProcessors.end();
96           ++it )
97     {
98         (*it)->handleBusReset();
99     }
100     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
101           it != m_TransmitProcessors.end();
102           ++it )
103     {
104         (*it)->handleBusReset();
105     }
106
107     m_WaitLock.Unlock();
108 }
109
110 void
111 StreamProcessorManager::signalActivity()
112 {
113     sem_post(&m_activity_semaphore);
114     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
115 }
116
117 enum StreamProcessorManager::eActivityResult
118 StreamProcessorManager::waitForActivity()
119 {
120     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
121     struct timespec ts;
122     int result;
123
124     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
125         debugError("clock_gettime failed\n");
126         return eAR_Error;
127     }
128     long long int timeout_nsec=0;
129     int timeout_sec = 0;
130     if (m_activity_wait_timeout_usec >= 0) {
131         timeout_nsec = m_activity_wait_timeout_usec * 1000LL;
132         timeout_sec = 0;
133         while(timeout_nsec >= 1000000000LL) {
134             timeout_sec += 1;
135             timeout_nsec -= 1000000000LL;
136         }
137         ts.tv_nsec += timeout_nsec;
138         ts.tv_sec += timeout_sec;
139     }
140
141     if (m_activity_wait_timeout_usec >= 0) {
142         result = sem_timedwait(&m_activity_semaphore, &ts);
143     } else {
144         result = sem_wait(&m_activity_semaphore);
145     }
146
147     if(result != 0) {
148         if (result == ETIMEDOUT) {
149             debugOutput(DEBUG_LEVEL_VERBOSE,
150                         "(%p) pthread_cond_timedwait() timed out (result=%d)\n",
151                         this, result);
152             return eAR_Timeout;
153         } else if (result == EINTR) {
154             debugOutput(DEBUG_LEVEL_VERBOSE,
155                         "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n",
156                         this, result);
157             return eAR_Interrupted;
158         } else {
159             debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n",
160                         this, result);
161             debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
162                        this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec);
163             return eAR_Error;
164         }
165     }
166
167     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
168     return eAR_Activity;
169 }
170
171 /**
172  * Registers \ref processor with this manager.
173  *
174  * also registers it with the isohandlermanager
175  *
176  * be sure to call isohandlermanager->init() first!
177  * and be sure that the processors are also ->init()'ed
178  *
179  * @param processor
180  * @return true if successfull
181  */
182 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
183 {
184     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
185     assert(processor);
186     if (processor->getType() == StreamProcessor::ePT_Receive) {
187         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
188         m_ReceiveProcessors.push_back(processor);
189         return true;
190     }
191
192     if (processor->getType() == StreamProcessor::ePT_Transmit) {
193         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
194         m_TransmitProcessors.push_back(processor);
195         return true;
196     }
197
198     debugFatal("Unsupported processor type!\n");
199     return false;
200 }
201
202 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
203 {
204     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
205     assert(processor);
206
207     if (processor->getType()==StreamProcessor::ePT_Receive) {
208
209         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
210               it != m_ReceiveProcessors.end();
211               ++it )
212         {
213             if ( *it == processor ) {
214                 if (*it == m_SyncSource) {
215                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
216                     m_SyncSource = NULL;
217                 }
218                 m_ReceiveProcessors.erase(it);
219                 return true;
220             }
221         }
222     }
223
224     if (processor->getType()==StreamProcessor::ePT_Transmit) {
225         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
226               it != m_TransmitProcessors.end();
227               ++it )
228         {
229             if ( *it == processor ) {
230                 if (*it == m_SyncSource) {
231                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
232                     m_SyncSource = NULL;
233                 }
234                 m_TransmitProcessors.erase(it);
235                 return true;
236             }
237         }
238     }
239
240     debugFatal("Processor (%p) not found!\n",processor);
241     return false; //not found
242 }
243
244 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
245     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
246     m_SyncSource=s;
247     return true;
248 }
249
250 bool StreamProcessorManager::prepare() {
251
252     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
253     m_is_slave=false;
254     if(!getOption("slaveMode", m_is_slave)) {
255         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
256     }
257
258     m_shutdown_needed=false;
259
260     // if no sync source is set, select one here
261     if(m_SyncSource == NULL) {
262        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
263     }
264
265     // FIXME: put into separate method
266     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
267           it != m_ReceiveProcessors.end();
268           ++it )
269     {
270         if(m_SyncSource == NULL) {
271             debugWarning(" => Sync Source is %p.\n", *it);
272             m_SyncSource = *it;
273         }
274     }
275     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
276           it != m_TransmitProcessors.end();
277           ++it )
278     {
279         if(m_SyncSource == NULL) {
280             debugWarning(" => Sync Source is %p.\n", *it);
281             m_SyncSource = *it;
282         }
283     }
284
285     // now do the actual preparation of the SP's
286     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
287     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
288         it != m_ReceiveProcessors.end();
289         ++it ) {
290
291         if(!(*it)->setOption("slaveMode", m_is_slave)) {
292             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
293         }
294
295         if(!(*it)->prepare()) {
296             debugFatal(  " could not prepare (%p)...\n",(*it));
297             return false;
298         }
299     }
300     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
301     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
302         it != m_TransmitProcessors.end();
303         ++it ) {
304         if(!(*it)->setOption("slaveMode", m_is_slave)) {
305             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
306         }
307         if(!(*it)->prepare()) {
308             debugFatal( " could not prepare (%p)...\n",(*it));
309             return false;
310         }
311     }
312
313     // if there are no stream processors registered,
314     // fail
315     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
316         debugFatal("No stream processors registered, can't do anything usefull\n");
317         return false;
318     }
319     return true;
320 }
321
322 bool StreamProcessorManager::startDryRunning() {
323     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
324     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
325     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
326             it != m_TransmitProcessors.end();
327             ++it ) {
328         if (!(*it)->isDryRunning()) {
329             if(!(*it)->scheduleStartDryRunning(-1)) {
330                 debugError("Could not put SP %p into the dry-running state\n", *it);
331                 return false;
332             }
333         } else {
334             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
335         }
336     }
337     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
338             it != m_ReceiveProcessors.end();
339             ++it ) {
340         if (!(*it)->isDryRunning()) {
341             if(!(*it)->scheduleStartDryRunning(-1)) {
342                 debugError("Could not put SP %p into the dry-running state\n", *it);
343                 return false;
344             }
345         } else {
346             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
347         }
348     }
349     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
350     // wait for the syncsource to start running.
351     // that will block the waitForPeriod call until everyone has started (theoretically)
352     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
353     bool all_dry_running = false;
354     while (!all_dry_running && cnt) {
355         all_dry_running = true;
356         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
357                 it != m_ReceiveProcessors.end();
358                 ++it ) {
359             all_dry_running &= (*it)->isDryRunning();
360         }
361         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
362                 it != m_TransmitProcessors.end();
363                 ++it ) {
364             all_dry_running &= (*it)->isDryRunning();
365         }
366
367         SleepRelativeUsec(125);
368         cnt--;
369     }
370     if(cnt==0) {
371         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
372         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
373                 it != m_ReceiveProcessors.end();
374                 ++it ) {
375             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
376                 (*it)->getTypeString(), *it, (*it)->getStateString());
377         }
378         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
379                 it != m_TransmitProcessors.end();
380                 ++it ) {
381             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
382                 (*it)->getTypeString(), *it, (*it)->getStateString());
383         }
384         return false;
385     }
386     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
387     return true;
388 }
389
390 bool StreamProcessorManager::syncStartAll() {
391     if(m_SyncSource == NULL) return false;
392     // figure out when to get the SP's running.
393     // the xmit SP's should also know the base timestamp
394     // streams should be aligned here
395
396     // now find out how long we have to delay the wait operation such that
397     // the received frames will all be presented to the SP
398     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
399     int max_of_min_delay = 0;
400     int min_delay = 0;
401     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
402             it != m_ReceiveProcessors.end();
403             ++it ) {
404         min_delay = (*it)->getMaxFrameLatency();
405         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
406     }
407
408     // add some processing margin. This only shifts the time
409     // at which the buffer is transfer()'ed. This makes things somewhat
410     // more robust. It should be noted though that shifting the transfer
411     // time to a later time instant also causes the xmit buffer fill to be
412     // lower on average.
413     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
414     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
415         max_of_min_delay,
416         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
417         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
418         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
419     m_SyncSource->setSyncDelay(max_of_min_delay);
420
421     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
422     //        have aquired lock
423     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
424     //sleep(2); // FIXME: be smarter here
425
426     // make sure that we are dry-running long enough for the
427     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
428     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
429    
430     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
431     nb_sync_runs /= 1000;
432     nb_sync_runs /= getPeriodSize();
433
434     int64_t time_till_next_period;
435     while(nb_sync_runs--) { // or while not sync-ed?
436         // check if we were woken up too soon
437         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
438         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
439         if(time_till_next_period > 0) {
440             // wait for the period
441             SleepRelativeUsec(time_till_next_period);
442         }
443     }
444
445     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
446     // FIXME: in the SPM it would be nice to have system time instead of
447     //        1394 time
448
449     // we now should have decent sync info on the sync source
450     // determine a point in time where the system should start
451     // figure out where we are now
452     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
453     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
454         time_of_first_sample,
455         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
456         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
457         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
458
459     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
460     // this is the time window we have to setup all SP's such that they
461     // can start wet-running correctly.
462     time_of_first_sample = addTicks(time_of_first_sample,
463                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
464
465     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
466         time_of_first_sample,
467         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
468         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
469         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
470
471     // we should start wet-running the transmit SP's some cycles in advance
472     // such that we know it is wet-running when it should output its first sample
473     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
474                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
475
476     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
477                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
478     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
479         time_to_start_xmit,
480         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
481         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
482         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
483     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
484         time_to_start_recv,
485         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
486         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
487         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
488
489     // at this point the buffer head timestamp of the transmit buffers can be set
490     // this is the presentation time of the first sample in the buffer
491     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
492           it != m_TransmitProcessors.end();
493           ++it ) {
494         (*it)->setBufferHeadTimestamp(time_of_first_sample);
495     }
496
497     // switch syncsource to running state
498     uint64_t time_to_start_sync;
499     // FIXME: this is most likely not going to work for transmit sync sources
500     // but those are unsupported in this version
501     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
502         time_to_start_sync = time_to_start_recv;
503     } else {
504         time_to_start_sync = time_to_start_xmit;
505     }
506     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
507         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
508         return false;
509     }
510
511     // STEP X: switch all non-syncsource SP's over to the running state
512     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
513           it != m_ReceiveProcessors.end();
514           ++it ) {
515         if(*it != m_SyncSource) {
516             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
517                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
518                 return false;
519             }
520         }
521     }
522     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
523           it != m_TransmitProcessors.end();
524           ++it ) {
525         if(*it != m_SyncSource) {
526             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
527                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
528                 return false;
529             }
530         }
531     }
532     // wait for the syncsource to start running.
533     // that will block the waitForPeriod call until everyone has started (theoretically)
534     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
535     // so a 20 times this value should be a good timeout
536     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
537     while (!m_SyncSource->isRunning() && cnt) {
538         SleepRelativeUsec(125);
539         cnt--;
540     }
541     if(cnt==0) {
542         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
543         return false;
544     }
545
546     // the sync source is running, we can now read a decent received timestamp from it
547     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
548
549     // and a (still very rough) approximation of the rate
550     float rate = m_SyncSource->getTicksPerFrame();
551     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
552     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
553                 m_time_of_transfer, rate);
554
555     // then use this information to initialize the xmit handlers
556
557     //  we now set the buffer tail timestamp of the transmit buffer
558     //  to the period transfer time instant plus what's nb_buffers - 1
559     //  in ticks. This due to the fact that we (should) have received one period
560     //  worth of ticks at t=m_time_of_transfer
561     //  hence one period of frames should also have been transmitted, which means
562     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
563     //  that allows us to calculate the tail timestamp for the buffer.
564
565     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
566
567     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
568                 transmit_tail_timestamp, rate);
569
570     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
571         it != m_TransmitProcessors.end();
572         ++it ) {
573         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
574         (*it)->setTicksPerFrame(rate);
575     }
576
577     // align the received streams to be phase aligned
578     if(!alignReceivedStreams()) {
579         debugError("Could not align streams...\n");
580         return false;
581     }
582
583     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
584     return true;
585 }
586
587 bool
588 StreamProcessorManager::alignReceivedStreams()
589 {
590     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
591     unsigned int nb_sync_runs;
592     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
593     int64_t diff_between_streams[nb_rcv_sp];
594     int64_t diff;
595
596     unsigned int i;
597
598     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
599     periods_per_align_try /= 1000;
600     periods_per_align_try /= getPeriodSize();
601     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
602
603     bool aligned = false;
604     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
605     while (!aligned && cnt--) {
606         nb_sync_runs = periods_per_align_try;
607         while(nb_sync_runs) {
608             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
609             if(!waitForPeriod()) {
610                 debugWarning("xrun while aligning streams...\n");
611                 return false;
612             };
613
614             // before we do anything else, transfer
615             if(!transferSilence()) {
616                 debugError("Could not transfer silence\n");
617                 return false;
618             }
619
620             // now calculate the stream offset
621             i = 0;
622             for ( i = 0; i < nb_rcv_sp; i++) {
623                 StreamProcessor *s = m_ReceiveProcessors.at(i);
624                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
625                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
626                     m_SyncSource, s, diff);
627                 if ( nb_sync_runs == periods_per_align_try ) {
628                     diff_between_streams[i] = diff;
629                 } else {
630                     diff_between_streams[i] += diff;
631                 }
632             }
633
634             nb_sync_runs--;
635         }
636         // calculate the average offsets
637         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
638         int diff_between_streams_frames[nb_rcv_sp];
639         aligned = true;
640         for ( i = 0; i < nb_rcv_sp; i++) {
641             StreamProcessor *s = m_ReceiveProcessors.at(i);
642
643             diff_between_streams[i] /= periods_per_align_try;
644             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
645             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
646                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
647
648             aligned &= (diff_between_streams_frames[i] == 0);
649
650             // reposition the stream
651             if(!s->shiftStream(diff_between_streams_frames[i])) {
652                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
653                 return false;
654             }
655         }
656         if (!aligned) {
657             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
658         }
659     }
660     if (cnt == 0) {
661         debugError("Align failed\n");
662         return false;
663     }
664     return true;
665 }
666
667 bool StreamProcessorManager::start() {
668     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
669
670     // start all SP's synchonized
671     bool start_result = false;
672     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
673         // put all SP's into dry-running state
674         if (!startDryRunning()) {
675             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
676             start_result = false;
677             continue;
678         }
679
680         start_result = syncStartAll();
681         if(start_result) {
682             break;
683         } else {
684             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
685         }
686     }
687     if (!start_result) {
688         debugFatal("Could not syncStartAll...\n");
689         return false;
690     }
691
692     return true;
693 }
694
695 bool StreamProcessorManager::stop() {
696     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
697
698     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
699     // switch SP's over to the dry-running state
700     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
701           it != m_ReceiveProcessors.end();
702           ++it ) {
703         if((*it)->isRunning()) {
704             if(!(*it)->scheduleStopRunning(-1)) {
705                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
706                 return false;
707             }
708         }
709     }
710     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
711           it != m_TransmitProcessors.end();
712           ++it ) {
713         if((*it)->isRunning()) {
714             if(!(*it)->scheduleStopRunning(-1)) {
715                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
716                 return false;
717             }
718         }
719     }
720     // wait for the SP's to get into the dry-running/stopped state
721     int cnt = 8000;
722     bool ready = false;
723     while (!ready && cnt) {
724         ready = true;
725         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
726             it != m_ReceiveProcessors.end();
727             ++it ) {
728             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
729         }
730         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
731             it != m_TransmitProcessors.end();
732             ++it ) {
733             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
734         }
735         SleepRelativeUsec(125);
736         cnt--;
737     }
738     if(cnt==0) {
739         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
740         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
741             it != m_ReceiveProcessors.end();
742             ++it ) {
743             (*it)->dumpInfo();
744         }
745         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
746             it != m_TransmitProcessors.end();
747             ++it ) {
748             (*it)->dumpInfo();
749         }
750         return false;
751     }
752
753     // switch SP's over to the stopped state
754     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
755           it != m_ReceiveProcessors.end();
756           ++it ) {
757         if(!(*it)->scheduleStopDryRunning(-1)) {
758             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
759             return false;
760         }
761     }
762     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
763           it != m_TransmitProcessors.end();
764           ++it ) {
765         if(!(*it)->scheduleStopDryRunning(-1)) {
766             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
767             return false;
768         }
769     }
770     // wait for the SP's to get into the stopped state
771     cnt = 8000;
772     ready = false;
773     while (!ready && cnt) {
774         ready = true;
775         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
776             it != m_ReceiveProcessors.end();
777             ++it ) {
778             ready &= (*it)->isStopped();
779         }
780         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
781             it != m_TransmitProcessors.end();
782             ++it ) {
783             ready &= (*it)->isStopped();
784         }
785         SleepRelativeUsec(125);
786         cnt--;
787     }
788     if(cnt==0) {
789         debugWarning(" Timeout waiting for the SP's to stop\n");
790         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
791             it != m_ReceiveProcessors.end();
792             ++it ) {
793             (*it)->dumpInfo();
794         }
795         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
796             it != m_TransmitProcessors.end();
797             ++it ) {
798             (*it)->dumpInfo();
799         }
800         return false;
801     }
802     return true;
803 }
804
805 /**
806  * Called upon Xrun events. This brings all StreamProcessors back
807  * into their starting state, and then carries on streaming. This should
808  * have the same effect as restarting the whole thing.
809  *
810  * @return true if successful, false otherwise
811  */
812 bool StreamProcessorManager::handleXrun() {
813
814     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
815
816     dumpInfo();
817
818     /*
819      * Reset means:
820      * 1) Disabling the SP's, so that they don't process any packets
821      *    note: the isomanager does keep on delivering/requesting them
822      * 2) Bringing all buffers & streamprocessors into a know state
823      *    - Clear all capture buffers
824      *    - Put nb_periods*period_size of null frames into the playback buffers
825      * 3) Re-enable the SP's
826      */
827
828     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
829     // start all SP's synchonized
830     bool start_result = false;
831     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
832         if(m_shutdown_needed) {
833             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
834             return true;
835         }
836         // put all SP's into dry-running state
837         if (!startDryRunning()) {
838             debugShowBackLog();
839             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
840             start_result = false;
841             continue;
842         }
843
844         start_result = syncStartAll();
845         if(start_result) {
846             break;
847         } else {
848             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
849         }
850     }
851     if (!start_result) {
852         debugFatal("Could not syncStartAll...\n");
853         return false;
854     }
855     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
856
857     return true;
858 }
859
860 /**
861  * @brief Waits until the next period of samples is ready
862  *
863  * This function does not return until a full period of samples is (or should be)
864  * ready to be transferred.
865  *
866  * @return true if the period is ready, false if not
867  */
868 bool StreamProcessorManager::waitForPeriod() {
869     if(m_SyncSource == NULL) return false;
870     if(m_shutdown_needed) return false;
871     bool xrun_occurred = false;
872     bool period_not_ready = true;
873
874     // grab the wait lock
875     m_WaitLock.Lock();
876
877     while(period_not_ready) {
878         debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
879                            "waiting for period (%d frames in buffer)...\n",
880                            m_SyncSource->getBufferFill());
881
882         // wait for something to happen
883         switch(waitForActivity()) {
884             case eAR_Error:
885                 debugError("Error while waiting for activity\n");
886                 return false;
887             case eAR_Interrupted:
888                 // FIXME: what to do here?
889                 debugWarning("Interrupted while waiting for activity\n");
890                 break;
891             case eAR_Timeout:
892                 // FIXME: what to do here?
893                 debugWarning("Timeout while waiting for activity\n");
894                 break;
895             case eAR_Activity:
896                 // do nothing
897                 break;
898         }
899         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "got activity...\n");
900
901         // HACK: this should be solved more elegantly
902         period_not_ready = false;
903         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
904             it != m_ReceiveProcessors.end();
905             ++it ) {
906             bool this_sp_period_ready = (*it)->canConsumePeriod();
907             if (!this_sp_period_ready) {
908                 period_not_ready = true;
909             }
910         }
911         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
912             it != m_TransmitProcessors.end();
913             ++it ) {
914             bool this_sp_period_ready = (*it)->canProducePeriod();
915             if (!this_sp_period_ready) {
916                 period_not_ready = true;
917             }
918         }
919         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " period not ready? %d...\n", period_not_ready);
920
921         // check for underruns on the ISO side,
922         // those should make us bail out of the wait loop
923         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
924             it != m_ReceiveProcessors.end();
925             ++it ) {
926             // a xrun has occurred on the Iso side
927             xrun_occurred |= (*it)->xrunOccurred();
928         }
929         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
930             it != m_TransmitProcessors.end();
931             ++it ) {
932             // a xrun has occurred on the Iso side
933             xrun_occurred |= (*it)->xrunOccurred();
934         }
935         if(xrun_occurred) break;
936         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
937
938         // if we have to shutdown due to some async event (busreset), do so
939         if(m_shutdown_needed) break;
940     }
941
942     if(xrun_occurred) {
943         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
944     }
945
946     // we save the 'ideal' time of the transfer at this point,
947     // because we can have interleaved read - process - write
948     // cycles making that we modify a receiving stream's buffer
949     // before we get to writing.
950     // NOTE: before waitForPeriod() is called again, both the transmit
951     //       and the receive processors should have done their transfer.
952     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
953    
954     #ifdef DEBUG
955     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
956    
957     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
958     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
959     // display message if the difference between two successive tick
960     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
961     // so 50 ticks = 10%, which is a rather large jitter value.
962     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
963         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
964                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
965     }
966     m_time_of_transfer2 = m_time_of_transfer;
967     #endif
968
969     debugOutputExtreme( DEBUG_LEVEL_VERBOSE,
970                         "transfer at %llu ticks...\n",
971                         m_time_of_transfer);
972
973     // this is to notify the client of the delay that we introduced by waiting
974     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
975     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
976                         "delayed for %d usecs...\n",
977                         m_delayed_usecs);
978
979 #ifdef DEBUG
980     int rcv_bf=0, xmt_bf=0;
981     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
982         it != m_ReceiveProcessors.end();
983         ++it ) {
984         rcv_bf = (*it)->getBufferFill();
985     }
986     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
987         it != m_TransmitProcessors.end();
988         ++it ) {
989         xmt_bf = (*it)->getBufferFill();
990     }
991     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
992                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
993                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
994
995     // check if xruns occurred on the Iso side.
996     // also check if xruns will occur should we transfer() now
997     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
998           it != m_ReceiveProcessors.end();
999           ++it ) {
1000
1001         if ((*it)->xrunOccurred()) {
1002             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
1003             (*it)->dumpInfo();
1004         }
1005         if (!((*it)->canClientTransferFrames(m_period))) {
1006             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
1007             (*it)->dumpInfo();
1008         }
1009     }
1010     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1011           it != m_TransmitProcessors.end();
1012           ++it ) {
1013         if ((*it)->xrunOccurred()) {
1014             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
1015         }
1016         if (!((*it)->canClientTransferFrames(m_period))) {
1017             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
1018         }
1019     }
1020 #endif
1021
1022     m_nbperiods++;
1023
1024     m_WaitLock.Unlock();
1025     // now we can signal the client that we are (should be) ready
1026     return !xrun_occurred;
1027 }
1028
1029 /**
1030  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1031  *
1032  * Transfers one period of frames from the client side to the Iso side and vice versa.
1033  *
1034  * @return true if successful, false otherwise (indicates xrun).
1035  */
1036 bool StreamProcessorManager::transfer() {
1037     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1038     bool retval=true;
1039     retval &= transfer(StreamProcessor::ePT_Receive);
1040     retval &= transfer(StreamProcessor::ePT_Transmit);
1041     return retval;
1042 }
1043
1044 /**
1045  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1046  *
1047  * Transfers one period of frames from the client side to the Iso side or vice versa.
1048  *
1049  * @param t The processor type to tranfer for (receive or transmit)
1050  * @return true if successful, false otherwise (indicates xrun).
1051  */
1052 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1053     if(m_SyncSource == NULL) return false;
1054     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1055         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1056         t, m_time_of_transfer,
1057         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1058         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1059         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1060
1061     bool retval = true;
1062     // a static cast could make sure that there is no performance
1063     // penalty for the virtual functions (to be checked)
1064     if (t==StreamProcessor::ePT_Receive) {
1065         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1066                 it != m_ReceiveProcessors.end();
1067                 ++it ) {
1068             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1069                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1070                             m_period, m_time_of_transfer,*it);
1071                 retval &= false; // buffer underrun
1072             }
1073         }
1074     } else {
1075         // FIXME: in the SPM it would be nice to have system time instead of
1076         //        1394 time
1077         float rate = m_SyncSource->getTicksPerFrame();
1078         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1079
1080         // the data we are putting into the buffer is intended to be transmitted
1081         // one ringbuffer size after it has been received
1082         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1083
1084         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1085                 it != m_TransmitProcessors.end();
1086                 ++it ) {
1087             // FIXME: in the SPM it would be nice to have system time instead of
1088             //        1394 time
1089             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1090                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1091                         m_period, transmit_timestamp, *it);
1092                 retval &= false; // buffer underrun
1093             }
1094         }
1095     }
1096     return retval;
1097 }
1098
1099 /**
1100  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1101  *
1102  * Transfers one period of silence to the Iso side for transmit SP's
1103  * or dump one period of frames for receive SP's
1104  *
1105  * @return true if successful, false otherwise (indicates xrun).
1106  */
1107 bool StreamProcessorManager::transferSilence() {
1108     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1109     bool retval=true;
1110     // NOTE: the order here is opposite from the order in
1111     // normal operation (transmit is before receive), because
1112     // we can do that here (data=silence=available) and
1113     // it increases reliability (esp. on startup)
1114     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1115     retval &= transferSilence(StreamProcessor::ePT_Receive);
1116     return retval;
1117 }
1118
1119 /**
1120  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1121  *
1122  * Transfers one period of silence to the Iso side for transmit SP's
1123  * or dump one period of frames for receive SP's
1124  *
1125  * @param t The processor type to tranfer for (receive or transmit)
1126  * @return true if successful, false otherwise (indicates xrun).
1127  */
1128 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1129     if(m_SyncSource == NULL) return false;
1130     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1131         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1132         t, m_time_of_transfer,
1133         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1134         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1135         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1136
1137     bool retval = true;
1138     // a static cast could make sure that there is no performance
1139     // penalty for the virtual functions (to be checked)
1140     if (t==StreamProcessor::ePT_Receive) {
1141         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1142                 it != m_ReceiveProcessors.end();
1143                 ++it ) {
1144             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1145                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1146                             m_period, m_time_of_transfer,*it);
1147                 retval &= false; // buffer underrun
1148             }
1149         }
1150     } else {
1151         // FIXME: in the SPM it would be nice to have system time instead of
1152         //        1394 time
1153         float rate = m_SyncSource->getTicksPerFrame();
1154         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1155
1156         // the data we are putting into the buffer is intended to be transmitted
1157         // one ringbuffer size after it has been received
1158         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1159
1160         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1161                 it != m_TransmitProcessors.end();
1162                 ++it ) {
1163             // FIXME: in the SPM it would be nice to have system time instead of
1164             //        1394 time
1165             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1166                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1167                         m_period, transmit_timestamp, *it);
1168                 retval &= false; // buffer underrun
1169             }
1170         }
1171     }
1172     return retval;
1173 }
1174
1175 void StreamProcessorManager::dumpInfo() {
1176     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1177     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1178     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1179     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1180
1181     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1182     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1183         it != m_ReceiveProcessors.end();
1184         ++it ) {
1185         (*it)->dumpInfo();
1186     }
1187
1188     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1189     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1190         it != m_TransmitProcessors.end();
1191         ++it ) {
1192         (*it)->dumpInfo();
1193     }
1194
1195     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1196
1197 }
1198
1199 void StreamProcessorManager::setVerboseLevel(int l) {
1200     setDebugLevel(l);
1201     m_WaitLock.setVerboseLevel(l);
1202
1203     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1204     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1205         it != m_ReceiveProcessors.end();
1206         ++it ) {
1207         (*it)->setVerboseLevel(l);
1208     }
1209
1210     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1211     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1212         it != m_TransmitProcessors.end();
1213         ++it ) {
1214         (*it)->setVerboseLevel(l);
1215     }
1216 }
1217
1218
1219 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1220     int count=0;
1221
1222     if (direction == Port::E_Capture) {
1223         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1224             it != m_ReceiveProcessors.end();
1225             ++it ) {
1226             count += (*it)->getPortCount(type);
1227         }
1228     } else {
1229         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1230             it != m_TransmitProcessors.end();
1231             ++it ) {
1232             count += (*it)->getPortCount(type);
1233         }
1234     }
1235     return count;
1236 }
1237
1238 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1239     int count=0;
1240
1241     if (direction == Port::E_Capture) {
1242         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1243             it != m_ReceiveProcessors.end();
1244             ++it ) {
1245             count += (*it)->getPortCount();
1246         }
1247     } else {
1248         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1249             it != m_TransmitProcessors.end();
1250             ++it ) {
1251             count += (*it)->getPortCount();
1252         }
1253     }
1254     return count;
1255 }
1256
1257 // TODO: implement a port map here, instead of the loop
1258
1259 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1260     int count=0;
1261     int prevcount=0;
1262
1263     if (direction == Port::E_Capture) {
1264         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1265             it != m_ReceiveProcessors.end();
1266             ++it ) {
1267             count += (*it)->getPortCount();
1268             if (count > idx) {
1269                 return (*it)->getPortAtIdx(idx-prevcount);
1270             }
1271             prevcount=count;
1272         }
1273     } else {
1274         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1275             it != m_TransmitProcessors.end();
1276             ++it ) {
1277             count += (*it)->getPortCount();
1278             if (count > idx) {
1279                 return (*it)->getPortAtIdx(idx-prevcount);
1280             }
1281             prevcount=count;
1282         }
1283     }
1284     return NULL;
1285 }
1286
1287 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1288     m_thread_realtime=rt;
1289     m_thread_priority=priority;
1290     return true;
1291 }
1292
1293
1294 } // end of namespace
Note: See TracBrowser for help on using the browser.