root/branches/libffado-2.0/src/libstreaming/StreamProcessorManager.cpp

Revision 1346, 53.0 kB (checked in by ppalmers, 16 years ago)

add one syncdelay worth of frames to the roundtrip loop. this should allow to use only 2 buffers instead of 3

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "devicemanager.h"
31
32 #include "libutil/Time.h"
33
34 #include <errno.h>
35 #include <assert.h>
36 #include <math.h>
37
38 namespace Streaming {
39
40 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
41
42 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
43     : m_is_slave( false )
44     , m_SyncSource(NULL)
45     , m_parent( p )
46     , m_xrun_happened( false )
47     , m_activity_wait_timeout_usec( 1000*1000 )
48     , m_nb_buffers( 0 )
49     , m_period( 0 )
50     , m_audio_datatype( eADT_Float )
51     , m_nominal_framerate ( 0 )
52     , m_xruns(0)
53     , m_shutdown_needed(false)
54     , m_nbperiods(0)
55     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
56 {
57     addOption(Util::OptionContainer::Option("slaveMode",false));
58     sem_init(&m_activity_semaphore, 0, 0);
59 }
60
61 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
62                                                unsigned int framerate, unsigned int nb_buffers)
63     : m_is_slave( false )
64     , m_SyncSource(NULL)
65     , m_parent( p )
66     , m_xrun_happened( false )
67     , m_activity_wait_timeout_usec( 1000*1000 )
68     , m_nb_buffers(nb_buffers)
69     , m_period(period)
70     , m_audio_datatype( eADT_Float )
71     , m_nominal_framerate ( framerate )
72     , m_xruns(0)
73     , m_shutdown_needed(false)
74     , m_nbperiods(0)
75     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
76 {
77     addOption(Util::OptionContainer::Option("slaveMode",false));
78     sem_init(&m_activity_semaphore, 0, 0);
79 }
80
81 StreamProcessorManager::~StreamProcessorManager() {
82     sem_post(&m_activity_semaphore);
83     sem_destroy(&m_activity_semaphore);
84     delete m_WaitLock;
85 }
86
87 // void
88 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
89 // {
90 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
91 // //
92 // //     bool handled_at_least_one = false;
93 // //     // note that all receive streams are gone once a device is unplugged
94 // //
95 // //     // synchronize with the wait lock
96 // //     Util::MutexLockHelper lock(*m_WaitLock);
97 // //
98 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
99 // //     // cause all SP's to bail out
100 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
101 // //           it != m_ReceiveProcessors.end();
102 // //           ++it )
103 // //     {
104 // //         if(&s == &((*it)->getParent().get1394Service())) {
105 // //             debugOutput(DEBUG_LEVEL_NORMAL,
106 // //                         "issue busreset on receive SPM on channel %d\n",
107 // //                         (*it)->getChannel());
108 // //             (*it)->handleBusReset();
109 // //             handled_at_least_one = true;
110 // //         } else {
111 // //             debugOutput(DEBUG_LEVEL_NORMAL,
112 // //                         "skipping receive SPM on channel %d since not on service %p\n",
113 // //                         (*it)->getChannel(), &s);
114 // //         }
115 // //     }
116 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
117 // //           it != m_TransmitProcessors.end();
118 // //           ++it )
119 // //     {
120 // //         if(&s == &((*it)->getParent().get1394Service())) {
121 // //             debugOutput(DEBUG_LEVEL_NORMAL,
122 // //                         "issue busreset on transmit SPM on channel %d\n",
123 // //                         (*it)->getChannel());
124 // //             (*it)->handleBusReset();
125 // //             handled_at_least_one = true;
126 // //         } else {
127 // //             debugOutput(DEBUG_LEVEL_NORMAL,
128 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
129 // //                         (*it)->getChannel(), &s);
130 // //         }
131 // //     }
132 // //
133 // //     // FIXME: we request shutdown for now.
134 // //     m_shutdown_needed = handled_at_least_one;
135 // }
136
137 void
138 StreamProcessorManager::signalActivity()
139 {
140     sem_post(&m_activity_semaphore);
141     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
142 }
143
144 enum StreamProcessorManager::eActivityResult
145 StreamProcessorManager::waitForActivity()
146 {
147     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
148     struct timespec ts;
149     int result;
150
151     long long int timeout_nsec=0;
152     if (m_activity_wait_timeout_usec >= 0) {
153         timeout_nsec = m_activity_wait_timeout_usec * 1000LL;
154
155         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
156             debugError("clock_gettime failed\n");
157             return eAR_Error;
158         }
159         ts.tv_nsec += timeout_nsec;
160         while(ts.tv_nsec >= 1000000000LL) {
161             ts.tv_sec += 1;
162             ts.tv_nsec -= 1000000000LL;
163         }
164     }
165
166     if (m_activity_wait_timeout_usec >= 0) {
167         result = sem_timedwait(&m_activity_semaphore, &ts);
168     } else {
169         result = sem_wait(&m_activity_semaphore);
170     }
171
172     if(result != 0) {
173         if (errno == ETIMEDOUT) {
174             debugOutput(DEBUG_LEVEL_VERBOSE,
175                         "(%p) sem_timedwait() timed out (result=%d)\n",
176                         this, result);
177             return eAR_Timeout;
178         } else if (errno == EINTR) {
179             debugOutput(DEBUG_LEVEL_VERBOSE,
180                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
181                         this, result);
182             return eAR_Interrupted;
183         } else if (errno == EINVAL) {
184             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
185                         this, result);
186             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
187                        this, timeout_nsec, ts.tv_sec, ts.tv_nsec);
188             return eAR_Error;
189         } else {
190             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
191                         this, result, errno);
192             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
193                        this, timeout_nsec, ts.tv_sec, ts.tv_nsec);
194             return eAR_Error;
195         }
196     }
197
198     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
199     return eAR_Activity;
200 }
201
202 /**
203  * Registers \ref processor with this manager.
204  *
205  * also registers it with the isohandlermanager
206  *
207  * be sure to call isohandlermanager->init() first!
208  * and be sure that the processors are also ->init()'ed
209  *
210  * @param processor
211  * @return true if successfull
212  */
213 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
214 {
215     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
216     assert(processor);
217     if (processor->getType() == StreamProcessor::ePT_Receive) {
218         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
219         m_ReceiveProcessors.push_back(processor);
220         return true;
221     }
222
223     if (processor->getType() == StreamProcessor::ePT_Transmit) {
224         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
225         m_TransmitProcessors.push_back(processor);
226         return true;
227     }
228
229     debugFatal("Unsupported processor type!\n");
230     return false;
231 }
232
233 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
234 {
235     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
236     assert(processor);
237
238     if (processor->getType()==StreamProcessor::ePT_Receive) {
239
240         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
241               it != m_ReceiveProcessors.end();
242               ++it )
243         {
244             if ( *it == processor ) {
245                 if (*it == m_SyncSource) {
246                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
247                     m_SyncSource = NULL;
248                 }
249                 m_ReceiveProcessors.erase(it);
250                 return true;
251             }
252         }
253     }
254
255     if (processor->getType()==StreamProcessor::ePT_Transmit) {
256         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
257               it != m_TransmitProcessors.end();
258               ++it )
259         {
260             if ( *it == processor ) {
261                 if (*it == m_SyncSource) {
262                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
263                     m_SyncSource = NULL;
264                 }
265                 m_TransmitProcessors.erase(it);
266                 return true;
267             }
268         }
269     }
270
271     debugFatal("Processor (%p) not found!\n",processor);
272     return false; //not found
273 }
274
275 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
276     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
277     m_SyncSource=s;
278     return true;
279 }
280
281 bool StreamProcessorManager::prepare() {
282
283     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
284     m_is_slave=false;
285     if(!getOption("slaveMode", m_is_slave)) {
286         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
287     }
288
289     m_shutdown_needed=false;
290
291     // if no sync source is set, select one here
292     if(m_SyncSource == NULL) {
293        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
294     }
295
296     // FIXME: put into separate method
297     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
298           it != m_ReceiveProcessors.end();
299           ++it )
300     {
301         if(m_SyncSource == NULL) {
302             debugWarning(" => Sync Source is %p.\n", *it);
303             m_SyncSource = *it;
304         }
305     }
306     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
307           it != m_TransmitProcessors.end();
308           ++it )
309     {
310         if(m_SyncSource == NULL) {
311             debugWarning(" => Sync Source is %p.\n", *it);
312             m_SyncSource = *it;
313         }
314     }
315
316     // now do the actual preparation of the SP's
317     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
318     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
319         it != m_ReceiveProcessors.end();
320         ++it ) {
321
322         if(!(*it)->setOption("slaveMode", m_is_slave)) {
323             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
324         }
325
326         if(!(*it)->prepare()) {
327             debugFatal(  " could not prepare (%p)...\n",(*it));
328             return false;
329         }
330     }
331     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
332     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
333         it != m_TransmitProcessors.end();
334         ++it ) {
335         if(!(*it)->setOption("slaveMode", m_is_slave)) {
336             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
337         }
338         if(!(*it)->prepare()) {
339             debugFatal( " could not prepare (%p)...\n",(*it));
340             return false;
341         }
342     }
343
344     // if there are no stream processors registered,
345     // fail
346     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
347         debugFatal("No stream processors registered, can't do anything usefull\n");
348         return false;
349     }
350
351     // set the activity timeout value to two periods worth of usecs.
352     // since we can expect activity once every period, but we might have some
353     // offset, the safe value is two periods.
354     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
355     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
356     setActivityWaitTimeoutUsec(timeout_usec);
357
358     return true;
359 }
360
361 bool
362 StreamProcessorManager::startDryRunning()
363 {
364     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
365     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
366     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
367             it != m_TransmitProcessors.end();
368             ++it ) {
369         if ((*it)->inError()) {
370             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
371             return false;
372         }
373         if (!(*it)->isDryRunning()) {
374             if(!(*it)->scheduleStartDryRunning(-1)) {
375                 debugError("Could not put SP %p into the dry-running state\n", *it);
376                 return false;
377             }
378         } else {
379             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
380         }
381     }
382     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
383             it != m_ReceiveProcessors.end();
384             ++it ) {
385         if ((*it)->inError()) {
386             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
387             return false;
388         }
389         if (!(*it)->isDryRunning()) {
390             if(!(*it)->scheduleStartDryRunning(-1)) {
391                 debugError("Could not put SP %p into the dry-running state\n", *it);
392                 return false;
393             }
394         } else {
395             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
396         }
397     }
398     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
399     // wait for the syncsource to start running.
400     // that will block the waitForPeriod call until everyone has started (theoretically)
401     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
402     bool all_dry_running = false;
403     while (!all_dry_running && cnt) {
404         all_dry_running = true;
405         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
406                 it != m_ReceiveProcessors.end();
407                 ++it ) {
408             all_dry_running &= (*it)->isDryRunning();
409         }
410         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
411                 it != m_TransmitProcessors.end();
412                 ++it ) {
413             all_dry_running &= (*it)->isDryRunning();
414         }
415
416         SleepRelativeUsec(125);
417         cnt--;
418     }
419     if(cnt==0) {
420         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
421         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
422                 it != m_ReceiveProcessors.end();
423                 ++it ) {
424             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
425                 (*it)->getTypeString(), *it, (*it)->getStateString());
426         }
427         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
428                 it != m_TransmitProcessors.end();
429                 ++it ) {
430             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
431                 (*it)->getTypeString(), *it, (*it)->getStateString());
432         }
433         return false;
434     }
435     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
436     return true;
437 }
438
439 bool StreamProcessorManager::syncStartAll() {
440     if(m_SyncSource == NULL) return false;
441     // figure out when to get the SP's running.
442     // the xmit SP's should also know the base timestamp
443     // streams should be aligned here
444
445     // now find out how long we have to delay the wait operation such that
446     // the received frames will all be presented to the SP
447     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
448     int max_of_min_delay = 0;
449     int min_delay = 0;
450     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
451             it != m_ReceiveProcessors.end();
452             ++it ) {
453         min_delay = (*it)->getMaxFrameLatency();
454         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
455     }
456
457     // add some processing margin. This only shifts the time
458     // at which the buffer is transfer()'ed. This makes things somewhat
459     // more robust. It should be noted though that shifting the transfer
460     // time to a later time instant also causes the xmit buffer fill to be
461     // lower on average.
462     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
463
464     m_SyncSource->setSyncDelay(max_of_min_delay);
465     unsigned int syncdelay = m_SyncSource->getSyncDelay();
466     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d => %d ticks (%03us %04uc %04ut)...\n",
467         max_of_min_delay, syncdelay,
468         (unsigned int)TICKS_TO_SECS(syncdelay),
469         (unsigned int)TICKS_TO_CYCLES(syncdelay),
470         (unsigned int)TICKS_TO_OFFSET(syncdelay));
471
472     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
473     //        have aquired lock
474     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
475     //sleep(2); // FIXME: be smarter here
476
477     // make sure that we are dry-running long enough for the
478     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
479     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
480
481     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
482     nb_sync_runs /= 1000;
483     nb_sync_runs /= getPeriodSize();
484
485     int64_t time_till_next_period;
486     while(nb_sync_runs--) { // or while not sync-ed?
487         // check if we were woken up too soon
488         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
489         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
490         if(time_till_next_period > 0) {
491             // wait for the period
492             SleepRelativeUsec(time_till_next_period);
493         }
494     }
495
496     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
497     // FIXME: in the SPM it would be nice to have system time instead of
498     //        1394 time
499
500     // we now should have decent sync info on the sync source
501     // determine a point in time where the system should start
502     // figure out where we are now
503     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
504     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
505         time_of_first_sample,
506         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
507         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
508         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
509
510     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
511     // this is the time window we have to setup all SP's such that they
512     // can start wet-running correctly.
513     time_of_first_sample = addTicks(time_of_first_sample,
514                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
515
516     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
517         time_of_first_sample,
518         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
519         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
520         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
521
522     // we should start wet-running the transmit SP's some cycles in advance
523     // such that we know it is wet-running when it should output its first sample
524     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
525                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
526
527     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
528                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
529     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
530         time_to_start_xmit,
531         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
532         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
533         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
534     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
535         time_to_start_recv,
536         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
537         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
538         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
539
540     // at this point the buffer head timestamp of the transmit buffers can be set
541     // this is the presentation time of the first sample in the buffer
542     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
543           it != m_TransmitProcessors.end();
544           ++it ) {
545         (*it)->setBufferHeadTimestamp(time_of_first_sample);
546     }
547
548     // switch syncsource to running state
549     uint64_t time_to_start_sync;
550     // FIXME: this is most likely not going to work for transmit sync sources
551     // but those are unsupported in this version
552     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
553         time_to_start_sync = time_to_start_recv;
554     } else {
555         time_to_start_sync = time_to_start_xmit;
556     }
557     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
558         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
559         return false;
560     }
561
562     // STEP X: switch all non-syncsource SP's over to the running state
563     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
564           it != m_ReceiveProcessors.end();
565           ++it ) {
566         if(*it != m_SyncSource) {
567             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
568                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
569                 return false;
570             }
571         }
572     }
573     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
574           it != m_TransmitProcessors.end();
575           ++it ) {
576         if(*it != m_SyncSource) {
577             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
578                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
579                 return false;
580             }
581         }
582     }
583     // wait for the syncsource to start running.
584     // that will block the waitForPeriod call until everyone has started (theoretically)
585     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
586     // so a 20 times this value should be a good timeout
587     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
588     while (!m_SyncSource->isRunning() && cnt) {
589         SleepRelativeUsec(125);
590         cnt--;
591     }
592     if(cnt==0) {
593         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
594         return false;
595     }
596
597     // the sync source is running, we can now read a decent received timestamp from it
598     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
599
600     // and a (still very rough) approximation of the rate
601     float rate = m_SyncSource->getTicksPerFrame();
602     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
603     // also add the sync delay
604     delay_in_ticks += m_SyncSource->getSyncDelay();
605     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
606                 m_time_of_transfer, rate);
607
608     // then use this information to initialize the xmit handlers
609
610     //  we now set the buffer tail timestamp of the transmit buffer
611     //  to the period transfer time instant plus what's nb_buffers - 1
612     //  in ticks. This due to the fact that we (should) have received one period
613     //  worth of ticks at t=m_time_of_transfer
614     //  hence one period of frames should also have been transmitted, which means
615     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
616     //  that allows us to calculate the tail timestamp for the buffer.
617
618     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
619
620     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
621                 transmit_tail_timestamp, rate);
622
623     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
624         it != m_TransmitProcessors.end();
625         ++it ) {
626         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
627         (*it)->setTicksPerFrame(rate);
628     }
629
630     // align the received streams to be phase aligned
631     if(!alignReceivedStreams()) {
632         debugError("Could not align streams...\n");
633         return false;
634     }
635
636     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
637     return true;
638 }
639
640 bool
641 StreamProcessorManager::alignReceivedStreams()
642 {
643     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
644     unsigned int nb_sync_runs;
645     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
646     int64_t diff_between_streams[nb_rcv_sp];
647     int64_t diff;
648
649     unsigned int i;
650
651     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
652     periods_per_align_try /= 1000;
653     periods_per_align_try /= getPeriodSize();
654     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
655
656     bool aligned = false;
657     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
658     while (!aligned && cnt--) {
659         nb_sync_runs = periods_per_align_try;
660         while(nb_sync_runs) {
661             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
662             if(!waitForPeriod()) {
663                 debugWarning("xrun while aligning streams...\n");
664                 return false;
665             }
666
667             // before we do anything else, transfer
668             if(!transferSilence()) {
669                 debugError("Could not transfer silence\n");
670                 return false;
671             }
672
673             // now calculate the stream offset
674             i = 0;
675             for ( i = 0; i < nb_rcv_sp; i++) {
676                 StreamProcessor *s = m_ReceiveProcessors.at(i);
677                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
678                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
679                     m_SyncSource, s, diff);
680                 if ( nb_sync_runs == periods_per_align_try ) {
681                     diff_between_streams[i] = diff;
682                 } else {
683                     diff_between_streams[i] += diff;
684                 }
685             }
686
687             nb_sync_runs--;
688         }
689         // calculate the average offsets
690         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
691         int diff_between_streams_frames[nb_rcv_sp];
692         aligned = true;
693         for ( i = 0; i < nb_rcv_sp; i++) {
694             StreamProcessor *s = m_ReceiveProcessors.at(i);
695
696             diff_between_streams[i] /= periods_per_align_try;
697             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
698             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
699                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
700
701             aligned &= (diff_between_streams_frames[i] == 0);
702
703             // reposition the stream
704             if(!s->shiftStream(diff_between_streams_frames[i])) {
705                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
706                 return false;
707             }
708         }
709         if (!aligned) {
710             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
711         }
712     }
713     if (cnt == 0) {
714         debugError("Align failed\n");
715         return false;
716     }
717     return true;
718 }
719
720 bool StreamProcessorManager::start() {
721     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
722
723     // start all SP's synchonized
724     bool start_result = false;
725     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
726         // put all SP's into dry-running state
727         if (!startDryRunning()) {
728             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
729             start_result = false;
730             continue;
731         }
732
733         start_result = syncStartAll();
734         if(start_result) {
735             break;
736         } else {
737             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
738             if(m_shutdown_needed) {
739                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
740                 return false;
741             }
742         }
743     }
744     if (!start_result) {
745         debugFatal("Could not syncStartAll...\n");
746         return false;
747     }
748     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
749     return true;
750 }
751
752 bool StreamProcessorManager::stop() {
753     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
754
755     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
756     // switch SP's over to the dry-running state
757     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
758           it != m_ReceiveProcessors.end();
759           ++it ) {
760         if((*it)->isRunning()) {
761             if(!(*it)->scheduleStopRunning(-1)) {
762                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
763                 return false;
764             }
765         }
766     }
767     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
768           it != m_TransmitProcessors.end();
769           ++it ) {
770         if((*it)->isRunning()) {
771             if(!(*it)->scheduleStopRunning(-1)) {
772                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
773                 return false;
774             }
775         }
776     }
777     // wait for the SP's to get into the dry-running/stopped state
778     int cnt = 8000;
779     bool ready = false;
780     while (!ready && cnt) {
781         ready = true;
782         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
783             it != m_ReceiveProcessors.end();
784             ++it ) {
785             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
786         }
787         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
788             it != m_TransmitProcessors.end();
789             ++it ) {
790             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
791         }
792         SleepRelativeUsec(125);
793         cnt--;
794     }
795     if(cnt==0) {
796         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
797         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
798             it != m_ReceiveProcessors.end();
799             ++it ) {
800             (*it)->dumpInfo();
801         }
802         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
803             it != m_TransmitProcessors.end();
804             ++it ) {
805             (*it)->dumpInfo();
806         }
807         return false;
808     }
809
810     // switch SP's over to the stopped state
811     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
812           it != m_ReceiveProcessors.end();
813           ++it ) {
814         if ((*it)->inError()) {
815             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
816         } else if(!(*it)->scheduleStopDryRunning(-1)) {
817             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
818             return false;
819         }
820     }
821     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
822           it != m_TransmitProcessors.end();
823           ++it ) {
824         if ((*it)->inError()) {
825             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
826         } else if(!(*it)->scheduleStopDryRunning(-1)) {
827             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
828             return false;
829         }
830     }
831     // wait for the SP's to get into the stopped state
832     cnt = 8000;
833     ready = false;
834     while (!ready && cnt) {
835         ready = true;
836         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
837             it != m_ReceiveProcessors.end();
838             ++it ) {
839             ready &= ((*it)->isStopped() || (*it)->inError());
840         }
841         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
842             it != m_TransmitProcessors.end();
843             ++it ) {
844             ready &= ((*it)->isStopped() || (*it)->inError());
845         }
846         SleepRelativeUsec(125);
847         cnt--;
848     }
849     if(cnt==0) {
850         debugWarning(" Timeout waiting for the SP's to stop\n");
851         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
852             it != m_ReceiveProcessors.end();
853             ++it ) {
854             (*it)->dumpInfo();
855         }
856         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
857             it != m_TransmitProcessors.end();
858             ++it ) {
859             (*it)->dumpInfo();
860         }
861         return false;
862     }
863     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
864     return true;
865 }
866
867 /**
868  * Called upon Xrun events. This brings all StreamProcessors back
869  * into their starting state, and then carries on streaming. This should
870  * have the same effect as restarting the whole thing.
871  *
872  * @return true if successful, false otherwise
873  */
874 bool StreamProcessorManager::handleXrun() {
875
876     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
877
878     dumpInfo();
879
880     /*
881      * Reset means:
882      * 1) Disabling the SP's, so that they don't process any packets
883      *    note: the isomanager does keep on delivering/requesting them
884      * 2) Bringing all buffers & streamprocessors into a know state
885      *    - Clear all capture buffers
886      *    - Put nb_periods*period_size of null frames into the playback buffers
887      * 3) Re-enable the SP's
888      */
889
890     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
891     // start all SP's synchonized
892     bool start_result = false;
893     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
894         if(m_shutdown_needed) {
895             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
896             return true;
897         }
898         // put all SP's into dry-running state
899         if (!startDryRunning()) {
900             debugShowBackLog();
901             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
902             start_result = false;
903             continue;
904         }
905
906         start_result = syncStartAll();
907         if(start_result) {
908             break;
909         } else {
910             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
911         }
912     }
913     if (!start_result) {
914         debugFatal("Could not syncStartAll...\n");
915         return false;
916     }
917     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
918
919     return true;
920 }
921
922 /**
923  * @brief Waits until the next period of samples is ready
924  *
925  * This function does not return until a full period of samples is (or should be)
926  * ready to be transferred.
927  *
928  * @return true if the period is ready, false if not
929  */
930 bool StreamProcessorManager::waitForPeriod() {
931     if(m_SyncSource == NULL) return false;
932     if(m_shutdown_needed) return false;
933     bool xrun_occurred = false;
934     bool in_error = false;
935
936     // grab the wait lock
937     // this ensures that bus reset handling doesn't interfere
938     Util::MutexLockHelper lock(*m_WaitLock);
939     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
940                         "waiting for period (%d frames in buffer)...\n",
941                         m_SyncSource->getBufferFill());
942     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
943     uint64_t ticks_at_period_margin = ticks_at_period + m_SyncSource->getSyncDelay();
944     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
945
946     #ifdef DEBUG
947     int64_t now = Util::SystemTimeSource::getCurrentTime();
948     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
949     #endif
950
951     // wait until it's time to transfer
952     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
953
954     #ifdef DEBUG
955     now = Util::SystemTimeSource::getCurrentTime();
956     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
957     #endif
958
959     // the period should be ready now
960
961     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
962     // HACK: we force wait until every SP is ready. this is needed
963     // since the raw1394 interface provides no control over interrupts
964     // resulting in very bad predictability on when the data is present.
965     bool period_not_ready = true;
966     while(period_not_ready) {
967         period_not_ready = false;
968         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
969             it != m_ReceiveProcessors.end();
970             ++it ) {
971             bool this_sp_period_ready = (*it)->canConsumePeriod();
972             if (!this_sp_period_ready) {
973                 period_not_ready = true;
974             }
975         }
976         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
977             it != m_TransmitProcessors.end();
978             ++it ) {
979             bool this_sp_period_ready = (*it)->canProducePeriod();
980             if (!this_sp_period_ready) {
981                 period_not_ready = true;
982             }
983         }
984
985         if (period_not_ready) {
986             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
987             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
988         }
989
990         // check for underruns/errors on the ISO side,
991         // those should make us bail out of the wait loop
992         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
993             it != m_ReceiveProcessors.end();
994             ++it ) {
995             // a xrun has occurred on the Iso side
996             xrun_occurred |= (*it)->xrunOccurred();
997             in_error |= (*it)->inError();
998         }
999         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1000             it != m_TransmitProcessors.end();
1001             ++it ) {
1002             // a xrun has occurred on the Iso side
1003             xrun_occurred |= (*it)->xrunOccurred();
1004             in_error |= (*it)->inError();
1005         }
1006         if(xrun_occurred | in_error | m_shutdown_needed) break;
1007     }
1008     #else
1009     // check for underruns/errors on the ISO side,
1010     // those should make us bail out of the wait loop
1011     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1012         it != m_ReceiveProcessors.end();
1013         ++it ) {
1014         // xrun on data buffer side
1015         if (!(*it)->canConsumePeriod()) {
1016             xrun_occurred = true;
1017         }
1018         // a xrun has occurred on the Iso side
1019         xrun_occurred |= (*it)->xrunOccurred();
1020         in_error |= (*it)->inError();
1021     }
1022     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1023         it != m_TransmitProcessors.end();
1024         ++it ) {
1025         // xrun on data buffer side
1026         if (!(*it)->canProducePeriod()) {
1027             xrun_occurred = true;
1028         }
1029         // a xrun has occurred on the Iso side
1030         xrun_occurred |= (*it)->xrunOccurred();
1031         in_error |= (*it)->inError();
1032     }
1033     #endif
1034
1035     if(xrun_occurred) {
1036         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1037     }
1038     if(in_error) {
1039         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1040         m_shutdown_needed = true;
1041     }
1042
1043     // we save the 'ideal' time of the transfer at this point,
1044     // because we can have interleaved read - process - write
1045     // cycles making that we modify a receiving stream's buffer
1046     // before we get to writing.
1047     // NOTE: before waitForPeriod() is called again, both the transmit
1048     //       and the receive processors should have done their transfer.
1049     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1050    
1051     #ifdef DEBUG
1052     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
1053    
1054     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1055     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
1056     // display message if the difference between two successive tick
1057     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1058     // so 50 ticks = 10%, which is a rather large jitter value.
1059     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
1060         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
1061                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1062     }
1063     m_time_of_transfer2 = m_time_of_transfer;
1064     #endif
1065
1066     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1067                         "transfer period %d at %llu ticks...\n",
1068                         m_nbperiods, m_time_of_transfer);
1069
1070     // this is to notify the client of the delay that we introduced by waiting
1071     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
1072     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1073                         "delayed for %d usecs...\n",
1074                         m_delayed_usecs);
1075
1076 #ifdef DEBUG
1077     int rcv_bf=0, xmt_bf=0;
1078     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1079         it != m_ReceiveProcessors.end();
1080         ++it ) {
1081         rcv_bf = (*it)->getBufferFill();
1082     }
1083     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1084         it != m_TransmitProcessors.end();
1085         ++it ) {
1086         xmt_bf = (*it)->getBufferFill();
1087     }
1088     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1089                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1090                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1091
1092     // check if xruns occurred on the Iso side.
1093     // also check if xruns will occur should we transfer() now
1094     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1095           it != m_ReceiveProcessors.end();
1096           ++it ) {
1097
1098         if ((*it)->xrunOccurred()) {
1099             debugOutput(DEBUG_LEVEL_NORMAL,
1100                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1101             (*it)->dumpInfo();
1102         }
1103         if (!((*it)->canClientTransferFrames(m_period))) {
1104             debugOutput(DEBUG_LEVEL_NORMAL,
1105                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1106             (*it)->dumpInfo();
1107         }
1108     }
1109     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1110           it != m_TransmitProcessors.end();
1111           ++it ) {
1112         if ((*it)->xrunOccurred()) {
1113             debugOutput(DEBUG_LEVEL_NORMAL,
1114                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1115         }
1116         if (!((*it)->canClientTransferFrames(m_period))) {
1117             debugOutput(DEBUG_LEVEL_NORMAL,
1118                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1119         }
1120     }
1121 #endif
1122     m_nbperiods++;
1123     // now we can signal the client that we are (should be) ready
1124     return !xrun_occurred;
1125 }
1126
1127 /**
1128  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1129  *
1130  * Transfers one period of frames from the client side to the Iso side and vice versa.
1131  *
1132  * @return true if successful, false otherwise (indicates xrun).
1133  */
1134 bool StreamProcessorManager::transfer() {
1135     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1136     bool retval=true;
1137     retval &= transfer(StreamProcessor::ePT_Receive);
1138     retval &= transfer(StreamProcessor::ePT_Transmit);
1139     return retval;
1140 }
1141
1142 /**
1143  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1144  *
1145  * Transfers one period of frames from the client side to the Iso side or vice versa.
1146  *
1147  * @param t The processor type to tranfer for (receive or transmit)
1148  * @return true if successful, false otherwise (indicates xrun).
1149  */
1150 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1151     if(m_SyncSource == NULL) return false;
1152     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1153         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1154         t, m_time_of_transfer,
1155         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1156         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1157         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1158
1159     bool retval = true;
1160     // a static cast could make sure that there is no performance
1161     // penalty for the virtual functions (to be checked)
1162     if (t==StreamProcessor::ePT_Receive) {
1163         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1164                 it != m_ReceiveProcessors.end();
1165                 ++it ) {
1166             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1167                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1168                             m_period, m_time_of_transfer,*it);
1169                 retval &= false; // buffer underrun
1170             }
1171         }
1172     } else {
1173         // FIXME: in the SPM it would be nice to have system time instead of
1174         //        1394 time
1175         float rate = m_SyncSource->getTicksPerFrame();
1176         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1177
1178         // the data we are putting into the buffer is intended to be transmitted
1179         // one ringbuffer size after it has been received
1180
1181         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1182         // postponed.
1183         int syncdelay = m_SyncSource->getSyncDelay();
1184         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1185
1186         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1187                 it != m_TransmitProcessors.end();
1188                 ++it ) {
1189             // FIXME: in the SPM it would be nice to have system time instead of
1190             //        1394 time
1191             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1192                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1193                         m_period, transmit_timestamp, *it);
1194                 retval &= false; // buffer underrun
1195             }
1196         }
1197     }
1198     return retval;
1199 }
1200
1201 /**
1202  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1203  *
1204  * Transfers one period of silence to the Iso side for transmit SP's
1205  * or dump one period of frames for receive SP's
1206  *
1207  * @return true if successful, false otherwise (indicates xrun).
1208  */
1209 bool StreamProcessorManager::transferSilence() {
1210     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1211     bool retval=true;
1212     // NOTE: the order here is opposite from the order in
1213     // normal operation (transmit is before receive), because
1214     // we can do that here (data=silence=available) and
1215     // it increases reliability (esp. on startup)
1216     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1217     retval &= transferSilence(StreamProcessor::ePT_Receive);
1218     return retval;
1219 }
1220
1221 /**
1222  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1223  *
1224  * Transfers one period of silence to the Iso side for transmit SP's
1225  * or dump one period of frames for receive SP's
1226  *
1227  * @param t The processor type to tranfer for (receive or transmit)
1228  * @return true if successful, false otherwise (indicates xrun).
1229  */
1230 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1231     if(m_SyncSource == NULL) return false;
1232     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1233         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1234         t, m_time_of_transfer,
1235         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1236         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1237         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1238
1239     bool retval = true;
1240     // a static cast could make sure that there is no performance
1241     // penalty for the virtual functions (to be checked)
1242     if (t==StreamProcessor::ePT_Receive) {
1243         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1244                 it != m_ReceiveProcessors.end();
1245                 ++it ) {
1246             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1247                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1248                             m_period, m_time_of_transfer,*it);
1249                 retval &= false; // buffer underrun
1250             }
1251         }
1252     } else {
1253         // FIXME: in the SPM it would be nice to have system time instead of
1254         //        1394 time
1255         float rate = m_SyncSource->getTicksPerFrame();
1256         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1257
1258         // the data we are putting into the buffer is intended to be transmitted
1259         // one ringbuffer size after it has been received
1260         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1261         // postponed.
1262         int syncdelay = m_SyncSource->getSyncDelay();
1263         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1264
1265         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1266                 it != m_TransmitProcessors.end();
1267                 ++it ) {
1268             // FIXME: in the SPM it would be nice to have system time instead of
1269             //        1394 time
1270             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1271                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1272                         m_period, transmit_timestamp, *it);
1273                 retval &= false; // buffer underrun
1274             }
1275         }
1276     }
1277     return retval;
1278 }
1279
1280 void StreamProcessorManager::dumpInfo() {
1281     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1282     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1283     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1284     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1285
1286     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1287     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1288         it != m_ReceiveProcessors.end();
1289         ++it ) {
1290         (*it)->dumpInfo();
1291     }
1292
1293     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1294     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1295         it != m_TransmitProcessors.end();
1296         ++it ) {
1297         (*it)->dumpInfo();
1298     }
1299
1300     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1301
1302 }
1303
1304 void StreamProcessorManager::setVerboseLevel(int l) {
1305     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1306
1307     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1308         it != m_ReceiveProcessors.end();
1309         ++it ) {
1310         (*it)->setVerboseLevel(l);
1311     }
1312     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1313         it != m_TransmitProcessors.end();
1314         ++it ) {
1315         (*it)->setVerboseLevel(l);
1316     }
1317     setDebugLevel(l);
1318     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1319 }
1320
1321 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1322     int count=0;
1323
1324     if (direction == Port::E_Capture) {
1325         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1326             it != m_ReceiveProcessors.end();
1327             ++it ) {
1328             count += (*it)->getPortCount(type);
1329         }
1330     } else {
1331         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1332             it != m_TransmitProcessors.end();
1333             ++it ) {
1334             count += (*it)->getPortCount(type);
1335         }
1336     }
1337     return count;
1338 }
1339
1340 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1341     int count=0;
1342
1343     if (direction == Port::E_Capture) {
1344         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1345             it != m_ReceiveProcessors.end();
1346             ++it ) {
1347             count += (*it)->getPortCount();
1348         }
1349     } else {
1350         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1351             it != m_TransmitProcessors.end();
1352             ++it ) {
1353             count += (*it)->getPortCount();
1354         }
1355     }
1356     return count;
1357 }
1358
1359 // TODO: implement a port map here, instead of the loop
1360 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1361     int count=0;
1362     int prevcount=0;
1363
1364     if (direction == Port::E_Capture) {
1365         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1366             it != m_ReceiveProcessors.end();
1367             ++it ) {
1368             count += (*it)->getPortCount();
1369             if (count > idx) {
1370                 return (*it)->getPortAtIdx(idx-prevcount);
1371             }
1372             prevcount=count;
1373         }
1374     } else {
1375         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1376             it != m_TransmitProcessors.end();
1377             ++it ) {
1378             count += (*it)->getPortCount();
1379             if (count > idx) {
1380                 return (*it)->getPortAtIdx(idx-prevcount);
1381             }
1382             prevcount=count;
1383         }
1384     }
1385     return NULL;
1386 }
1387
1388 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1389     m_thread_realtime=rt;
1390     m_thread_priority=priority;
1391     return true;
1392 }
1393
1394
1395 } // end of namespace
Note: See TracBrowser for help on using the browser.