root/branches/libffado-2.0/src/libstreaming/StreamProcessorManager.cpp

Revision 1536, 64.4 kB (checked in by ppalmers, 12 years ago)

introduce transmit prebuffering to increase reliability

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "devicemanager.h"
31
32 #include "libutil/Time.h"
33
34 #include <errno.h>
35 #include <assert.h>
36 #include <math.h>
37
38 namespace Streaming {
39
40 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
41
42 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
43     : m_time_of_transfer ( 0 )
44     #ifdef DEBUG
45     , m_time_of_transfer2 ( 0 )
46     #endif
47     , m_is_slave( false )
48     , m_SyncSource(NULL)
49     , m_parent( p )
50     , m_xrun_happened( false )
51     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
52     , m_nb_buffers( 0 )
53     , m_period( 0 )
54     , m_sync_delay( 0 )
55     , m_audio_datatype( eADT_Float )
56     , m_nominal_framerate ( 0 )
57     , m_xruns(0)
58     , m_shutdown_needed(false)
59     , m_nbperiods(0)
60     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
61 {
62     addOption(Util::OptionContainer::Option("slaveMode",false));
63     sem_init(&m_activity_semaphore, 0, 0);
64 }
65
66 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
67                                                unsigned int framerate, unsigned int nb_buffers)
68     : m_time_of_transfer ( 0 )
69     #ifdef DEBUG
70     , m_time_of_transfer2 ( 0 )
71     #endif
72     , m_is_slave( false )
73     , m_SyncSource(NULL)
74     , m_parent( p )
75     , m_xrun_happened( false )
76     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
77     , m_nb_buffers(nb_buffers)
78     , m_period(period)
79     , m_sync_delay( 0 )
80     , m_audio_datatype( eADT_Float )
81     , m_nominal_framerate ( framerate )
82     , m_xruns(0)
83     , m_shutdown_needed(false)
84     , m_nbperiods(0)
85     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
86 {
87     addOption(Util::OptionContainer::Option("slaveMode",false));
88     sem_init(&m_activity_semaphore, 0, 0);
89 }
90
91 StreamProcessorManager::~StreamProcessorManager() {
92     sem_post(&m_activity_semaphore);
93     sem_destroy(&m_activity_semaphore);
94     delete m_WaitLock;
95 }
96
97 // void
98 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
99 // {
100 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
101 // //
102 // //     bool handled_at_least_one = false;
103 // //     // note that all receive streams are gone once a device is unplugged
104 // //
105 // //     // synchronize with the wait lock
106 // //     Util::MutexLockHelper lock(*m_WaitLock);
107 // //
108 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
109 // //     // cause all SP's to bail out
110 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
111 // //           it != m_ReceiveProcessors.end();
112 // //           ++it )
113 // //     {
114 // //         if(&s == &((*it)->getParent().get1394Service())) {
115 // //             debugOutput(DEBUG_LEVEL_NORMAL,
116 // //                         "issue busreset on receive SPM on channel %d\n",
117 // //                         (*it)->getChannel());
118 // //             (*it)->handleBusReset();
119 // //             handled_at_least_one = true;
120 // //         } else {
121 // //             debugOutput(DEBUG_LEVEL_NORMAL,
122 // //                         "skipping receive SPM on channel %d since not on service %p\n",
123 // //                         (*it)->getChannel(), &s);
124 // //         }
125 // //     }
126 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
127 // //           it != m_TransmitProcessors.end();
128 // //           ++it )
129 // //     {
130 // //         if(&s == &((*it)->getParent().get1394Service())) {
131 // //             debugOutput(DEBUG_LEVEL_NORMAL,
132 // //                         "issue busreset on transmit SPM on channel %d\n",
133 // //                         (*it)->getChannel());
134 // //             (*it)->handleBusReset();
135 // //             handled_at_least_one = true;
136 // //         } else {
137 // //             debugOutput(DEBUG_LEVEL_NORMAL,
138 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
139 // //                         (*it)->getChannel(), &s);
140 // //         }
141 // //     }
142 // //
143 // //     // FIXME: we request shutdown for now.
144 // //     m_shutdown_needed = handled_at_least_one;
145 // }
146
147 void
148 StreamProcessorManager::signalActivity()
149 {
150     sem_post(&m_activity_semaphore);
151     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
152 }
153
154 enum StreamProcessorManager::eActivityResult
155 StreamProcessorManager::waitForActivity()
156 {
157     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
158     struct timespec ts;
159     int result;
160
161     if (m_activity_wait_timeout_nsec >= 0) {
162
163         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
164             debugError("clock_gettime failed\n");
165             return eAR_Error;
166         }
167         ts.tv_nsec += m_activity_wait_timeout_nsec;
168         while(ts.tv_nsec >= 1000000000LL) {
169             ts.tv_sec += 1;
170             ts.tv_nsec -= 1000000000LL;
171         }
172     }
173
174     if (m_activity_wait_timeout_nsec >= 0) {
175         result = sem_timedwait(&m_activity_semaphore, &ts);
176     } else {
177         result = sem_wait(&m_activity_semaphore);
178     }
179
180     if(result != 0) {
181         if (errno == ETIMEDOUT) {
182             debugOutput(DEBUG_LEVEL_VERBOSE,
183                         "(%p) sem_timedwait() timed out (result=%d)\n",
184                         this, result);
185             return eAR_Timeout;
186         } else if (errno == EINTR) {
187             debugOutput(DEBUG_LEVEL_VERBOSE,
188                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
189                         this, result);
190             return eAR_Interrupted;
191         } else if (errno == EINVAL) {
192             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
193                         this, result);
194             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
195                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
196             return eAR_Error;
197         } else {
198             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
199                         this, result, errno);
200             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
201                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
202             return eAR_Error;
203         }
204     }
205
206     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
207     return eAR_Activity;
208 }
209
210 /**
211  * Registers \ref processor with this manager.
212  *
213  * also registers it with the isohandlermanager
214  *
215  * be sure to call isohandlermanager->init() first!
216  * and be sure that the processors are also ->init()'ed
217  *
218  * @param processor
219  * @return true if successfull
220  */
221 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
222 {
223     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
224     assert(processor);
225     if (processor->getType() == StreamProcessor::ePT_Receive) {
226         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
227         m_ReceiveProcessors.push_back(processor);
228         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
229                     ( this, &StreamProcessorManager::updateShadowLists, false );
230         processor->addPortManagerUpdateHandler(f);
231         updateShadowLists();
232         return true;
233     }
234     if (processor->getType() == StreamProcessor::ePT_Transmit) {
235         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
236         m_TransmitProcessors.push_back(processor);
237         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
238                     ( this, &StreamProcessorManager::updateShadowLists, false );
239         processor->addPortManagerUpdateHandler(f);
240         updateShadowLists();
241         return true;
242     }
243
244     debugFatal("Unsupported processor type!\n");
245     return false;
246 }
247
248 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
249 {
250     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
251     assert(processor);
252
253     if (processor->getType()==StreamProcessor::ePT_Receive) {
254
255         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
256               it != m_ReceiveProcessors.end();
257               ++it )
258         {
259             if ( *it == processor ) {
260                 if (*it == m_SyncSource) {
261                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
262                     m_SyncSource = NULL;
263                 }
264                 m_ReceiveProcessors.erase(it);
265                 // remove the functor
266                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
267                 if(f) {
268                     processor->remPortManagerUpdateHandler(f);
269                     delete f;
270                 }
271                 updateShadowLists();
272                 return true;
273             }
274         }
275     }
276
277     if (processor->getType()==StreamProcessor::ePT_Transmit) {
278         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
279               it != m_TransmitProcessors.end();
280               ++it )
281         {
282             if ( *it == processor ) {
283                 if (*it == m_SyncSource) {
284                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
285                     m_SyncSource = NULL;
286                 }
287                 m_TransmitProcessors.erase(it);
288                 // remove the functor
289                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
290                 if(f) {
291                     processor->remPortManagerUpdateHandler(f);
292                     delete f;
293                 }
294                 updateShadowLists();
295                 return true;
296             }
297         }
298     }
299
300     debugFatal("Processor (%p) not found!\n",processor);
301     return false; //not found
302 }
303
304 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
305     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
306     m_SyncSource = s;
307     return true;
308 }
309
310 bool StreamProcessorManager::prepare() {
311
312     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
313     m_is_slave=false;
314     if(!getOption("slaveMode", m_is_slave)) {
315         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
316     }
317
318     m_shutdown_needed=false;
319
320     // if no sync source is set, select one here
321     if(m_SyncSource == NULL) {
322        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
323     }
324
325     // FIXME: put into separate method
326     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
327           it != m_ReceiveProcessors.end();
328           ++it )
329     {
330         if(m_SyncSource == NULL) {
331             debugWarning(" => Sync Source is %p.\n", *it);
332             m_SyncSource = *it;
333         }
334     }
335     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
336           it != m_TransmitProcessors.end();
337           ++it )
338     {
339         if(m_SyncSource == NULL) {
340             debugWarning(" => Sync Source is %p.\n", *it);
341             m_SyncSource = *it;
342         }
343     }
344
345     // now do the actual preparation of the SP's
346     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
347     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
348         it != m_ReceiveProcessors.end();
349         ++it ) {
350
351         if(!(*it)->setOption("slaveMode", m_is_slave)) {
352             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
353         }
354
355         if(!(*it)->prepare()) {
356             debugFatal(  " could not prepare (%p)...\n",(*it));
357             return false;
358         }
359     }
360     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
361     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
362         it != m_TransmitProcessors.end();
363         ++it ) {
364         if(!(*it)->setOption("slaveMode", m_is_slave)) {
365             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
366         }
367         if(!(*it)->prepare()) {
368             debugFatal( " could not prepare (%p)...\n",(*it));
369             return false;
370         }
371     }
372
373     // if there are no stream processors registered,
374     // fail
375     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
376         debugFatal("No stream processors registered, can't do anything usefull\n");
377         return false;
378     }
379
380     // set the activity timeout value to two periods worth of usecs.
381     // since we can expect activity once every period, but we might have some
382     // offset, the safe value is two periods.
383     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
384     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
385     setActivityWaitTimeoutUsec(timeout_usec);
386
387     updateShadowLists();
388
389     return true;
390 }
391
392 bool
393 StreamProcessorManager::startDryRunning()
394 {
395     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
396     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
397     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
398             it != m_TransmitProcessors.end();
399             ++it ) {
400         if ((*it)->inError()) {
401             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
402             return false;
403         }
404         if (!(*it)->isDryRunning()) {
405             if(!(*it)->scheduleStartDryRunning(-1)) {
406                 debugError("Could not put SP %p into the dry-running state\n", *it);
407                 return false;
408             }
409         } else {
410             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
411         }
412     }
413     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
414             it != m_ReceiveProcessors.end();
415             ++it ) {
416         if ((*it)->inError()) {
417             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
418             return false;
419         }
420         if (!(*it)->isDryRunning()) {
421             if(!(*it)->scheduleStartDryRunning(-1)) {
422                 debugError("Could not put SP %p into the dry-running state\n", *it);
423                 return false;
424             }
425         } else {
426             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
427         }
428     }
429     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
430     // wait for the syncsource to start running.
431     // that will block the waitForPeriod call until everyone has started (theoretically)
432     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
433     bool all_dry_running = false;
434     while (!all_dry_running && cnt) {
435         all_dry_running = true;
436         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
437                 it != m_ReceiveProcessors.end();
438                 ++it ) {
439             all_dry_running &= (*it)->isDryRunning();
440         }
441         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
442                 it != m_TransmitProcessors.end();
443                 ++it ) {
444             all_dry_running &= (*it)->isDryRunning();
445         }
446
447         SleepRelativeUsec(125);
448         cnt--;
449     }
450     if(cnt==0) {
451         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
452         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
453                 it != m_ReceiveProcessors.end();
454                 ++it ) {
455             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
456                 (*it)->getTypeString(), *it, (*it)->getStateString());
457         }
458         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
459                 it != m_TransmitProcessors.end();
460                 ++it ) {
461             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
462                 (*it)->getTypeString(), *it, (*it)->getStateString());
463         }
464         return false;
465     }
466     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
467     return true;
468 }
469
470 bool StreamProcessorManager::syncStartAll() {
471     if(m_SyncSource == NULL) return false;
472
473     // get the options
474     int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
475     int xmit_prebuffer_frames = STREAMPROCESSORMANAGER_XMIT_PREBUFFER_FRAMES;
476     int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
477     int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
478     int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
479     int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
480     Util::Configuration &config = m_parent.getConfiguration();
481     config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
482     config.getValueForSetting("streaming.spm.xmit_prebuffer_frames", xmit_prebuffer_frames);
483     config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
484     config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
485     config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
486     config.getValueForSetting("streaming.spm.prestart_cycles_for_recv", prestart_cycles_for_recv);
487
488     // figure out when to get the SP's running.
489     // the xmit SP's should also know the base timestamp
490     // streams should be aligned here
491
492     // now find out how long we have to delay the wait operation such that
493     // the received frames will all be presented to the SP
494     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
495     int max_of_min_delay = 0;
496     int min_delay = 0;
497     int packet_size_frames = 0;
498     int max_packet_size_frames = 0;
499
500     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
501             it != m_ReceiveProcessors.end();
502             ++it ) {
503         min_delay = (*it)->getMaxFrameLatency();
504         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
505         packet_size_frames = (*it)->getNominalFramesPerPacket();
506         if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
507     }
508     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
509             it != m_TransmitProcessors.end();
510             ++it ) {
511         packet_size_frames = (*it)->getNominalFramesPerPacket();
512         if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
513     }
514     debugOutput( DEBUG_LEVEL_VERBOSE, " max_of_min_delay = %d, max_packet_size_frames = %d...\n", max_of_min_delay, max_packet_size_frames);
515
516     // add some processing margin. This only shifts the time
517     // at which the buffer is transfer()'ed. This makes things somewhat
518     // more robust.
519     m_sync_delay = max_of_min_delay + signal_delay_ticks;
520
521     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
522     //        have aquired lock
523     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
524     //sleep(2); // FIXME: be smarter here
525
526     // make sure that we are dry-running long enough for the
527     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
528     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
529
530     unsigned int nb_sync_runs = (sync_wait_time_msec * getNominalRate());
531     nb_sync_runs /= 1000;
532     nb_sync_runs /= getPeriodSize();
533
534     while(nb_sync_runs--) { // or while not sync-ed?
535         // check if we were woken up too soon
536         uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
537         uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
538         uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
539    
540         #ifdef DEBUG
541         int64_t now = Util::SystemTimeSource::getCurrentTime();
542         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR  pred: %lld, syncdelay: %lld, diff: %lld\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
543         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT  pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
544         #endif
545    
546         // wait until it's time to transfer
547         Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
548    
549         #ifdef DEBUG
550         now = Util::SystemTimeSource::getCurrentTime();
551         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %lld, now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
552         #endif
553     }
554
555     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
556     // FIXME: in the SPM it would be nice to have system time instead of
557     //        1394 time
558
559     float syncrate = 0.0;
560     float tpf = m_SyncSource->getTicksPerFrame();
561     if (tpf > 0.0) {
562         syncrate = 24576000.0/tpf;
563     } else {
564         debugWarning("tpf <= 0? %f\n", tpf);
565     }
566     debugOutput( DEBUG_LEVEL_VERBOSE, " sync source frame rate: %f fps (%f tpf)\n", syncrate, tpf);
567
568     // we now should have decent sync info on the sync source
569     // determine a point in time where the system should start
570     // figure out where we are now
571     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
572     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
573         time_of_first_sample,
574         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
575         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
576         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
577
578     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
579     // this is the time window we have to setup all SP's such that they
580     // can start wet-running correctly.
581     // we have to round this time to an integer number of audio packets
582     double time_for_startup_abs = (double)(cycles_for_startup * TICKS_PER_CYCLE);
583     int time_for_startup_frames = (int)(time_for_startup_abs / tpf);
584     time_for_startup_frames = ((time_for_startup_frames / max_packet_size_frames) + 1) * max_packet_size_frames;
585     uint64_t time_for_startup_ticks = (uint64_t)((float)time_for_startup_frames * tpf);
586
587     time_of_first_sample = addTicks(time_of_first_sample,
588                                     time_for_startup_ticks);
589     debugOutput( DEBUG_LEVEL_VERBOSE, "  add %d frames (%011llu ticks)...\n",
590         time_for_startup_frames, time_for_startup_ticks);
591
592     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
593         time_of_first_sample,
594         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
595         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
596         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
597
598     // we should start wet-running the transmit SP's some cycles in advance
599     // such that we know it is wet-running when it should output its first sample
600     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
601                                                  prestart_cycles_for_xmit * TICKS_PER_CYCLE);
602
603     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
604                                                  prestart_cycles_for_recv * TICKS_PER_CYCLE);
605     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
606         time_to_start_xmit,
607         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
608         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
609         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
610     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
611         time_to_start_recv,
612         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
613         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
614         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
615
616     // print the sync delay
617     int sync_delay_frames = (int)((float)m_sync_delay / m_SyncSource->getTicksPerFrame());
618     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay: %d = %d + %d ticks (%03us %04uc %04ut) [%d frames]...\n",
619         m_sync_delay, max_of_min_delay, signal_delay_ticks,
620         (unsigned int)TICKS_TO_SECS(m_sync_delay),
621         (unsigned int)TICKS_TO_CYCLES(m_sync_delay),
622         (unsigned int)TICKS_TO_OFFSET(m_sync_delay),
623         sync_delay_frames);
624
625     // check if this can even work.
626     // the worst case point where we can receive a period is at 1 period + sync delay
627     // this means that the number of frames in the xmit buffer has to be at least
628     // 1 period + sync delay
629     if(xmit_prebuffer_frames + m_period * m_nb_buffers < m_period + sync_delay_frames) {
630         debugWarning("The amount of transmit buffer frames (%d) is too small (< %d). "
631                      "This will most likely cause xruns.\n",
632                      xmit_prebuffer_frames + m_period * m_nb_buffers,
633                      m_period + sync_delay_frames);
634     }
635
636     // at this point the buffer head timestamp of the transmit buffers can be set
637     // this is the presentation time of the first sample in the buffer
638     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
639           it != m_TransmitProcessors.end();
640           ++it ) {
641         // set the number of prebuffer frames
642         (*it)->setExtraBufferFrames(xmit_prebuffer_frames);
643
644         // set the TSP of the first sample in the buffer
645         (*it)->setBufferHeadTimestamp(time_of_first_sample);
646         ffado_timestamp_t ts;
647         signed int fc;
648         (*it)->getBufferHeadTimestamp ( &ts, &fc );
649         debugOutput( DEBUG_LEVEL_VERBOSE, " transmit buffer tail %010lld => head TS %010lld, fc=%d...\n",
650                     time_of_first_sample, (uint64_t)ts, fc);
651     }
652
653     // the receive processors can be delayed by sync_delay ticks
654     // this means that in the worst case we have to be able to accomodate
655     // an extra sync_delay ticks worth of frames in the receive SP buffer
656     // the sync delay should be rounded to an integer amount of max_packet_size
657     int tmp = sync_delay_frames / max_packet_size_frames;
658     tmp = tmp + 1;
659     sync_delay_frames = tmp * max_packet_size_frames;
660     if (sync_delay_frames < 1024) sync_delay_frames = 1024; //HACK
661
662     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
663           it != m_ReceiveProcessors.end();
664           ++it ) {
665         // set the number of extra buffer frames
666         (*it)->setExtraBufferFrames(sync_delay_frames);
667     }
668
669     // switch syncsource to running state
670     uint64_t time_to_start_sync;
671     // FIXME: this is most likely not going to work for transmit sync sources
672     // but those are unsupported in this version
673     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
674         time_to_start_sync = time_to_start_recv;
675     } else {
676         time_to_start_sync = time_to_start_xmit;
677     }
678     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
679         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
680         return false;
681     }
682
683     // STEP X: switch all non-syncsource SP's over to the running state
684     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
685           it != m_ReceiveProcessors.end();
686           ++it ) {
687         if(*it != m_SyncSource) {
688             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
689                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
690                 return false;
691             }
692         }
693     }
694     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
695           it != m_TransmitProcessors.end();
696           ++it ) {
697         if(*it != m_SyncSource) {
698             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
699                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
700                 return false;
701             }
702         }
703     }
704     // wait for the syncsource to start running.
705     // that will block the waitForPeriod call until everyone has started (theoretically)
706     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
707     // so a 20 times this value should be a good timeout
708     //int cnt = cycles_for_startup * 20; // by then it should have started
709     // or maybe we just have to use 1 second, as this wraps the cycle counter
710     int cnt = 8000;
711     while (!m_SyncSource->isRunning() && cnt) {
712         SleepRelativeUsec(125);
713         cnt--;
714     }
715     if(cnt==0) {
716         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
717         return false;
718     }
719
720     // the sync source is running, we can now read a decent received timestamp from it
721     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
722
723     // and a (rough) approximation of the rate
724     float rate = m_SyncSource->getTicksPerFrame();
725
726     #ifdef DEBUG
727     // the time at which the previous period would have passed
728     m_time_of_transfer2 = m_time_of_transfer;
729     m_time_of_transfer2 = substractTicks(m_time_of_transfer2, (uint64_t)(m_period * rate));
730     #endif
731
732     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
733                 m_time_of_transfer, rate);
734
735     // FIXME: ideally we'd want the SP itself to account for the xmit_prebuffer_frames
736     // but that would also require to use a different approach to setting the initial TSP's
737     int64_t delay_in_ticks = (int64_t)(((float)((m_nb_buffers-1) * m_period + xmit_prebuffer_frames)) * rate);
738
739     // then use this information to initialize the xmit handlers
740
741     //  we now set the buffer tail timestamp of the transmit buffer
742     //  to the period transfer time instant plus what's nb_buffers - 1
743     //  in ticks. This due to the fact that we (should) have received one period
744     //  worth of ticks at t = m_time_of_transfer
745     //  hence one period of frames should also have been transmitted, which means
746     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
747     //  there are also xmit_prebuffer_frames frames extra present in the buffer
748     //  that allows us to calculate the tail timestamp for the buffer.
749
750     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
751     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
752                 transmit_tail_timestamp, rate);
753
754     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
755         it != m_TransmitProcessors.end();
756         ++it ) {
757         (*it)->setTicksPerFrame(rate);
758         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
759         ffado_timestamp_t ts;
760         signed int fc;
761         (*it)->getBufferHeadTimestamp ( &ts, &fc );
762         debugOutput( DEBUG_LEVEL_VERBOSE, "   => transmit head TS %010lld, fc=%d...\n",
763                     (uint64_t)ts, fc);
764     }
765
766     // align the received streams to be phase aligned
767     if(!alignReceivedStreams()) {
768         debugError("Could not align streams...\n");
769         return false;
770     }
771
772     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
773     return true;
774 }
775
776 bool
777 StreamProcessorManager::alignReceivedStreams()
778 {
779     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
780     unsigned int nb_sync_runs;
781     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
782     int64_t diff_between_streams[nb_rcv_sp];
783     int64_t diff;
784
785     unsigned int i;
786
787     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
788     int align_average_time_msec = STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC;
789     Util::Configuration &config = m_parent.getConfiguration();
790     config.getValueForSetting("streaming.spm.align_tries", cnt);
791     config.getValueForSetting("streaming.spm.align_average_time_msec", align_average_time_msec);
792
793     unsigned int periods_per_align_try = (align_average_time_msec * getNominalRate());
794     periods_per_align_try /= 1000;
795     periods_per_align_try /= getPeriodSize();
796     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
797
798     bool aligned = false;
799     while (!aligned && cnt--) {
800         nb_sync_runs = periods_per_align_try;
801         while(nb_sync_runs) {
802             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
803             if(!waitForPeriod()) {
804                 debugWarning("xrun while aligning streams...\n");
805                 return false;
806             }
807
808             // before we do anything else, transfer
809             if(!transferSilence()) {
810                 debugError("Could not transfer silence\n");
811                 return false;
812             }
813
814             // now calculate the stream offset
815             i = 0;
816             for ( i = 0; i < nb_rcv_sp; i++) {
817                 StreamProcessor *s = m_ReceiveProcessors.at(i);
818                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
819                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
820                     m_SyncSource, s, diff);
821                 if ( nb_sync_runs == periods_per_align_try ) {
822                     diff_between_streams[i] = diff;
823                 } else {
824                     diff_between_streams[i] += diff;
825                 }
826             }
827
828             nb_sync_runs--;
829         }
830         // calculate the average offsets
831         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
832         int diff_between_streams_frames[nb_rcv_sp];
833         aligned = true;
834         for ( i = 0; i < nb_rcv_sp; i++) {
835             StreamProcessor *s = m_ReceiveProcessors.at(i);
836
837             diff_between_streams[i] /= periods_per_align_try;
838             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
839             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
840                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
841
842             aligned &= (diff_between_streams_frames[i] == 0);
843
844             // reposition the stream
845             if(!s->shiftStream(diff_between_streams_frames[i])) {
846                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
847                 return false;
848             }
849         }
850         if (!aligned) {
851             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
852         }
853     }
854     if (cnt == 0) {
855         debugError("Align failed\n");
856         return false;
857     }
858     return true;
859 }
860
861 bool StreamProcessorManager::start() {
862     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
863
864     // start all SP's synchonized
865     bool start_result = false;
866     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
867         // put all SP's into dry-running state
868         if (!startDryRunning()) {
869             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
870             start_result = false;
871             continue;
872         }
873
874         start_result = syncStartAll();
875         if(start_result) {
876             break;
877         } else {
878             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
879             if(m_shutdown_needed) {
880                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
881                 return false;
882             }
883         }
884     }
885     if (!start_result) {
886         debugFatal("Could not syncStartAll...\n");
887         return false;
888     }
889     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
890     return true;
891 }
892
893 bool StreamProcessorManager::stop() {
894     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
895
896     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
897     // switch SP's over to the dry-running state
898     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
899           it != m_ReceiveProcessors.end();
900           ++it ) {
901         if((*it)->isRunning()) {
902             if(!(*it)->scheduleStopRunning(-1)) {
903                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
904                 return false;
905             }
906         }
907     }
908     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
909           it != m_TransmitProcessors.end();
910           ++it ) {
911         if((*it)->isRunning()) {
912             if(!(*it)->scheduleStopRunning(-1)) {
913                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
914                 return false;
915             }
916         }
917     }
918     // wait for the SP's to get into the dry-running/stopped state
919     int cnt = 8000;
920     bool ready = false;
921     while (!ready && cnt) {
922         ready = true;
923         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
924             it != m_ReceiveProcessors.end();
925             ++it ) {
926             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
927         }
928         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
929             it != m_TransmitProcessors.end();
930             ++it ) {
931             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
932         }
933         SleepRelativeUsec(125);
934         cnt--;
935     }
936     if(cnt==0) {
937         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
938         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
939             it != m_ReceiveProcessors.end();
940             ++it ) {
941             (*it)->dumpInfo();
942         }
943         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
944             it != m_TransmitProcessors.end();
945             ++it ) {
946             (*it)->dumpInfo();
947         }
948         return false;
949     }
950
951     // switch SP's over to the stopped state
952     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
953           it != m_ReceiveProcessors.end();
954           ++it ) {
955         if ((*it)->inError()) {
956             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
957         } else if(!(*it)->scheduleStopDryRunning(-1)) {
958             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
959             return false;
960         }
961     }
962     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
963           it != m_TransmitProcessors.end();
964           ++it ) {
965         if ((*it)->inError()) {
966             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
967         } else if(!(*it)->scheduleStopDryRunning(-1)) {
968             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
969             return false;
970         }
971     }
972     // wait for the SP's to get into the stopped state
973     cnt = 8000;
974     ready = false;
975     while (!ready && cnt) {
976         ready = true;
977         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
978             it != m_ReceiveProcessors.end();
979             ++it ) {
980             ready &= ((*it)->isStopped() || (*it)->inError());
981         }
982         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
983             it != m_TransmitProcessors.end();
984             ++it ) {
985             ready &= ((*it)->isStopped() || (*it)->inError());
986         }
987         SleepRelativeUsec(125);
988         cnt--;
989     }
990     if(cnt==0) {
991         debugWarning(" Timeout waiting for the SP's to stop\n");
992         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
993             it != m_ReceiveProcessors.end();
994             ++it ) {
995             (*it)->dumpInfo();
996         }
997         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
998             it != m_TransmitProcessors.end();
999             ++it ) {
1000             (*it)->dumpInfo();
1001         }
1002         return false;
1003     }
1004     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
1005     return true;
1006 }
1007
1008 /**
1009  * Called upon Xrun events. This brings all StreamProcessors back
1010  * into their starting state, and then carries on streaming. This should
1011  * have the same effect as restarting the whole thing.
1012  *
1013  * @return true if successful, false otherwise
1014  */
1015 bool StreamProcessorManager::handleXrun() {
1016
1017     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
1018
1019     dumpInfo();
1020
1021     /*
1022      * Reset means:
1023      * 1) Disabling the SP's, so that they don't process any packets
1024      *    note: the isomanager does keep on delivering/requesting them
1025      * 2) Bringing all buffers & streamprocessors into a know state
1026      *    - Clear all capture buffers
1027      *    - Put nb_periods*period_size of null frames into the playback buffers
1028      * 3) Re-enable the SP's
1029      */
1030
1031     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
1032     // start all SP's synchonized
1033     bool start_result = false;
1034     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
1035         if(m_shutdown_needed) {
1036             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
1037             return true;
1038         }
1039         // put all SP's into dry-running state
1040         if (!startDryRunning()) {
1041             debugShowBackLog();
1042             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
1043             start_result = false;
1044             continue;
1045         }
1046
1047         start_result = syncStartAll();
1048         if(start_result) {
1049             break;
1050         } else {
1051             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
1052         }
1053     }
1054     if (!start_result) {
1055         debugFatal("Could not syncStartAll...\n");
1056         return false;
1057     }
1058     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
1059
1060     return true;
1061 }
1062
1063 /**
1064  * @brief Waits until the next period of samples is ready
1065  *
1066  * This function does not return until a full period of samples is (or should be)
1067  * ready to be transferred.
1068  *
1069  * @return true if the period is ready, false if not
1070  */
1071 bool StreamProcessorManager::waitForPeriod() {
1072     if(m_SyncSource == NULL) return false;
1073     if(m_shutdown_needed) return false;
1074     bool xrun_occurred = false;
1075     bool in_error = false;
1076
1077     // grab the wait lock
1078     // this ensures that bus reset handling doesn't interfere
1079     Util::MutexLockHelper lock(*m_WaitLock);
1080     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1081                         "waiting for period (%d frames in buffer)...\n",
1082                         m_SyncSource->getBufferFill());
1083     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
1084     uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
1085     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
1086
1087     #ifdef DEBUG
1088     int64_t now = Util::SystemTimeSource::getCurrentTime();
1089     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR  pred: %lld, syncdelay: %lld, diff: %lld\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
1090     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT  pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
1091     #endif
1092
1093     // wait until it's time to transfer
1094     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
1095
1096     #ifdef DEBUG
1097     now = Util::SystemTimeSource::getCurrentTime();
1098     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %lld, now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
1099     #endif
1100
1101     // the period should be ready now
1102     #ifdef DEBUG
1103     int rcv_fills[10];
1104     int xmt_fills[10];
1105     int i;
1106     i=0;
1107     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1108         it != m_ReceiveProcessors.end();
1109         ++it ) {
1110         rcv_fills[i] = (*it)->getBufferFill();
1111         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "RECV SP %p bufferfill: %05d\n", *it, rcv_fills[i]);
1112         i++;
1113     }
1114     i=0;
1115     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1116         it != m_TransmitProcessors.end();
1117         ++it ) {
1118         xmt_fills[i] = (*it)->getBufferFill();
1119         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "XMIT SP %p bufferfill: %05d\n", *it, xmt_fills[i]);
1120         i++;
1121     }
1122     for(i=0;i<1;i++) {
1123         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "SP %02d RECV: %05d [%05d] XMIT: %05d [%05d] DIFF: %05d\n", i,
1124                     rcv_fills[i], rcv_fills[i] - m_period,
1125                     xmt_fills[i], xmt_fills[i] - m_period,
1126                     rcv_fills[i] - xmt_fills[i]);
1127     }
1128     #endif
1129
1130     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
1131     // HACK: we force wait until every SP is ready. this is needed
1132     // since the raw1394 interface provides no control over interrupts
1133     // resulting in very bad predictability on when the data is present.
1134     bool period_not_ready = true;
1135     while(period_not_ready) {
1136         period_not_ready = false;
1137         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1138             it != m_ReceiveProcessors.end();
1139             ++it ) {
1140             bool this_sp_period_ready = (*it)->canConsumePeriod();
1141             if (!this_sp_period_ready) {
1142                 period_not_ready = true;
1143             }
1144         }
1145         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1146             it != m_TransmitProcessors.end();
1147             ++it ) {
1148             bool this_sp_period_ready = (*it)->canProducePeriod();
1149             if (!this_sp_period_ready) {
1150                 period_not_ready = true;
1151             }
1152         }
1153
1154         if (period_not_ready) {
1155             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
1156             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
1157         }
1158
1159         // check for underruns/errors on the ISO side,
1160         // those should make us bail out of the wait loop
1161         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1162             it != m_ReceiveProcessors.end();
1163             ++it ) {
1164             // a xrun has occurred on the Iso side
1165             xrun_occurred |= (*it)->xrunOccurred();
1166             in_error |= (*it)->inError();
1167         }
1168         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1169             it != m_TransmitProcessors.end();
1170             ++it ) {
1171             // a xrun has occurred on the Iso side
1172             xrun_occurred |= (*it)->xrunOccurred();
1173             in_error |= (*it)->inError();
1174         }
1175         if(xrun_occurred | in_error | m_shutdown_needed) break;
1176     }
1177     #else
1178     // check for underruns/errors on the ISO side,
1179     // those should make us bail out of the wait loop
1180     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1181         it != m_ReceiveProcessors.end();
1182         ++it ) {
1183         // xrun on data buffer side
1184         if (!(*it)->canConsumePeriod()) {
1185             xrun_occurred = true;
1186         }
1187         // a xrun has occurred on the Iso side
1188         xrun_occurred |= (*it)->xrunOccurred();
1189         in_error |= (*it)->inError();
1190     }
1191     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1192         it != m_TransmitProcessors.end();
1193         ++it ) {
1194         // xrun on data buffer side
1195         if (!(*it)->canProducePeriod()) {
1196             xrun_occurred = true;
1197         }
1198         // a xrun has occurred on the Iso side
1199         xrun_occurred |= (*it)->xrunOccurred();
1200         in_error |= (*it)->inError();
1201     }
1202     #endif
1203
1204     if(xrun_occurred) {
1205         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1206     }
1207     if(in_error) {
1208         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1209         m_shutdown_needed = true;
1210     }
1211
1212     // we save the 'ideal' time of the transfer at this point,
1213     // because we can have interleaved read - process - write
1214     // cycles making that we modify a receiving stream's buffer
1215     // before we get to writing.
1216     // NOTE: before waitForPeriod() is called again, both the transmit
1217     //       and the receive processors should have done their transfer.
1218     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1219    
1220     #ifdef DEBUG
1221     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1222    
1223     int diff = diffTicks(m_time_of_transfer, m_time_of_transfer2);
1224     // display message if the difference between two successive tick
1225     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1226     // so 50 ticks = 10%, which is a rather large jitter value.
1227     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
1228         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
1229                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1230     }
1231     m_time_of_transfer2 = m_time_of_transfer;
1232     #endif
1233
1234     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1235                         "transfer period %d at %llu ticks...\n",
1236                         m_nbperiods, m_time_of_transfer);
1237
1238     #ifdef DEBUG
1239     int rcv_bf=0, xmt_bf=0;
1240     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1241         it != m_ReceiveProcessors.end();
1242         ++it ) {
1243         rcv_bf = (*it)->getBufferFill();
1244     }
1245     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1246         it != m_TransmitProcessors.end();
1247         ++it ) {
1248         xmt_bf = (*it)->getBufferFill();
1249     }
1250     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1251                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1252                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1253
1254     // check if xruns occurred on the Iso side.
1255     // also check if xruns will occur should we transfer() now
1256     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1257           it != m_ReceiveProcessors.end();
1258           ++it ) {
1259
1260         if ((*it)->xrunOccurred()) {
1261             debugOutput(DEBUG_LEVEL_NORMAL,
1262                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1263             (*it)->dumpInfo();
1264         }
1265         if (!((*it)->canClientTransferFrames(m_period))) {
1266             debugOutput(DEBUG_LEVEL_NORMAL,
1267                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1268             (*it)->dumpInfo();
1269         }
1270     }
1271     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1272           it != m_TransmitProcessors.end();
1273           ++it ) {
1274         if ((*it)->xrunOccurred()) {
1275             debugOutput(DEBUG_LEVEL_NORMAL,
1276                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1277         }
1278         if (!((*it)->canClientTransferFrames(m_period))) {
1279             debugOutput(DEBUG_LEVEL_NORMAL,
1280                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1281         }
1282     }
1283     #endif
1284     m_nbperiods++;
1285
1286     // this is to notify the client of the delay that we introduced by waiting
1287     pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(m_time_of_transfer);
1288
1289     m_delayed_usecs = Util::SystemTimeSource::getCurrentTime() - pred_system_time_at_xfer;
1290     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1291                         "delayed for %d usecs...\n",
1292                         m_delayed_usecs);
1293
1294     // now we can signal the client that we are (should be) ready
1295     return !xrun_occurred;
1296 }
1297
1298 /**
1299  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1300  *
1301  * Transfers one period of frames from the client side to the Iso side and vice versa.
1302  *
1303  * @return true if successful, false otherwise (indicates xrun).
1304  */
1305 bool StreamProcessorManager::transfer() {
1306     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1307     bool retval=true;
1308     retval &= transfer(StreamProcessor::ePT_Receive);
1309     retval &= transfer(StreamProcessor::ePT_Transmit);
1310     return retval;
1311 }
1312
1313 /**
1314  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1315  *
1316  * Transfers one period of frames from the client side to the Iso side or vice versa.
1317  *
1318  * @param t The processor type to tranfer for (receive or transmit)
1319  * @return true if successful, false otherwise (indicates xrun).
1320  */
1321 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1322     if(m_SyncSource == NULL) return false;
1323     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1324         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1325         t, m_time_of_transfer,
1326         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1327         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1328         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1329
1330     bool retval = true;
1331     // a static cast could make sure that there is no performance
1332     // penalty for the virtual functions (to be checked)
1333     if (t==StreamProcessor::ePT_Receive) {
1334         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1335                 it != m_ReceiveProcessors.end();
1336                 ++it ) {
1337             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1338                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1339                             m_period, m_time_of_transfer,*it);
1340                 retval &= false; // buffer underrun
1341             }
1342         }
1343     } else {
1344         // FIXME: in the SPM it would be nice to have system time instead of
1345         //        1394 time
1346         float rate = m_SyncSource->getTicksPerFrame();
1347
1348         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1349                 it != m_TransmitProcessors.end();
1350                 ++it ) {
1351             // this is the delay in frames between the point where a frame is received and
1352             // when it is transmitted again
1353             unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1354             int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1355    
1356             // the data we are putting into the buffer is intended to be transmitted
1357             // one ringbuffer size after it has been received
1358             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1359
1360             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1361                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1362                         m_period, transmit_timestamp, *it);
1363                 retval &= false; // buffer underrun
1364             }
1365         }
1366     }
1367     return retval;
1368 }
1369
1370 /**
1371  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1372  *
1373  * Transfers one period of silence to the Iso side for transmit SP's
1374  * or dump one period of frames for receive SP's
1375  *
1376  * @return true if successful, false otherwise (indicates xrun).
1377  */
1378 bool StreamProcessorManager::transferSilence() {
1379     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1380     bool retval=true;
1381     // NOTE: the order here is opposite from the order in
1382     // normal operation (transmit is before receive), because
1383     // we can do that here (data=silence=available) and
1384     // it increases reliability (esp. on startup)
1385     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1386     retval &= transferSilence(StreamProcessor::ePT_Receive);
1387     return retval;
1388 }
1389
1390 /**
1391  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1392  *
1393  * Transfers one period of silence to the Iso side for transmit SP's
1394  * or dump one period of frames for receive SP's
1395  *
1396  * @param t The processor type to tranfer for (receive or transmit)
1397  * @return true if successful, false otherwise (indicates xrun).
1398  */
1399 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1400     if(m_SyncSource == NULL) return false;
1401     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1402         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1403         t, m_time_of_transfer,
1404         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1405         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1406         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1407
1408     bool retval = true;
1409     // a static cast could make sure that there is no performance
1410     // penalty for the virtual functions (to be checked)
1411     if (t==StreamProcessor::ePT_Receive) {
1412         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1413                 it != m_ReceiveProcessors.end();
1414                 ++it ) {
1415             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1416                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1417                             m_period, m_time_of_transfer,*it);
1418                 retval &= false; // buffer underrun
1419             }
1420         }
1421     } else {
1422         // FIXME: in the SPM it would be nice to have system time instead of
1423         //        1394 time
1424         float rate = m_SyncSource->getTicksPerFrame();
1425
1426         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1427                 it != m_TransmitProcessors.end();
1428                 ++it ) {
1429             // this is the delay in frames between the point where a frame is received and
1430             // when it is transmitted again
1431             unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1432             int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1433    
1434             // the data we are putting into the buffer is intended to be transmitted
1435             // one ringbuffer size after it has been received
1436             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1437
1438             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1439                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1440                         m_period, transmit_timestamp, *it);
1441                 retval &= false; // buffer underrun
1442             }
1443         }
1444     }
1445     return retval;
1446 }
1447
1448 void StreamProcessorManager::dumpInfo() {
1449     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1450     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1451     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1452     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1453
1454     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1455     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1456         it != m_ReceiveProcessors.end();
1457         ++it ) {
1458         (*it)->dumpInfo();
1459     }
1460
1461     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1462     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1463         it != m_TransmitProcessors.end();
1464         ++it ) {
1465         (*it)->dumpInfo();
1466     }
1467
1468     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1469
1470     // list port info in verbose mode
1471     debugOutputShort( DEBUG_LEVEL_VERBOSE, "Port Information\n");
1472     int nb_ports;
1473    
1474     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Playback\n");
1475     nb_ports = getPortCount(Port::E_Playback);
1476     for(int i=0; i < nb_ports; i++) {
1477         Port *p = getPortByIndex(i, Port::E_Playback);
1478         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1479         if (p) {
1480             bool disabled = p->isDisabled();
1481             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1482             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1483             debugOutputShort( DEBUG_LEVEL_VERBOSE, "%3s ", p->getName().c_str());
1484         } else {
1485             debugOutputShort( DEBUG_LEVEL_VERBOSE, "invalid ");
1486         }
1487         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1488     }
1489     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Capture\n");
1490     nb_ports = getPortCount(Port::E_Capture);
1491     for(int i=0; i < nb_ports; i++) {
1492         Port *p = getPortByIndex(i, Port::E_Capture);
1493         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1494         if (p) {
1495             bool disabled = p->isDisabled();
1496             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1497             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1498             debugOutputShort( DEBUG_LEVEL_VERBOSE, " %3s ", p->getName().c_str());
1499         } else {
1500             debugOutputShort( DEBUG_LEVEL_VERBOSE, " invalid ");
1501         }
1502         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1503     }
1504
1505     debugOutputShort( DEBUG_LEVEL_VERBOSE, "----------------------------------------------------\n");
1506
1507 }
1508
1509 void StreamProcessorManager::setVerboseLevel(int l) {
1510     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1511
1512     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1513         it != m_ReceiveProcessors.end();
1514         ++it ) {
1515         (*it)->setVerboseLevel(l);
1516     }
1517     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1518         it != m_TransmitProcessors.end();
1519         ++it ) {
1520         (*it)->setVerboseLevel(l);
1521     }
1522     setDebugLevel(l);
1523     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1524 }
1525
1526 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1527     int count=0;
1528
1529     if (direction == Port::E_Capture) {
1530         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1531             it != m_ReceiveProcessors.end();
1532             ++it ) {
1533             count += (*it)->getPortCount(type);
1534         }
1535     } else {
1536         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1537             it != m_TransmitProcessors.end();
1538             ++it ) {
1539             count += (*it)->getPortCount(type);
1540         }
1541     }
1542     return count;
1543 }
1544
1545 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1546     int count=0;
1547
1548     if (direction == Port::E_Capture) {
1549         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1550             it != m_ReceiveProcessors.end();
1551             ++it ) {
1552             count += (*it)->getPortCount();
1553         }
1554     } else {
1555         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1556             it != m_TransmitProcessors.end();
1557             ++it ) {
1558             count += (*it)->getPortCount();
1559         }
1560     }
1561     return count;
1562 }
1563
1564 void
1565 StreamProcessorManager::updateShadowLists()
1566 {
1567     debugOutput( DEBUG_LEVEL_VERBOSE, "Updating port shadow lists...\n");
1568     m_CapturePorts_shadow.clear();
1569     m_PlaybackPorts_shadow.clear();
1570
1571     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1572         it != m_ReceiveProcessors.end();
1573         ++it ) {
1574         PortManager *pm = *it;
1575         for (int i=0; i < pm->getPortCount(); i++) {
1576             Port *p = pm->getPortAtIdx(i);
1577             if (!p) {
1578                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1579                 continue;
1580             }
1581             if(p->getDirection() != Port::E_Capture) {
1582                 debugError("port at idx %d for receive SP is not a capture port!\n", i);
1583                 continue;
1584             }
1585             m_CapturePorts_shadow.push_back(p);
1586         }
1587     }
1588     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1589         it != m_TransmitProcessors.end();
1590         ++it ) {
1591         PortManager *pm = *it;
1592         for (int i=0; i < pm->getPortCount(); i++) {
1593             Port *p = pm->getPortAtIdx(i);
1594             if (!p) {
1595                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1596                 continue;
1597             }
1598             if(p->getDirection() != Port::E_Playback) {
1599                 debugError("port at idx %d for transmit SP is not a playback port!\n", i);
1600                 continue;
1601             }
1602             m_PlaybackPorts_shadow.push_back(p);
1603         }
1604     }
1605 }
1606
1607 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1608     debugOutputExtreme( DEBUG_LEVEL_ULTRA_VERBOSE, "getPortByIndex(%d, %d)...\n", idx, direction);
1609     if (direction == Port::E_Capture) {
1610         #ifdef DEBUG
1611         if(idx >= (int)m_CapturePorts_shadow.size()) {
1612             debugError("Capture port %d out of range (%d)\n", idx, m_CapturePorts_shadow.size());
1613             return NULL;
1614         }
1615         #endif
1616         return m_CapturePorts_shadow.at(idx);
1617     } else {
1618         #ifdef DEBUG
1619         if(idx >= (int)m_PlaybackPorts_shadow.size()) {
1620             debugError("Playback port %d out of range (%d)\n", idx, m_PlaybackPorts_shadow.size());
1621             return NULL;
1622         }
1623         #endif
1624         return m_PlaybackPorts_shadow.at(idx);
1625     }
1626     return NULL;
1627 }
1628
1629 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1630     m_thread_realtime=rt;
1631     m_thread_priority=priority;
1632     return true;
1633 }
1634
1635
1636 } // end of namespace
Note: See TracBrowser for help on using the browser.