root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 1045, 48.0 kB (checked in by ppalmers, 15 years ago)

use MutexLockHelper in SPM

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_activity_wait_timeout_usec( 1000*1000 )
45     , m_nb_buffers( 0 )
46     , m_period( 0 )
47     , m_audio_datatype( eADT_Float )
48     , m_nominal_framerate ( 0 )
49     , m_xruns(0)
50     , m_shutdown_needed(false)
51     , m_nbperiods(0)
52     , m_WaitLock( new Util::PosixMutex )
53 {
54     addOption(Util::OptionContainer::Option("slaveMode",false));
55     sem_init(&m_activity_semaphore, 0, 0);
56 }
57
58 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
59     : m_is_slave( false )
60     , m_SyncSource(NULL)
61     , m_xrun_happened( false )
62     , m_activity_wait_timeout_usec( 1000*1000 )
63     , m_nb_buffers(nb_buffers)
64     , m_period(period)
65     , m_audio_datatype( eADT_Float )
66     , m_nominal_framerate ( framerate )
67     , m_xruns(0)
68     , m_shutdown_needed(false)
69     , m_nbperiods(0)
70     , m_WaitLock( new Util::PosixMutex )
71 {
72     addOption(Util::OptionContainer::Option("slaveMode",false));
73     sem_init(&m_activity_semaphore, 0, 0);
74 }
75
76 StreamProcessorManager::~StreamProcessorManager() {
77     sem_post(&m_activity_semaphore);
78     sem_destroy(&m_activity_semaphore);
79     delete m_WaitLock;
80 }
81
82 void
83 StreamProcessorManager::handleBusReset()
84 {
85     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset...\n", this);
86
87     // FIXME: we request shutdown for now.
88     m_shutdown_needed=true;
89
90     // note that all receive streams are gone once a device is unplugged
91
92     // synchronize with the wait lock
93     Util::MutexLockHelper lock(*m_WaitLock);
94
95     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
96     // cause all SP's to bail out
97     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
98           it != m_ReceiveProcessors.end();
99           ++it )
100     {
101         (*it)->handleBusReset();
102     }
103     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
104           it != m_TransmitProcessors.end();
105           ++it )
106     {
107         (*it)->handleBusReset();
108     }
109 }
110
111 void
112 StreamProcessorManager::signalActivity()
113 {
114     sem_post(&m_activity_semaphore);
115     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
116 }
117
118 enum StreamProcessorManager::eActivityResult
119 StreamProcessorManager::waitForActivity()
120 {
121     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
122     struct timespec ts;
123     int result;
124
125     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
126         debugError("clock_gettime failed\n");
127         return eAR_Error;
128     }
129     long long int timeout_nsec=0;
130     int timeout_sec = 0;
131     if (m_activity_wait_timeout_usec >= 0) {
132         timeout_nsec = m_activity_wait_timeout_usec * 1000LL;
133         timeout_sec = 0;
134         while(timeout_nsec >= 1000000000LL) {
135             timeout_sec += 1;
136             timeout_nsec -= 1000000000LL;
137         }
138         ts.tv_nsec += timeout_nsec;
139         ts.tv_sec += timeout_sec;
140     }
141
142     if (m_activity_wait_timeout_usec >= 0) {
143         result = sem_timedwait(&m_activity_semaphore, &ts);
144     } else {
145         result = sem_wait(&m_activity_semaphore);
146     }
147
148     if(result != 0) {
149         if (result == ETIMEDOUT) {
150             debugOutput(DEBUG_LEVEL_VERBOSE,
151                         "(%p) pthread_cond_timedwait() timed out (result=%d)\n",
152                         this, result);
153             return eAR_Timeout;
154         } else if (result == EINTR) {
155             debugOutput(DEBUG_LEVEL_VERBOSE,
156                         "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n",
157                         this, result);
158             return eAR_Interrupted;
159         } else {
160             debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n",
161                         this, result);
162             debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
163                        this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec);
164             return eAR_Error;
165         }
166     }
167
168     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
169     return eAR_Activity;
170 }
171
172 /**
173  * Registers \ref processor with this manager.
174  *
175  * also registers it with the isohandlermanager
176  *
177  * be sure to call isohandlermanager->init() first!
178  * and be sure that the processors are also ->init()'ed
179  *
180  * @param processor
181  * @return true if successfull
182  */
183 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
184 {
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
186     assert(processor);
187     if (processor->getType() == StreamProcessor::ePT_Receive) {
188         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
189         m_ReceiveProcessors.push_back(processor);
190         return true;
191     }
192
193     if (processor->getType() == StreamProcessor::ePT_Transmit) {
194         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
195         m_TransmitProcessors.push_back(processor);
196         return true;
197     }
198
199     debugFatal("Unsupported processor type!\n");
200     return false;
201 }
202
203 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
204 {
205     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
206     assert(processor);
207
208     if (processor->getType()==StreamProcessor::ePT_Receive) {
209
210         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
211               it != m_ReceiveProcessors.end();
212               ++it )
213         {
214             if ( *it == processor ) {
215                 if (*it == m_SyncSource) {
216                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
217                     m_SyncSource = NULL;
218                 }
219                 m_ReceiveProcessors.erase(it);
220                 return true;
221             }
222         }
223     }
224
225     if (processor->getType()==StreamProcessor::ePT_Transmit) {
226         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
227               it != m_TransmitProcessors.end();
228               ++it )
229         {
230             if ( *it == processor ) {
231                 if (*it == m_SyncSource) {
232                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
233                     m_SyncSource = NULL;
234                 }
235                 m_TransmitProcessors.erase(it);
236                 return true;
237             }
238         }
239     }
240
241     debugFatal("Processor (%p) not found!\n",processor);
242     return false; //not found
243 }
244
245 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
246     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
247     m_SyncSource=s;
248     return true;
249 }
250
251 bool StreamProcessorManager::prepare() {
252
253     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
254     m_is_slave=false;
255     if(!getOption("slaveMode", m_is_slave)) {
256         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
257     }
258
259     m_shutdown_needed=false;
260
261     // if no sync source is set, select one here
262     if(m_SyncSource == NULL) {
263        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
264     }
265
266     // FIXME: put into separate method
267     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
268           it != m_ReceiveProcessors.end();
269           ++it )
270     {
271         if(m_SyncSource == NULL) {
272             debugWarning(" => Sync Source is %p.\n", *it);
273             m_SyncSource = *it;
274         }
275     }
276     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
277           it != m_TransmitProcessors.end();
278           ++it )
279     {
280         if(m_SyncSource == NULL) {
281             debugWarning(" => Sync Source is %p.\n", *it);
282             m_SyncSource = *it;
283         }
284     }
285
286     // now do the actual preparation of the SP's
287     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
288     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
289         it != m_ReceiveProcessors.end();
290         ++it ) {
291
292         if(!(*it)->setOption("slaveMode", m_is_slave)) {
293             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
294         }
295
296         if(!(*it)->prepare()) {
297             debugFatal(  " could not prepare (%p)...\n",(*it));
298             return false;
299         }
300     }
301     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
302     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
303         it != m_TransmitProcessors.end();
304         ++it ) {
305         if(!(*it)->setOption("slaveMode", m_is_slave)) {
306             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
307         }
308         if(!(*it)->prepare()) {
309             debugFatal( " could not prepare (%p)...\n",(*it));
310             return false;
311         }
312     }
313
314     // if there are no stream processors registered,
315     // fail
316     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
317         debugFatal("No stream processors registered, can't do anything usefull\n");
318         return false;
319     }
320     return true;
321 }
322
323 bool StreamProcessorManager::startDryRunning() {
324     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
325     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
326     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
327             it != m_TransmitProcessors.end();
328             ++it ) {
329         if (!(*it)->isDryRunning()) {
330             if(!(*it)->scheduleStartDryRunning(-1)) {
331                 debugError("Could not put SP %p into the dry-running state\n", *it);
332                 return false;
333             }
334         } else {
335             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
336         }
337     }
338     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
339             it != m_ReceiveProcessors.end();
340             ++it ) {
341         if (!(*it)->isDryRunning()) {
342             if(!(*it)->scheduleStartDryRunning(-1)) {
343                 debugError("Could not put SP %p into the dry-running state\n", *it);
344                 return false;
345             }
346         } else {
347             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
348         }
349     }
350     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
351     // wait for the syncsource to start running.
352     // that will block the waitForPeriod call until everyone has started (theoretically)
353     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
354     bool all_dry_running = false;
355     while (!all_dry_running && cnt) {
356         all_dry_running = true;
357         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
358                 it != m_ReceiveProcessors.end();
359                 ++it ) {
360             all_dry_running &= (*it)->isDryRunning();
361         }
362         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
363                 it != m_TransmitProcessors.end();
364                 ++it ) {
365             all_dry_running &= (*it)->isDryRunning();
366         }
367
368         SleepRelativeUsec(125);
369         cnt--;
370     }
371     if(cnt==0) {
372         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
373         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
374                 it != m_ReceiveProcessors.end();
375                 ++it ) {
376             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
377                 (*it)->getTypeString(), *it, (*it)->getStateString());
378         }
379         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
380                 it != m_TransmitProcessors.end();
381                 ++it ) {
382             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
383                 (*it)->getTypeString(), *it, (*it)->getStateString());
384         }
385         return false;
386     }
387     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
388     return true;
389 }
390
391 bool StreamProcessorManager::syncStartAll() {
392     if(m_SyncSource == NULL) return false;
393     // figure out when to get the SP's running.
394     // the xmit SP's should also know the base timestamp
395     // streams should be aligned here
396
397     // now find out how long we have to delay the wait operation such that
398     // the received frames will all be presented to the SP
399     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
400     int max_of_min_delay = 0;
401     int min_delay = 0;
402     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
403             it != m_ReceiveProcessors.end();
404             ++it ) {
405         min_delay = (*it)->getMaxFrameLatency();
406         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
407     }
408
409     // add some processing margin. This only shifts the time
410     // at which the buffer is transfer()'ed. This makes things somewhat
411     // more robust. It should be noted though that shifting the transfer
412     // time to a later time instant also causes the xmit buffer fill to be
413     // lower on average.
414     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
415     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
416         max_of_min_delay,
417         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
418         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
419         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
420     m_SyncSource->setSyncDelay(max_of_min_delay);
421
422     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
423     //        have aquired lock
424     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
425     //sleep(2); // FIXME: be smarter here
426
427     // make sure that we are dry-running long enough for the
428     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
429     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
430    
431     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
432     nb_sync_runs /= 1000;
433     nb_sync_runs /= getPeriodSize();
434
435     int64_t time_till_next_period;
436     while(nb_sync_runs--) { // or while not sync-ed?
437         // check if we were woken up too soon
438         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
439         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
440         if(time_till_next_period > 0) {
441             // wait for the period
442             SleepRelativeUsec(time_till_next_period);
443         }
444     }
445
446     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
447     // FIXME: in the SPM it would be nice to have system time instead of
448     //        1394 time
449
450     // we now should have decent sync info on the sync source
451     // determine a point in time where the system should start
452     // figure out where we are now
453     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
454     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
455         time_of_first_sample,
456         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
457         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
458         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
459
460     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
461     // this is the time window we have to setup all SP's such that they
462     // can start wet-running correctly.
463     time_of_first_sample = addTicks(time_of_first_sample,
464                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
465
466     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
467         time_of_first_sample,
468         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
469         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
470         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
471
472     // we should start wet-running the transmit SP's some cycles in advance
473     // such that we know it is wet-running when it should output its first sample
474     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
475                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
476
477     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
478                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
479     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
480         time_to_start_xmit,
481         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
482         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
483         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
484     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
485         time_to_start_recv,
486         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
487         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
488         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
489
490     // at this point the buffer head timestamp of the transmit buffers can be set
491     // this is the presentation time of the first sample in the buffer
492     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
493           it != m_TransmitProcessors.end();
494           ++it ) {
495         (*it)->setBufferHeadTimestamp(time_of_first_sample);
496     }
497
498     // switch syncsource to running state
499     uint64_t time_to_start_sync;
500     // FIXME: this is most likely not going to work for transmit sync sources
501     // but those are unsupported in this version
502     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
503         time_to_start_sync = time_to_start_recv;
504     } else {
505         time_to_start_sync = time_to_start_xmit;
506     }
507     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
508         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
509         return false;
510     }
511
512     // STEP X: switch all non-syncsource SP's over to the running state
513     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
514           it != m_ReceiveProcessors.end();
515           ++it ) {
516         if(*it != m_SyncSource) {
517             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
518                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
519                 return false;
520             }
521         }
522     }
523     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
524           it != m_TransmitProcessors.end();
525           ++it ) {
526         if(*it != m_SyncSource) {
527             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
528                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
529                 return false;
530             }
531         }
532     }
533     // wait for the syncsource to start running.
534     // that will block the waitForPeriod call until everyone has started (theoretically)
535     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
536     // so a 20 times this value should be a good timeout
537     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
538     while (!m_SyncSource->isRunning() && cnt) {
539         SleepRelativeUsec(125);
540         cnt--;
541     }
542     if(cnt==0) {
543         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
544         return false;
545     }
546
547     // the sync source is running, we can now read a decent received timestamp from it
548     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
549
550     // and a (still very rough) approximation of the rate
551     float rate = m_SyncSource->getTicksPerFrame();
552     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
553     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
554                 m_time_of_transfer, rate);
555
556     // then use this information to initialize the xmit handlers
557
558     //  we now set the buffer tail timestamp of the transmit buffer
559     //  to the period transfer time instant plus what's nb_buffers - 1
560     //  in ticks. This due to the fact that we (should) have received one period
561     //  worth of ticks at t=m_time_of_transfer
562     //  hence one period of frames should also have been transmitted, which means
563     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
564     //  that allows us to calculate the tail timestamp for the buffer.
565
566     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
567
568     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
569                 transmit_tail_timestamp, rate);
570
571     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
572         it != m_TransmitProcessors.end();
573         ++it ) {
574         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
575         (*it)->setTicksPerFrame(rate);
576     }
577
578     // align the received streams to be phase aligned
579     if(!alignReceivedStreams()) {
580         debugError("Could not align streams...\n");
581         return false;
582     }
583
584     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
585     return true;
586 }
587
588 bool
589 StreamProcessorManager::alignReceivedStreams()
590 {
591     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
592     unsigned int nb_sync_runs;
593     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
594     int64_t diff_between_streams[nb_rcv_sp];
595     int64_t diff;
596
597     unsigned int i;
598
599     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
600     periods_per_align_try /= 1000;
601     periods_per_align_try /= getPeriodSize();
602     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
603
604     bool aligned = false;
605     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
606     while (!aligned && cnt--) {
607         nb_sync_runs = periods_per_align_try;
608         while(nb_sync_runs) {
609             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
610             if(!waitForPeriod()) {
611                 debugWarning("xrun while aligning streams...\n");
612                 return false;
613             };
614
615             // before we do anything else, transfer
616             if(!transferSilence()) {
617                 debugError("Could not transfer silence\n");
618                 return false;
619             }
620
621             // now calculate the stream offset
622             i = 0;
623             for ( i = 0; i < nb_rcv_sp; i++) {
624                 StreamProcessor *s = m_ReceiveProcessors.at(i);
625                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
626                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
627                     m_SyncSource, s, diff);
628                 if ( nb_sync_runs == periods_per_align_try ) {
629                     diff_between_streams[i] = diff;
630                 } else {
631                     diff_between_streams[i] += diff;
632                 }
633             }
634
635             nb_sync_runs--;
636         }
637         // calculate the average offsets
638         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
639         int diff_between_streams_frames[nb_rcv_sp];
640         aligned = true;
641         for ( i = 0; i < nb_rcv_sp; i++) {
642             StreamProcessor *s = m_ReceiveProcessors.at(i);
643
644             diff_between_streams[i] /= periods_per_align_try;
645             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
646             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
647                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
648
649             aligned &= (diff_between_streams_frames[i] == 0);
650
651             // reposition the stream
652             if(!s->shiftStream(diff_between_streams_frames[i])) {
653                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
654                 return false;
655             }
656         }
657         if (!aligned) {
658             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
659         }
660     }
661     if (cnt == 0) {
662         debugError("Align failed\n");
663         return false;
664     }
665     return true;
666 }
667
668 bool StreamProcessorManager::start() {
669     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
670
671     // start all SP's synchonized
672     bool start_result = false;
673     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
674         // put all SP's into dry-running state
675         if (!startDryRunning()) {
676             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
677             start_result = false;
678             continue;
679         }
680
681         start_result = syncStartAll();
682         if(start_result) {
683             break;
684         } else {
685             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
686         }
687     }
688     if (!start_result) {
689         debugFatal("Could not syncStartAll...\n");
690         return false;
691     }
692
693     return true;
694 }
695
696 bool StreamProcessorManager::stop() {
697     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
698
699     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
700     // switch SP's over to the dry-running state
701     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
702           it != m_ReceiveProcessors.end();
703           ++it ) {
704         if((*it)->isRunning()) {
705             if(!(*it)->scheduleStopRunning(-1)) {
706                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
707                 return false;
708             }
709         }
710     }
711     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
712           it != m_TransmitProcessors.end();
713           ++it ) {
714         if((*it)->isRunning()) {
715             if(!(*it)->scheduleStopRunning(-1)) {
716                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
717                 return false;
718             }
719         }
720     }
721     // wait for the SP's to get into the dry-running/stopped state
722     int cnt = 8000;
723     bool ready = false;
724     while (!ready && cnt) {
725         ready = true;
726         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
727             it != m_ReceiveProcessors.end();
728             ++it ) {
729             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
730         }
731         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
732             it != m_TransmitProcessors.end();
733             ++it ) {
734             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
735         }
736         SleepRelativeUsec(125);
737         cnt--;
738     }
739     if(cnt==0) {
740         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
741         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
742             it != m_ReceiveProcessors.end();
743             ++it ) {
744             (*it)->dumpInfo();
745         }
746         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
747             it != m_TransmitProcessors.end();
748             ++it ) {
749             (*it)->dumpInfo();
750         }
751         return false;
752     }
753
754     // switch SP's over to the stopped state
755     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
756           it != m_ReceiveProcessors.end();
757           ++it ) {
758         if(!(*it)->scheduleStopDryRunning(-1)) {
759             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
760             return false;
761         }
762     }
763     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
764           it != m_TransmitProcessors.end();
765           ++it ) {
766         if(!(*it)->scheduleStopDryRunning(-1)) {
767             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
768             return false;
769         }
770     }
771     // wait for the SP's to get into the stopped state
772     cnt = 8000;
773     ready = false;
774     while (!ready && cnt) {
775         ready = true;
776         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
777             it != m_ReceiveProcessors.end();
778             ++it ) {
779             ready &= (*it)->isStopped();
780         }
781         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
782             it != m_TransmitProcessors.end();
783             ++it ) {
784             ready &= (*it)->isStopped();
785         }
786         SleepRelativeUsec(125);
787         cnt--;
788     }
789     if(cnt==0) {
790         debugWarning(" Timeout waiting for the SP's to stop\n");
791         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
792             it != m_ReceiveProcessors.end();
793             ++it ) {
794             (*it)->dumpInfo();
795         }
796         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
797             it != m_TransmitProcessors.end();
798             ++it ) {
799             (*it)->dumpInfo();
800         }
801         return false;
802     }
803     return true;
804 }
805
806 /**
807  * Called upon Xrun events. This brings all StreamProcessors back
808  * into their starting state, and then carries on streaming. This should
809  * have the same effect as restarting the whole thing.
810  *
811  * @return true if successful, false otherwise
812  */
813 bool StreamProcessorManager::handleXrun() {
814
815     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
816
817     dumpInfo();
818
819     /*
820      * Reset means:
821      * 1) Disabling the SP's, so that they don't process any packets
822      *    note: the isomanager does keep on delivering/requesting them
823      * 2) Bringing all buffers & streamprocessors into a know state
824      *    - Clear all capture buffers
825      *    - Put nb_periods*period_size of null frames into the playback buffers
826      * 3) Re-enable the SP's
827      */
828
829     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
830     // start all SP's synchonized
831     bool start_result = false;
832     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
833         if(m_shutdown_needed) {
834             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
835             return true;
836         }
837         // put all SP's into dry-running state
838         if (!startDryRunning()) {
839             debugShowBackLog();
840             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
841             start_result = false;
842             continue;
843         }
844
845         start_result = syncStartAll();
846         if(start_result) {
847             break;
848         } else {
849             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
850         }
851     }
852     if (!start_result) {
853         debugFatal("Could not syncStartAll...\n");
854         return false;
855     }
856     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
857
858     return true;
859 }
860
861 /**
862  * @brief Waits until the next period of samples is ready
863  *
864  * This function does not return until a full period of samples is (or should be)
865  * ready to be transferred.
866  *
867  * @return true if the period is ready, false if not
868  */
869 bool StreamProcessorManager::waitForPeriod() {
870     if(m_SyncSource == NULL) return false;
871     if(m_shutdown_needed) return false;
872     bool xrun_occurred = false;
873     bool period_not_ready = true;
874
875     // grab the wait lock
876     // this ensures that bus reset handling doesn't interfere
877     Util::MutexLockHelper lock(*m_WaitLock);
878
879     while(period_not_ready) {
880         debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
881                            "waiting for period (%d frames in buffer)...\n",
882                            m_SyncSource->getBufferFill());
883
884         // wait for something to happen
885         switch(waitForActivity()) {
886             case eAR_Error:
887                 debugError("Error while waiting for activity\n");
888                 return false;
889             case eAR_Interrupted:
890                 // FIXME: what to do here?
891                 debugWarning("Interrupted while waiting for activity\n");
892                 break;
893             case eAR_Timeout:
894                 // FIXME: what to do here?
895                 debugWarning("Timeout while waiting for activity\n");
896                 break;
897             case eAR_Activity:
898                 // do nothing
899                 break;
900         }
901         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "got activity...\n");
902
903         // HACK: this should be solved more elegantly
904         period_not_ready = false;
905         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
906             it != m_ReceiveProcessors.end();
907             ++it ) {
908             bool this_sp_period_ready = (*it)->canConsumePeriod();
909             if (!this_sp_period_ready) {
910                 period_not_ready = true;
911             }
912         }
913         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
914             it != m_TransmitProcessors.end();
915             ++it ) {
916             bool this_sp_period_ready = (*it)->canProducePeriod();
917             if (!this_sp_period_ready) {
918                 period_not_ready = true;
919             }
920         }
921         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " period not ready? %d...\n", period_not_ready);
922
923         // check for underruns on the ISO side,
924         // those should make us bail out of the wait loop
925         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
926             it != m_ReceiveProcessors.end();
927             ++it ) {
928             // a xrun has occurred on the Iso side
929             xrun_occurred |= (*it)->xrunOccurred();
930         }
931         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
932             it != m_TransmitProcessors.end();
933             ++it ) {
934             // a xrun has occurred on the Iso side
935             xrun_occurred |= (*it)->xrunOccurred();
936         }
937         if(xrun_occurred) break;
938         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
939
940         // if we have to shutdown due to some async event (busreset), do so
941         if(m_shutdown_needed) break;
942     }
943
944     if(xrun_occurred) {
945         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
946     }
947
948     // we save the 'ideal' time of the transfer at this point,
949     // because we can have interleaved read - process - write
950     // cycles making that we modify a receiving stream's buffer
951     // before we get to writing.
952     // NOTE: before waitForPeriod() is called again, both the transmit
953     //       and the receive processors should have done their transfer.
954     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
955    
956     #ifdef DEBUG
957     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
958    
959     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
960     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
961     // display message if the difference between two successive tick
962     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
963     // so 50 ticks = 10%, which is a rather large jitter value.
964     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
965         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
966                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
967     }
968     m_time_of_transfer2 = m_time_of_transfer;
969     #endif
970
971     debugOutputExtreme( DEBUG_LEVEL_VERBOSE,
972                         "transfer at %llu ticks...\n",
973                         m_time_of_transfer);
974
975     // this is to notify the client of the delay that we introduced by waiting
976     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
977     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
978                         "delayed for %d usecs...\n",
979                         m_delayed_usecs);
980
981 #ifdef DEBUG
982     int rcv_bf=0, xmt_bf=0;
983     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
984         it != m_ReceiveProcessors.end();
985         ++it ) {
986         rcv_bf = (*it)->getBufferFill();
987     }
988     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
989         it != m_TransmitProcessors.end();
990         ++it ) {
991         xmt_bf = (*it)->getBufferFill();
992     }
993     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
994                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
995                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
996
997     // check if xruns occurred on the Iso side.
998     // also check if xruns will occur should we transfer() now
999     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1000           it != m_ReceiveProcessors.end();
1001           ++it ) {
1002
1003         if ((*it)->xrunOccurred()) {
1004             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
1005             (*it)->dumpInfo();
1006         }
1007         if (!((*it)->canClientTransferFrames(m_period))) {
1008             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
1009             (*it)->dumpInfo();
1010         }
1011     }
1012     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1013           it != m_TransmitProcessors.end();
1014           ++it ) {
1015         if ((*it)->xrunOccurred()) {
1016             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
1017         }
1018         if (!((*it)->canClientTransferFrames(m_period))) {
1019             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
1020         }
1021     }
1022 #endif
1023
1024     m_nbperiods++;
1025
1026     // now we can signal the client that we are (should be) ready
1027     return !xrun_occurred;
1028 }
1029
1030 /**
1031  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1032  *
1033  * Transfers one period of frames from the client side to the Iso side and vice versa.
1034  *
1035  * @return true if successful, false otherwise (indicates xrun).
1036  */
1037 bool StreamProcessorManager::transfer() {
1038     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1039     bool retval=true;
1040     retval &= transfer(StreamProcessor::ePT_Receive);
1041     retval &= transfer(StreamProcessor::ePT_Transmit);
1042     return retval;
1043 }
1044
1045 /**
1046  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1047  *
1048  * Transfers one period of frames from the client side to the Iso side or vice versa.
1049  *
1050  * @param t The processor type to tranfer for (receive or transmit)
1051  * @return true if successful, false otherwise (indicates xrun).
1052  */
1053 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1054     if(m_SyncSource == NULL) return false;
1055     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1056         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1057         t, m_time_of_transfer,
1058         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1059         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1060         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1061
1062     bool retval = true;
1063     // a static cast could make sure that there is no performance
1064     // penalty for the virtual functions (to be checked)
1065     if (t==StreamProcessor::ePT_Receive) {
1066         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1067                 it != m_ReceiveProcessors.end();
1068                 ++it ) {
1069             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1070                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1071                             m_period, m_time_of_transfer,*it);
1072                 retval &= false; // buffer underrun
1073             }
1074         }
1075     } else {
1076         // FIXME: in the SPM it would be nice to have system time instead of
1077         //        1394 time
1078         float rate = m_SyncSource->getTicksPerFrame();
1079         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1080
1081         // the data we are putting into the buffer is intended to be transmitted
1082         // one ringbuffer size after it has been received
1083         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1084
1085         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1086                 it != m_TransmitProcessors.end();
1087                 ++it ) {
1088             // FIXME: in the SPM it would be nice to have system time instead of
1089             //        1394 time
1090             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1091                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1092                         m_period, transmit_timestamp, *it);
1093                 retval &= false; // buffer underrun
1094             }
1095         }
1096     }
1097     return retval;
1098 }
1099
1100 /**
1101  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1102  *
1103  * Transfers one period of silence to the Iso side for transmit SP's
1104  * or dump one period of frames for receive SP's
1105  *
1106  * @return true if successful, false otherwise (indicates xrun).
1107  */
1108 bool StreamProcessorManager::transferSilence() {
1109     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1110     bool retval=true;
1111     // NOTE: the order here is opposite from the order in
1112     // normal operation (transmit is before receive), because
1113     // we can do that here (data=silence=available) and
1114     // it increases reliability (esp. on startup)
1115     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1116     retval &= transferSilence(StreamProcessor::ePT_Receive);
1117     return retval;
1118 }
1119
1120 /**
1121  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1122  *
1123  * Transfers one period of silence to the Iso side for transmit SP's
1124  * or dump one period of frames for receive SP's
1125  *
1126  * @param t The processor type to tranfer for (receive or transmit)
1127  * @return true if successful, false otherwise (indicates xrun).
1128  */
1129 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1130     if(m_SyncSource == NULL) return false;
1131     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1132         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1133         t, m_time_of_transfer,
1134         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1135         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1136         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1137
1138     bool retval = true;
1139     // a static cast could make sure that there is no performance
1140     // penalty for the virtual functions (to be checked)
1141     if (t==StreamProcessor::ePT_Receive) {
1142         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1143                 it != m_ReceiveProcessors.end();
1144                 ++it ) {
1145             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1146                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1147                             m_period, m_time_of_transfer,*it);
1148                 retval &= false; // buffer underrun
1149             }
1150         }
1151     } else {
1152         // FIXME: in the SPM it would be nice to have system time instead of
1153         //        1394 time
1154         float rate = m_SyncSource->getTicksPerFrame();
1155         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1156
1157         // the data we are putting into the buffer is intended to be transmitted
1158         // one ringbuffer size after it has been received
1159         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1160
1161         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1162                 it != m_TransmitProcessors.end();
1163                 ++it ) {
1164             // FIXME: in the SPM it would be nice to have system time instead of
1165             //        1394 time
1166             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1167                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1168                         m_period, transmit_timestamp, *it);
1169                 retval &= false; // buffer underrun
1170             }
1171         }
1172     }
1173     return retval;
1174 }
1175
1176 void StreamProcessorManager::dumpInfo() {
1177     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1178     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1179     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1180     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1181
1182     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1183     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1184         it != m_ReceiveProcessors.end();
1185         ++it ) {
1186         (*it)->dumpInfo();
1187     }
1188
1189     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1190     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1191         it != m_TransmitProcessors.end();
1192         ++it ) {
1193         (*it)->dumpInfo();
1194     }
1195
1196     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1197
1198 }
1199
1200 void StreamProcessorManager::setVerboseLevel(int l) {
1201     setDebugLevel(l);
1202     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1203
1204     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1205     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1206         it != m_ReceiveProcessors.end();
1207         ++it ) {
1208         (*it)->setVerboseLevel(l);
1209     }
1210
1211     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1212     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1213         it != m_TransmitProcessors.end();
1214         ++it ) {
1215         (*it)->setVerboseLevel(l);
1216     }
1217 }
1218
1219
1220 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1221     int count=0;
1222
1223     if (direction == Port::E_Capture) {
1224         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1225             it != m_ReceiveProcessors.end();
1226             ++it ) {
1227             count += (*it)->getPortCount(type);
1228         }
1229     } else {
1230         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1231             it != m_TransmitProcessors.end();
1232             ++it ) {
1233             count += (*it)->getPortCount(type);
1234         }
1235     }
1236     return count;
1237 }
1238
1239 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1240     int count=0;
1241
1242     if (direction == Port::E_Capture) {
1243         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1244             it != m_ReceiveProcessors.end();
1245             ++it ) {
1246             count += (*it)->getPortCount();
1247         }
1248     } else {
1249         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1250             it != m_TransmitProcessors.end();
1251             ++it ) {
1252             count += (*it)->getPortCount();
1253         }
1254     }
1255     return count;
1256 }
1257
1258 // TODO: implement a port map here, instead of the loop
1259
1260 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1261     int count=0;
1262     int prevcount=0;
1263
1264     if (direction == Port::E_Capture) {
1265         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1266             it != m_ReceiveProcessors.end();
1267             ++it ) {
1268             count += (*it)->getPortCount();
1269             if (count > idx) {
1270                 return (*it)->getPortAtIdx(idx-prevcount);
1271             }
1272             prevcount=count;
1273         }
1274     } else {
1275         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1276             it != m_TransmitProcessors.end();
1277             ++it ) {
1278             count += (*it)->getPortCount();
1279             if (count > idx) {
1280                 return (*it)->getPortAtIdx(idx-prevcount);
1281             }
1282             prevcount=count;
1283         }
1284     }
1285     return NULL;
1286 }
1287
1288 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1289     m_thread_realtime=rt;
1290     m_thread_priority=priority;
1291     return true;
1292 }
1293
1294
1295 } // end of namespace
Note: See TracBrowser for help on using the browser.