root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 2177, 70.3 kB (checked in by jwoithe, 8 years ago)

More work on ticket #242. sem_timedwait() can only work against CLOCK_REALTIME, so the time used to construct its absolute timeout value must be acquired from CLOCK_REALTIME regardless of the clock used by Util::SystemTimeSource?.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessorManager.h"
27 #include "generic/StreamProcessor.h"
28 #include "generic/Port.h"
29 #include "libieee1394/cycletimer.h"
30
31 #include "devicemanager.h"
32
33 #include "libutil/Time.h"
34
35 #include <errno.h>
36 #include <assert.h>
37 #include <math.h>
38
39 namespace Streaming {
40
41 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
42
43 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
44     : m_time_of_transfer ( 0 )
45     #ifdef DEBUG
46     , m_time_of_transfer2 ( 0 )
47     #endif
48     , m_is_slave( false )
49     , m_SyncSource(NULL)
50     , m_parent( p )
51     , m_xrun_happened( false )
52     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
53     , m_nb_buffers( 0 )
54     , m_period( 0 )
55     , m_sync_delay( 0 )
56     , m_audio_datatype( eADT_Float )
57     , m_nominal_framerate ( 0 )
58     , m_xruns(0)
59     , m_shutdown_needed(false)
60     , m_nbperiods(0)
61     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
62     , m_max_diff_ticks( 50 )
63 {
64     addOption(Util::OptionContainer::Option("slaveMode",false));
65     sem_init(&m_activity_semaphore, 0, 0);
66 }
67
68 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
69                                                unsigned int framerate, unsigned int nb_buffers)
70     : m_time_of_transfer ( 0 )
71     #ifdef DEBUG
72     , m_time_of_transfer2 ( 0 )
73     #endif
74     , m_is_slave( false )
75     , m_SyncSource(NULL)
76     , m_parent( p )
77     , m_xrun_happened( false )
78     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
79     , m_nb_buffers(nb_buffers)
80     , m_period(period)
81     , m_sync_delay( 0 )
82     , m_audio_datatype( eADT_Float )
83     , m_nominal_framerate ( framerate )
84     , m_xruns(0)
85     , m_shutdown_needed(false)
86     , m_nbperiods(0)
87     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
88     , m_max_diff_ticks( 50 )
89 {
90     addOption(Util::OptionContainer::Option("slaveMode",false));
91     sem_init(&m_activity_semaphore, 0, 0);
92 }
93
94 StreamProcessorManager::~StreamProcessorManager() {
95     sem_post(&m_activity_semaphore);
96     sem_destroy(&m_activity_semaphore);
97     delete m_WaitLock;
98 }
99
100 // void
101 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
102 // {
103 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
104 // //
105 // //     bool handled_at_least_one = false;
106 // //     // note that all receive streams are gone once a device is unplugged
107 // //
108 // //     // synchronize with the wait lock
109 // //     Util::MutexLockHelper lock(*m_WaitLock);
110 // //
111 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
112 // //     // cause all SP's to bail out
113 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
114 // //           it != m_ReceiveProcessors.end();
115 // //           ++it )
116 // //     {
117 // //         if(&s == &((*it)->getParent().get1394Service())) {
118 // //             debugOutput(DEBUG_LEVEL_NORMAL,
119 // //                         "issue busreset on receive SPM on channel %d\n",
120 // //                         (*it)->getChannel());
121 // //             (*it)->handleBusReset();
122 // //             handled_at_least_one = true;
123 // //         } else {
124 // //             debugOutput(DEBUG_LEVEL_NORMAL,
125 // //                         "skipping receive SPM on channel %d since not on service %p\n",
126 // //                         (*it)->getChannel(), &s);
127 // //         }
128 // //     }
129 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
130 // //           it != m_TransmitProcessors.end();
131 // //           ++it )
132 // //     {
133 // //         if(&s == &((*it)->getParent().get1394Service())) {
134 // //             debugOutput(DEBUG_LEVEL_NORMAL,
135 // //                         "issue busreset on transmit SPM on channel %d\n",
136 // //                         (*it)->getChannel());
137 // //             (*it)->handleBusReset();
138 // //             handled_at_least_one = true;
139 // //         } else {
140 // //             debugOutput(DEBUG_LEVEL_NORMAL,
141 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
142 // //                         (*it)->getChannel(), &s);
143 // //         }
144 // //     }
145 // //
146 // //     // FIXME: we request shutdown for now.
147 // //     m_shutdown_needed = handled_at_least_one;
148 // }
149
150 void
151 StreamProcessorManager::signalActivity()
152 {
153     sem_post(&m_activity_semaphore);
154     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
155 }
156
157 enum StreamProcessorManager::eActivityResult
158 StreamProcessorManager::waitForActivity()
159 {
160     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
161     struct timespec ts;
162     int result;
163
164     if (m_activity_wait_timeout_nsec >= 0) {
165
166         // CLOCK_REALTIME must be used because that's what sem_timedwait()
167         // uses.  This is safe - regardless of the clock used by
168         // Util::SystemTimeSource - so long as the resulting time is only
169         // used to implement a timeout in sem_timedwait().
170         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
171             debugError("clock_gettime failed\n");
172             return eAR_Error;
173         }
174         ts.tv_nsec += m_activity_wait_timeout_nsec;
175         while(ts.tv_nsec >= 1000000000LL) {
176             ts.tv_sec += 1;
177             ts.tv_nsec -= 1000000000LL;
178         }
179     }
180
181     if (m_activity_wait_timeout_nsec >= 0) {
182         result = sem_timedwait(&m_activity_semaphore, &ts);
183     } else {
184         result = sem_wait(&m_activity_semaphore);
185     }
186
187     if(result != 0) {
188         if (errno == ETIMEDOUT) {
189             debugOutput(DEBUG_LEVEL_VERBOSE,
190                         "(%p) sem_timedwait() timed out (result=%d)\n",
191                         this, result);
192             return eAR_Timeout;
193         } else if (errno == EINTR) {
194             debugOutput(DEBUG_LEVEL_VERBOSE,
195                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
196                         this, result);
197             return eAR_Interrupted;
198         } else if (errno == EINVAL) {
199             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
200                         this, result);
201             debugError("(%p) timeout_nsec=%"PRId64" ts.sec=%"PRId64" ts.nsec=%"PRId64"\n",
202                        this, m_activity_wait_timeout_nsec,
203                        (int64_t)ts.tv_sec, (int64_t)ts.tv_nsec);
204             return eAR_Error;
205         } else {
206             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
207                         this, result, errno);
208             debugError("(%p) timeout_nsec=%"PRId64" ts.sec=%"PRId64" ts.nsec=%"PRId64"\n",
209                        this, m_activity_wait_timeout_nsec,
210                        (int64_t)ts.tv_sec, (int64_t)ts.tv_nsec);
211             return eAR_Error;
212         }
213     }
214
215     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
216     return eAR_Activity;
217 }
218
219 /**
220  * Registers \ref processor with this manager.
221  *
222  * also registers it with the isohandlermanager
223  *
224  * be sure to call isohandlermanager->init() first!
225  * and be sure that the processors are also ->init()'ed
226  *
227  * @param processor
228  * @return true if successfull
229  */
230 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
231 {
232     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
233     assert(processor);
234     if (processor->getType() == StreamProcessor::ePT_Receive) {
235         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
236         m_ReceiveProcessors.push_back(processor);
237         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
238                     ( this, &StreamProcessorManager::updateShadowLists, false );
239         processor->addPortManagerUpdateHandler(f);
240         updateShadowLists();
241         return true;
242     }
243     if (processor->getType() == StreamProcessor::ePT_Transmit) {
244         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
245         m_TransmitProcessors.push_back(processor);
246         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
247                     ( this, &StreamProcessorManager::updateShadowLists, false );
248         processor->addPortManagerUpdateHandler(f);
249         updateShadowLists();
250         return true;
251     }
252
253     debugFatal("Unsupported processor type!\n");
254     return false;
255 }
256
257 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
258 {
259     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
260     assert(processor);
261
262     if (processor->getType()==StreamProcessor::ePT_Receive) {
263
264         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
265               it != m_ReceiveProcessors.end();
266               ++it )
267         {
268             if ( *it == processor ) {
269                 if (*it == m_SyncSource) {
270                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
271                     m_SyncSource = NULL;
272                 }
273                 m_ReceiveProcessors.erase(it);
274                 // remove the functor
275                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
276                 if(f) {
277                     processor->remPortManagerUpdateHandler(f);
278                     delete f;
279                 }
280                 updateShadowLists();
281                 return true;
282             }
283         }
284     }
285
286     if (processor->getType()==StreamProcessor::ePT_Transmit) {
287         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
288               it != m_TransmitProcessors.end();
289               ++it )
290         {
291             if ( *it == processor ) {
292                 if (*it == m_SyncSource) {
293                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
294                     m_SyncSource = NULL;
295                 }
296                 m_TransmitProcessors.erase(it);
297                 // remove the functor
298                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
299                 if(f) {
300                     processor->remPortManagerUpdateHandler(f);
301                     delete f;
302                 }
303                 updateShadowLists();
304                 return true;
305             }
306         }
307     }
308
309     debugFatal("Processor (%p) not found!\n",processor);
310     return false; //not found
311 }
312
313 bool StreamProcessorManager::streamingParamsOk(signed int period, signed int rate, signed int n_buffers)
314 {
315     // Return true if the given parameter combination is valid.  If any
316     // parameter is set to -1 the currently set value is used in the test.
317     signed int min_period;
318
319     if (period < 0)
320         period = m_period;
321     if (rate < 0)
322         rate = m_nominal_framerate;
323     if (n_buffers < 0)
324         n_buffers = m_nb_buffers;
325
326     // For most interfaces data is transmitted with 8/16/32 samples per
327     // packet (at 1x, 2x and 4x rates respectively).  This more or less
328     // places a lower limit on the size of the period.  Furthermore, the
329     // current FFADO architecture dictates that m_nb_buffers can be no lower
330     // than 2.
331
332     if (n_buffers < 2) {
333         printMessage("FFADO requires at least 2 buffers\n");
334         return false;
335     }
336
337     // The boundary between 1x, 2x and 4x speed is taken from the RME driver
338     // since this seems to be the device with the widest available sampling
339     // rate range.
340     if (rate < 56000) {
341         // 1x speed
342         min_period = 8;
343     } else
344     if (rate < 112000) {
345         // 2x speed
346         min_period = 16;
347     } else {
348         // 4x speed
349         min_period = 32;
350     }
351
352     if (period < min_period) {
353         printMessage("At a rate of %d Hz, FFADO requires a buffer size of at least %d samples\n",
354             rate, min_period);
355         return false;
356     }
357     return true;
358 }
359
360 void StreamProcessorManager::setPeriodSize(unsigned int period) {
361     // This method is called early in the initialisation sequence to set the
362     // initial period size.  However, at that point in time the stream
363     // processors haven't been registered so they won't have their buffers
364     // configured from here.  The initial allocation of the stream processor
365     // (SP) buffers happens from within the SP prepare() method.
366     //
367     // SP period size changes will normally only be acted on from here
368     // if the change comes about due to a runtime change in the buffer size,
369     // as happens via jack's setbufsize facility for example.
370
371     if (period == m_period)
372         return;
373
374     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting period size to %d (was %d)\n", period, m_period);
375     m_period = period;
376
377     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
378           it != m_ReceiveProcessors.end();
379           ++it )
380     {
381         if ((*it)->periodSizeChanged(period) == false)
382             debugWarning("receive stream processor %p couldn't set period size\n", *it);
383     }
384     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
385           it != m_TransmitProcessors.end();
386           ++it )
387     {
388         if ((*it)->periodSizeChanged(period) == false)
389             debugWarning("transmit stream processor %p couldn't set period size\n", *it);
390     }
391
392     // Keep the activity timeout in sync with the new period size.  See
393     // also comments about this in prepare().
394     if (m_nominal_framerate > 0) {
395         int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
396         debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
397         setActivityWaitTimeoutUsec(timeout_usec);
398     }
399 }
400
401 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
402     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
403     m_SyncSource = s;
404     return true;
405 }
406
407 bool StreamProcessorManager::prepare() {
408
409     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
410     m_is_slave=false;
411     if(!getOption("slaveMode", m_is_slave)) {
412         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
413     }
414
415     m_shutdown_needed=false;
416
417     // if no sync source is set, select one here
418     if(m_SyncSource == NULL) {
419        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
420     }
421
422     // FIXME: put into separate method
423     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
424           it != m_ReceiveProcessors.end();
425           ++it )
426     {
427         if(m_SyncSource == NULL) {
428             debugWarning(" => Sync Source is %p.\n", *it);
429             m_SyncSource = *it;
430         }
431     }
432     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
433           it != m_TransmitProcessors.end();
434           ++it )
435     {
436         if(m_SyncSource == NULL) {
437             debugWarning(" => Sync Source is %p.\n", *it);
438             m_SyncSource = *it;
439         }
440     }
441
442     // now do the actual preparation of the SP's
443     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
444     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
445         it != m_ReceiveProcessors.end();
446         ++it ) {
447
448         if(!(*it)->setOption("slaveMode", m_is_slave)) {
449             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
450         }
451
452         if(!(*it)->prepare()) {
453             debugFatal(  " could not prepare (%p)...\n",(*it));
454             return false;
455         }
456     }
457     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
458     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
459         it != m_TransmitProcessors.end();
460         ++it ) {
461         if(!(*it)->setOption("slaveMode", m_is_slave)) {
462             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
463         }
464         if(!(*it)->prepare()) {
465             debugFatal( " could not prepare (%p)...\n",(*it));
466             return false;
467         }
468     }
469
470     // if there are no stream processors registered,
471     // fail
472     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
473         debugFatal("No stream processors registered, can't do anything useful\n");
474         return false;
475     }
476
477     // set the activity timeout value to two periods worth of usecs.
478     // since we can expect activity once every period, but we might have some
479     // offset, the safe value is two periods.
480     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
481     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
482     setActivityWaitTimeoutUsec(timeout_usec);
483
484     updateShadowLists();
485
486     return true;
487 }
488
489 bool
490 StreamProcessorManager::startDryRunning()
491 {
492     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
493     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
494             it != m_TransmitProcessors.end();
495             ++it ) {
496         if ((*it)->inError()) {
497             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
498             return false;
499         }
500         if (!(*it)->isDryRunning()) {
501             if(!(*it)->scheduleStartDryRunning(-1)) {
502                 debugError("Could not put '%s' SP %p into the dry-running state\n", (*it)->getTypeString(), *it);
503                 return false;
504             }
505         } else {
506             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
507         }
508     }
509     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
510             it != m_ReceiveProcessors.end();
511             ++it ) {
512         if ((*it)->inError()) {
513             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
514             return false;
515         }
516         if (!(*it)->isDryRunning()) {
517             if(!(*it)->scheduleStartDryRunning(-1)) {
518                 debugError("Could not put '%s' SP %p into the dry-running state\n", (*it)->getTypeString(), *it);
519                 return false;
520             }
521         } else {
522             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
523         }
524     }
525     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
526     // wait for the syncsource to start running.
527     // that will block the waitForPeriod call until everyone has started (theoretically)
528     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
529     bool all_dry_running = false;
530     while (!all_dry_running && cnt) {
531         all_dry_running = true;
532         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
533                 it != m_ReceiveProcessors.end();
534                 ++it ) {
535             all_dry_running &= (*it)->isDryRunning();
536         }
537         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
538                 it != m_TransmitProcessors.end();
539                 ++it ) {
540             all_dry_running &= (*it)->isDryRunning();
541         }
542
543         SleepRelativeUsec(125);
544         cnt--;
545     }
546     if(cnt==0) {
547         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
548         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
549                 it != m_ReceiveProcessors.end();
550                 ++it ) {
551             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
552                 (*it)->getTypeString(), *it, (*it)->getStateString());
553         }
554         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
555                 it != m_TransmitProcessors.end();
556                 ++it ) {
557             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
558                 (*it)->getTypeString(), *it, (*it)->getStateString());
559         }
560         return false;
561     }
562     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
563     return true;
564 }
565
566 bool StreamProcessorManager::syncStartAll() {
567     if(m_SyncSource == NULL) return false;
568
569     // get the options
570     int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
571     int xmit_prebuffer_frames = STREAMPROCESSORMANAGER_XMIT_PREBUFFER_FRAMES;
572     int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
573     int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
574     int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
575     int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
576     Util::Configuration &config = m_parent.getConfiguration();
577     config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
578     config.getValueForSetting("streaming.spm.xmit_prebuffer_frames", xmit_prebuffer_frames);
579     config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
580     config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
581     config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
582     config.getValueForSetting("streaming.spm.prestart_cycles_for_recv", prestart_cycles_for_recv);
583
584     // figure out when to get the SP's running.
585     // the xmit SP's should also know the base timestamp
586     // streams should be aligned here
587
588     // now find out how long we have to delay the wait operation such that
589     // the received frames will all be presented to the SP
590     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
591     int max_of_min_delay = 0;
592     int min_delay = 0;
593     int packet_size_frames = 0;
594     int max_packet_size_frames = 0;
595
596     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
597             it != m_ReceiveProcessors.end();
598             ++it ) {
599         min_delay = (*it)->getMaxFrameLatency();
600         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
601         packet_size_frames = (*it)->getNominalFramesPerPacket();
602         if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
603     }
604     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
605             it != m_TransmitProcessors.end();
606             ++it ) {
607         packet_size_frames = (*it)->getNominalFramesPerPacket();
608         if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
609     }
610     debugOutput( DEBUG_LEVEL_VERBOSE, " max_of_min_delay = %d, max_packet_size_frames = %d...\n", max_of_min_delay, max_packet_size_frames);
611
612     // add some processing margin. This only shifts the time
613     // at which the buffer is transfer()'ed. This makes things somewhat
614     // more robust.
615     m_sync_delay = max_of_min_delay + signal_delay_ticks;
616
617     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
618     //        have aquired lock
619     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
620     //sleep(2); // FIXME: be smarter here
621
622     // make sure that we are dry-running long enough for the
623     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
624     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
625
626     unsigned int nb_sync_runs = (sync_wait_time_msec * getNominalRate());
627     nb_sync_runs /= 1000;
628     nb_sync_runs /= getPeriodSize();
629
630     while(nb_sync_runs--) { // or while not sync-ed?
631         // check if we were woken up too soon
632         uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
633         uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
634         uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
635    
636         #ifdef DEBUG
637         int64_t now = Util::SystemTimeSource::getCurrentTime();
638         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR  pred: %"PRId64", syncdelay: %"PRId64", diff: %"PRId64"\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
639         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT  pred: %"PRId64", now: %"PRId64", wait: %"PRId64"\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
640         #endif
641    
642         // wait until it's time to transfer
643         Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
644    
645         #ifdef DEBUG
646         now = Util::SystemTimeSource::getCurrentTime();
647         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %"PRId64", now: %"PRId64", excess: %"PRId64"\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
648         #endif
649     }
650
651     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
652     // FIXME: in the SPM it would be nice to have system time instead of
653     //        1394 time
654
655     float syncrate = 0.0;
656     float tpf = m_SyncSource->getTicksPerFrame();
657     if (tpf > 0.0) {
658         syncrate = 24576000.0/tpf;
659     } else {
660         debugWarning("tpf <= 0? %f\n", tpf);
661     }
662     debugOutput( DEBUG_LEVEL_VERBOSE, " sync source frame rate: %f fps (%f tpf)\n", syncrate, tpf);
663
664     // we now should have decent sync info on the sync source
665     // determine a point in time where the system should start
666     // figure out where we are now
667     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
668     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
669         time_of_first_sample,
670         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
671         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
672         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
673
674     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
675     // this is the time window we have to setup all SP's such that they
676     // can start wet-running correctly.
677     // we have to round this time to an integer number of audio packets
678     double time_for_startup_abs = (double)(cycles_for_startup * TICKS_PER_CYCLE);
679     int time_for_startup_frames = (int)(time_for_startup_abs / tpf);
680     time_for_startup_frames = ((time_for_startup_frames / max_packet_size_frames) + 1) * max_packet_size_frames;
681     uint64_t time_for_startup_ticks = (uint64_t)((float)time_for_startup_frames * tpf);
682
683     time_of_first_sample = addTicks(time_of_first_sample,
684                                     time_for_startup_ticks);
685     debugOutput( DEBUG_LEVEL_VERBOSE, "  add %d frames (%011"PRIu64" ticks)...\n",
686         time_for_startup_frames, time_for_startup_ticks);
687
688     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
689         time_of_first_sample,
690         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
691         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
692         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
693
694     // we should start wet-running the transmit SP's some cycles in advance
695     // such that we know it is wet-running when it should output its first sample
696     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
697                                                  prestart_cycles_for_xmit * TICKS_PER_CYCLE);
698
699     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
700                                                  prestart_cycles_for_recv * TICKS_PER_CYCLE);
701     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
702         time_to_start_xmit,
703         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
704         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
705         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
706     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
707         time_to_start_recv,
708         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
709         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
710         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
711
712     // print the sync delay
713     int sync_delay_frames = (int)((float)m_sync_delay / m_SyncSource->getTicksPerFrame());
714     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay: %d = %d + %d ticks (%03us %04uc %04ut) [%d frames]...\n",
715         m_sync_delay, max_of_min_delay, signal_delay_ticks,
716         (unsigned int)TICKS_TO_SECS(m_sync_delay),
717         (unsigned int)TICKS_TO_CYCLES(m_sync_delay),
718         (unsigned int)TICKS_TO_OFFSET(m_sync_delay),
719         sync_delay_frames);
720
721     // the amount of prebuffer frames should be a multiple of the common block size
722     // as otherwise the position of MIDI is messed up
723     if(xmit_prebuffer_frames % max_packet_size_frames) {
724         int tmp = 0;
725         while(tmp < xmit_prebuffer_frames) {
726             tmp += max_packet_size_frames;
727         }
728         debugOutput(DEBUG_LEVEL_VERBOSE,
729                     "The number of prebuffer frames (%d) is not a multiple of the common block size (%d), increased to %d...\n",
730                     xmit_prebuffer_frames, max_packet_size_frames, tmp);
731         xmit_prebuffer_frames = tmp;
732     }
733
734     // check if this can even work.
735     // the worst case point where we can receive a period is at 1 period + sync delay
736     // this means that the number of frames in the xmit buffer has to be at least
737     // 1 period + sync delay
738     if(xmit_prebuffer_frames + m_period * m_nb_buffers < m_period + sync_delay_frames) {
739         debugWarning("The amount of transmit buffer frames (%d) is too small (< %d). "
740                      "This will most likely cause xruns.\n",
741                      xmit_prebuffer_frames + m_period * m_nb_buffers,
742                      m_period + sync_delay_frames);
743     }
744
745     // at this point the buffer head timestamp of the transmit buffers can be set
746     // this is the presentation time of the first sample in the buffer
747     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
748           it != m_TransmitProcessors.end();
749           ++it ) {
750         // set the number of prebuffer frames
751         (*it)->setExtraBufferFrames(xmit_prebuffer_frames);
752
753         // set the TSP of the first sample in the buffer
754         (*it)->setBufferHeadTimestamp(time_of_first_sample);
755         ffado_timestamp_t ts;
756         signed int fc;
757         (*it)->getBufferHeadTimestamp ( &ts, &fc );
758         debugOutput( DEBUG_LEVEL_VERBOSE, " transmit buffer tail %010"PRId64" => head TS %010"PRIu64", fc=%d...\n",
759                     time_of_first_sample, (uint64_t)ts, fc);
760     }
761
762     // the receive processors can be delayed by sync_delay ticks
763     // this means that in the worst case we have to be able to accomodate
764     // an extra sync_delay ticks worth of frames in the receive SP buffer
765     // the sync delay should be rounded to an integer amount of max_packet_size
766     int tmp = sync_delay_frames / max_packet_size_frames;
767     tmp = tmp + 1;
768     sync_delay_frames = tmp * max_packet_size_frames;
769     if (sync_delay_frames < 1024) sync_delay_frames = 1024; //HACK
770
771     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
772           it != m_ReceiveProcessors.end();
773           ++it ) {
774         // set the number of extra buffer frames
775         (*it)->setExtraBufferFrames(sync_delay_frames);
776     }
777
778     // switch syncsource to running state
779     uint64_t time_to_start_sync;
780     // FIXME: this is most likely not going to work for transmit sync sources
781     // but those are unsupported in this version
782     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
783         time_to_start_sync = time_to_start_recv;
784     } else {
785         time_to_start_sync = time_to_start_xmit;
786     }
787     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
788         debugError("m_SyncSource->scheduleStartRunning(%11"PRIu64") failed\n", time_to_start_sync);
789         return false;
790     }
791
792     // STEP X: switch all non-syncsource SP's over to the running state
793     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
794           it != m_ReceiveProcessors.end();
795           ++it ) {
796         if(*it != m_SyncSource) {
797             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
798                 debugError("%p->scheduleStartRunning(%11"PRIu64") failed\n", *it, time_to_start_recv);
799                 return false;
800             }
801         }
802     }
803     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
804           it != m_TransmitProcessors.end();
805           ++it ) {
806         if(*it != m_SyncSource) {
807             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
808                 debugError("%p->scheduleStartRunning(%11"PRIu64") failed\n", *it, time_to_start_xmit);
809                 return false;
810             }
811         }
812     }
813     // wait for the syncsource to start running.
814     // that will block the waitForPeriod call until everyone has started (theoretically)
815     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
816     // so a 20 times this value should be a good timeout
817     //int cnt = cycles_for_startup * 20; // by then it should have started
818     // or maybe we just have to use 1 second, as this wraps the cycle counter
819     int cnt = 8000;
820     while (!m_SyncSource->isRunning() && cnt) {
821         SleepRelativeUsec(125);
822         cnt--;
823     }
824     if(cnt==0) {
825         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
826         return false;
827     }
828
829     // the sync source is running, we can now read a decent received timestamp from it
830     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
831
832     // and a (rough) approximation of the rate
833     float rate = m_SyncSource->getTicksPerFrame();
834
835     #ifdef DEBUG
836     // the time at which the previous period would have passed
837     m_time_of_transfer2 = m_time_of_transfer;
838     m_time_of_transfer2 = substractTicks(m_time_of_transfer2, (uint64_t)(m_period * rate));
839     #endif
840
841     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010"PRId64", rate %f...\n",
842                 m_time_of_transfer, rate);
843
844     // FIXME: ideally we'd want the SP itself to account for the xmit_prebuffer_frames
845     // but that would also require to use a different approach to setting the initial TSP's
846     int64_t delay_in_ticks = (int64_t)(((float)((m_nb_buffers-1) * m_period + xmit_prebuffer_frames)) * rate);
847
848     // then use this information to initialize the xmit handlers
849
850     //  we now set the buffer tail timestamp of the transmit buffer
851     //  to the period transfer time instant plus what's nb_buffers - 1
852     //  in ticks. This due to the fact that we (should) have received one period
853     //  worth of ticks at t = m_time_of_transfer
854     //  hence one period of frames should also have been transmitted, which means
855     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
856     //  there are also xmit_prebuffer_frames frames extra present in the buffer
857     //  that allows us to calculate the tail timestamp for the buffer.
858
859     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
860     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010"PRId64", rate %f...\n",
861                 transmit_tail_timestamp, rate);
862
863     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
864         it != m_TransmitProcessors.end();
865         ++it ) {
866         (*it)->setTicksPerFrame(rate);
867         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
868         ffado_timestamp_t ts;
869         signed int fc;
870         (*it)->getBufferHeadTimestamp ( &ts, &fc );
871         debugOutput( DEBUG_LEVEL_VERBOSE, "   => transmit head TS %010"PRId64", fc=%d...\n",
872                     (uint64_t)ts, fc);
873     }
874
875     // align the received streams to be phase aligned
876     if(!alignReceivedStreams()) {
877         debugError("Could not align streams...\n");
878         return false;
879     }
880
881     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
882     return true;
883 }
884
885 bool
886 StreamProcessorManager::alignReceivedStreams()
887 {
888     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
889     unsigned int nb_sync_runs;
890     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
891     int64_t diff_between_streams[nb_rcv_sp];
892     int64_t diff;
893
894     unsigned int i;
895
896     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
897     int align_average_time_msec = STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC;
898     Util::Configuration &config = m_parent.getConfiguration();
899     config.getValueForSetting("streaming.spm.align_tries", cnt);
900     config.getValueForSetting("streaming.spm.align_average_time_msec", align_average_time_msec);
901
902     unsigned int periods_per_align_try = (align_average_time_msec * getNominalRate());
903     periods_per_align_try /= 1000;
904     periods_per_align_try /= getPeriodSize();
905     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
906
907     bool aligned = false;
908     while (!aligned && cnt--) {
909         nb_sync_runs = periods_per_align_try;
910         while(nb_sync_runs) {
911             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
912             if(!waitForPeriod()) {
913                 debugWarning("xrun while aligning streams...\n");
914                 return false;
915             }
916
917             // before we do anything else, transfer
918             if(!transferSilence()) {
919                 debugError("Could not transfer silence\n");
920                 return false;
921             }
922
923             // now calculate the stream offset
924             i = 0;
925             for ( i = 0; i < nb_rcv_sp; i++) {
926                 StreamProcessor *s = m_ReceiveProcessors.at(i);
927                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
928                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %"PRId64" ticks...\n",
929                     m_SyncSource, s, diff);
930                 if ( nb_sync_runs == periods_per_align_try ) {
931                     diff_between_streams[i] = diff;
932                 } else {
933                     diff_between_streams[i] += diff;
934                 }
935             }
936
937             nb_sync_runs--;
938         }
939         // calculate the average offsets
940         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
941         int diff_between_streams_frames[nb_rcv_sp];
942         aligned = true;
943
944         // first find whether the streams are aligned and what their offset is
945         for ( i = 0; i < nb_rcv_sp; i++) {
946             StreamProcessor *s = m_ReceiveProcessors.at(i);
947
948             diff_between_streams[i] /= periods_per_align_try;
949             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
950             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %"PRId64" ticks, %d frames...\n",
951                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
952
953             aligned &= (diff_between_streams_frames[i] == 0);
954         }
955
956         // if required, align the streams
957         int frames_to_shift_stream[nb_rcv_sp];
958         int min_shift = 9999;
959         if (!aligned) {
960             // find the minimum value (= earliest stream)
961             for ( i = 0; i < nb_rcv_sp; i++) {
962                 if (diff_between_streams_frames[i] < min_shift) {
963                     min_shift = diff_between_streams_frames[i];
964                 }
965             }
966             debugOutput( DEBUG_LEVEL_VERBOSE, " correcting shift with %d frames\n", min_shift);
967             // ensure that the streams are shifted only in the 'positive' direction
968             // i.e. that frames are only dropped, not added since that results
969             // in multiple writers for the data ringbuffer
970             // this also results in 'minimal shift' (not that it's required since the
971             // sync SP is part of the SP set)
972             for ( i = 0; i < nb_rcv_sp; i++) {
973                 frames_to_shift_stream[i] = diff_between_streams_frames[i] - min_shift;
974                 debugOutput(DEBUG_LEVEL_VERBOSE,
975                             "  going to drop %03d frames from stream %d\n",
976                             frames_to_shift_stream[i], i);
977             }
978             // perform the actual shift
979             for ( i = 0; i < nb_rcv_sp; i++) {
980                 StreamProcessor *s = m_ReceiveProcessors.at(i);
981                 // reposition the stream
982                 if(!s->shiftStream(frames_to_shift_stream[i])) {
983                     debugError("Could not shift SP %p %d frames\n", s, frames_to_shift_stream[i]);
984                     return false;
985                 }
986             }
987         }
988
989         if (!aligned) {
990             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
991         }
992     }
993     if (cnt == 0) {
994         debugError("Align failed\n");
995         return false;
996     }
997     return true;
998 }
999
1000 bool StreamProcessorManager::start() {
1001     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
1002
1003     // start all SP's synchonized
1004     bool start_result = false;
1005     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
1006         // put all SP's into dry-running state
1007         if (!startDryRunning()) {
1008             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
1009             start_result = false;
1010             continue;
1011         }
1012
1013         start_result = syncStartAll();
1014         if(start_result) {
1015             break;
1016         } else {
1017             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
1018             if(m_shutdown_needed) {
1019                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
1020                 return false;
1021             }
1022         }
1023     }
1024     if (!start_result) {
1025         debugFatal("Could not syncStartAll...\n");
1026         return false;
1027     }
1028     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
1029     return true;
1030 }
1031
1032 bool StreamProcessorManager::stop() {
1033     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
1034
1035     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
1036     // switch SP's over to the dry-running state
1037     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1038           it != m_ReceiveProcessors.end();
1039           ++it ) {
1040         if((*it)->isRunning()) {
1041             if(!(*it)->scheduleStopRunning(-1)) {
1042                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
1043                 return false;
1044             }
1045         }
1046     }
1047     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1048           it != m_TransmitProcessors.end();
1049           ++it ) {
1050         if((*it)->isRunning()) {
1051             if(!(*it)->scheduleStopRunning(-1)) {
1052                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
1053                 return false;
1054             }
1055         }
1056     }
1057     // wait for the SP's to get into the dry-running/stopped state
1058     int cnt = 8000;
1059     bool ready = false;
1060     while (!ready && cnt) {
1061         ready = true;
1062         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1063             it != m_ReceiveProcessors.end();
1064             ++it ) {
1065             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
1066         }
1067         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1068             it != m_TransmitProcessors.end();
1069             ++it ) {
1070             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
1071         }
1072         SleepRelativeUsec(125);
1073         cnt--;
1074     }
1075     if(cnt==0) {
1076         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
1077         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1078             it != m_ReceiveProcessors.end();
1079             ++it ) {
1080             (*it)->dumpInfo();
1081         }
1082         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1083             it != m_TransmitProcessors.end();
1084             ++it ) {
1085             (*it)->dumpInfo();
1086         }
1087         return false;
1088     }
1089
1090     // switch SP's over to the stopped state
1091     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1092           it != m_ReceiveProcessors.end();
1093           ++it ) {
1094         if ((*it)->inError()) {
1095             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
1096         } else if(!(*it)->scheduleStopDryRunning(-1)) {
1097             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
1098             return false;
1099         }
1100     }
1101     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1102           it != m_TransmitProcessors.end();
1103           ++it ) {
1104         if ((*it)->inError()) {
1105             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
1106         } else if(!(*it)->scheduleStopDryRunning(-1)) {
1107             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
1108             return false;
1109         }
1110     }
1111     // wait for the SP's to get into the stopped state
1112     cnt = 8000;
1113     ready = false;
1114     while (!ready && cnt) {
1115         ready = true;
1116         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1117             it != m_ReceiveProcessors.end();
1118             ++it ) {
1119             ready &= ((*it)->isStopped() || (*it)->inError());
1120         }
1121         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1122             it != m_TransmitProcessors.end();
1123             ++it ) {
1124             ready &= ((*it)->isStopped() || (*it)->inError());
1125         }
1126         SleepRelativeUsec(125);
1127         cnt--;
1128     }
1129     if(cnt==0) {
1130         debugWarning(" Timeout waiting for the SP's to stop\n");
1131         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1132             it != m_ReceiveProcessors.end();
1133             ++it ) {
1134             (*it)->dumpInfo();
1135         }
1136         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1137             it != m_TransmitProcessors.end();
1138             ++it ) {
1139             (*it)->dumpInfo();
1140         }
1141         return false;
1142     }
1143     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
1144     return true;
1145 }
1146
1147 /**
1148  * Called upon Xrun events. This brings all StreamProcessors back
1149  * into their starting state, and then carries on streaming. This should
1150  * have the same effect as restarting the whole thing.
1151  *
1152  * @return true if successful, false otherwise
1153  */
1154 bool StreamProcessorManager::handleXrun() {
1155
1156     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
1157
1158     dumpInfo();
1159
1160     /*
1161      * Reset means:
1162      * 1) Disabling the SP's, so that they don't process any packets
1163      *    note: the isomanager does keep on delivering/requesting them
1164      * 2) Bringing all buffers & streamprocessors into a know state
1165      *    - Clear all capture buffers
1166      *    - Put nb_periods*period_size of null frames into the playback buffers
1167      * 3) Re-enable the SP's
1168      */
1169
1170     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
1171     // start all SP's synchonized
1172     bool start_result = false;
1173     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
1174         if(m_shutdown_needed) {
1175             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
1176             return true;
1177         }
1178         // put all SP's into dry-running state
1179         if (!startDryRunning()) {
1180             debugShowBackLog();
1181             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
1182             start_result = false;
1183             continue;
1184         }
1185
1186         start_result = syncStartAll();
1187         if(start_result) {
1188             break;
1189         } else {
1190             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
1191         }
1192     }
1193     if (!start_result) {
1194         debugFatal("Could not syncStartAll...\n");
1195         return false;
1196     }
1197     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
1198
1199     return true;
1200 }
1201
1202 /**
1203  * @brief Waits until the next period of samples is ready
1204  *
1205  * This function does not return until a full period of samples is (or should be)
1206  * ready to be transferred.
1207  *
1208  * @return true if the period is ready, false if not
1209  */
1210 bool StreamProcessorManager::waitForPeriod() {
1211     if(m_SyncSource == NULL) return false;
1212     if(m_shutdown_needed) return false;
1213     bool xrun_occurred = false;
1214     bool in_error = false;
1215
1216     // grab the wait lock
1217     // this ensures that bus reset handling doesn't interfere
1218     Util::MutexLockHelper lock(*m_WaitLock);
1219     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1220                         "waiting for period (%d frames in buffer)...\n",
1221                         m_SyncSource->getBufferFill());
1222     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
1223     uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
1224     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
1225
1226     #ifdef DEBUG
1227     int64_t now = Util::SystemTimeSource::getCurrentTime();
1228     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR  pred: %"PRId64", syncdelay: %"PRId64", diff: %"PRId64"\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
1229     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT  pred: %"PRId64", now: %"PRId64", wait: %"PRId64"\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
1230     #endif
1231
1232     // wait until it's time to transfer
1233     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
1234
1235     #ifdef DEBUG
1236     now = Util::SystemTimeSource::getCurrentTime();
1237     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %"PRId64", now: %"PRId64", excess: %"PRId64"\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
1238     #endif
1239
1240     // the period should be ready now
1241     #ifdef DEBUG
1242     int rcv_fills[10];
1243     int xmt_fills[10];
1244     int i;
1245     i=0;
1246     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1247         it != m_ReceiveProcessors.end();
1248         ++it ) {
1249         rcv_fills[i] = (*it)->getBufferFill();
1250         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "RECV SP %p bufferfill: %05d\n", *it, rcv_fills[i]);
1251         i++;
1252     }
1253     i=0;
1254     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1255         it != m_TransmitProcessors.end();
1256         ++it ) {
1257         xmt_fills[i] = (*it)->getBufferFill();
1258         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "XMIT SP %p bufferfill: %05d\n", *it, xmt_fills[i]);
1259         i++;
1260     }
1261     for(i=0;i<1;i++) {
1262         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "SP %02d RECV: %05d [%05d] XMIT: %05d [%05d] DIFF: %05d\n", i,
1263                     rcv_fills[i], rcv_fills[i] - m_period,
1264                     xmt_fills[i], xmt_fills[i] - m_period,
1265                     rcv_fills[i] - xmt_fills[i]);
1266     }
1267     #endif
1268
1269     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
1270     // HACK: we force wait until every SP is ready. this is needed
1271     // since the raw1394 interface provides no control over interrupts
1272     // resulting in very bad predictability on when the data is present.
1273     bool period_not_ready = true;
1274     while(period_not_ready) {
1275         period_not_ready = false;
1276         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1277             it != m_ReceiveProcessors.end();
1278             ++it ) {
1279             bool this_sp_period_ready = (*it)->canConsumePeriod();
1280             if (!this_sp_period_ready) {
1281                 period_not_ready = true;
1282             }
1283         }
1284         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1285             it != m_TransmitProcessors.end();
1286             ++it ) {
1287             bool this_sp_period_ready = (*it)->canProducePeriod();
1288             if (!this_sp_period_ready) {
1289                 period_not_ready = true;
1290             }
1291         }
1292
1293         if (period_not_ready) {
1294             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
1295             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
1296         }
1297
1298         // check for underruns/errors on the ISO side,
1299         // those should make us bail out of the wait loop
1300         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1301             it != m_ReceiveProcessors.end();
1302             ++it ) {
1303             // a xrun has occurred on the Iso side
1304             xrun_occurred |= (*it)->xrunOccurred();
1305             in_error |= (*it)->inError();
1306         }
1307         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1308             it != m_TransmitProcessors.end();
1309             ++it ) {
1310             // a xrun has occurred on the Iso side
1311             xrun_occurred |= (*it)->xrunOccurred();
1312             in_error |= (*it)->inError();
1313         }
1314         if(xrun_occurred | in_error | m_shutdown_needed) break;
1315     }
1316     #else
1317     // check for underruns/errors on the ISO side,
1318     // those should make us bail out of the wait loop
1319     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1320         it != m_ReceiveProcessors.end();
1321         ++it ) {
1322         // xrun on data buffer side
1323         if (!(*it)->canConsumePeriod()) {
1324             xrun_occurred = true;
1325         }
1326         // a xrun has occurred on the Iso side
1327         xrun_occurred |= (*it)->xrunOccurred();
1328         in_error |= (*it)->inError();
1329     }
1330     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1331         it != m_TransmitProcessors.end();
1332         ++it ) {
1333         // xrun on data buffer side
1334         if (!(*it)->canProducePeriod()) {
1335             xrun_occurred = true;
1336         }
1337         // a xrun has occurred on the Iso side
1338         xrun_occurred |= (*it)->xrunOccurred();
1339         in_error |= (*it)->inError();
1340     }
1341     #endif
1342
1343     if(xrun_occurred) {
1344         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1345     }
1346     if(in_error) {
1347         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1348         m_shutdown_needed = true;
1349     }
1350
1351     // we save the 'ideal' time of the transfer at this point,
1352     // because we can have interleaved read - process - write
1353     // cycles making that we modify a receiving stream's buffer
1354     // before we get to writing.
1355     // NOTE: before waitForPeriod() is called again, both the transmit
1356     //       and the receive processors should have done their transfer.
1357     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1358    
1359     #ifdef DEBUG
1360     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1361    
1362     int diff = diffTicks(m_time_of_transfer, m_time_of_transfer2);
1363     // display message if the difference between two successive tick
1364     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1365     // so 50 ticks = 10%, which is a rather large jitter value.
1366     if(diff-ticks_per_period > m_max_diff_ticks || diff-ticks_per_period < -m_max_diff_ticks) {
1367         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011"PRIu64" => TS=%011"PRIu64" (%d, nom %d)\n",
1368                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1369     }
1370     m_time_of_transfer2 = m_time_of_transfer;
1371     #endif
1372
1373     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1374                         "transfer period %d at %"PRIu64" ticks...\n",
1375                         m_nbperiods, m_time_of_transfer);
1376
1377     #ifdef DEBUG
1378     int rcv_bf=0, xmt_bf=0;
1379     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1380         it != m_ReceiveProcessors.end();
1381         ++it ) {
1382         rcv_bf = (*it)->getBufferFill();
1383     }
1384     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1385         it != m_TransmitProcessors.end();
1386         ++it ) {
1387         xmt_bf = (*it)->getBufferFill();
1388     }
1389     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1390                         "XF at %011"PRIu64" ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1391                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1392
1393     // check if xruns occurred on the Iso side.
1394     // also check if xruns will occur should we transfer() now
1395     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1396           it != m_ReceiveProcessors.end();
1397           ++it ) {
1398
1399         if ((*it)->xrunOccurred()) {
1400             debugOutput(DEBUG_LEVEL_NORMAL,
1401                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1402             (*it)->dumpInfo();
1403         }
1404         if (!((*it)->canClientTransferFrames(m_period))) {
1405             debugOutput(DEBUG_LEVEL_NORMAL,
1406                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1407             (*it)->dumpInfo();
1408         }
1409     }
1410     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1411           it != m_TransmitProcessors.end();
1412           ++it ) {
1413         if ((*it)->xrunOccurred()) {
1414             debugOutput(DEBUG_LEVEL_NORMAL,
1415                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1416         }
1417         if (!((*it)->canClientTransferFrames(m_period))) {
1418             debugOutput(DEBUG_LEVEL_NORMAL,
1419                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1420         }
1421     }
1422     #endif
1423     m_nbperiods++;
1424
1425     // this is to notify the client of the delay that we introduced by waiting
1426     pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(m_time_of_transfer);
1427
1428     m_delayed_usecs = Util::SystemTimeSource::getCurrentTime() - pred_system_time_at_xfer;
1429     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1430                         "delayed for %d usecs...\n",
1431                         m_delayed_usecs);
1432
1433     // now we can signal the client that we are (should be) ready
1434     return !xrun_occurred;
1435 }
1436
1437 /**
1438  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1439  *
1440  * Transfers one period of frames from the client side to the Iso side and vice versa.
1441  *
1442  * @return true if successful, false otherwise (indicates xrun).
1443  */
1444 bool StreamProcessorManager::transfer() {
1445     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1446     bool retval=true;
1447     retval &= transfer(StreamProcessor::ePT_Receive);
1448     retval &= transfer(StreamProcessor::ePT_Transmit);
1449     return retval;
1450 }
1451
1452 /**
1453  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1454  *
1455  * Transfers one period of frames from the client side to the Iso side or vice versa.
1456  *
1457  * @param t The processor type to tranfer for (receive or transmit)
1458  * @return true if successful, false otherwise (indicates xrun).
1459  */
1460 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1461     if(m_SyncSource == NULL) return false;
1462     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1463         "transfer(%d) at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
1464         t, m_time_of_transfer,
1465         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1466         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1467         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1468
1469     bool retval = true;
1470     // a static cast could make sure that there is no performance
1471     // penalty for the virtual functions (to be checked)
1472     if (t==StreamProcessor::ePT_Receive) {
1473         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1474                 it != m_ReceiveProcessors.end();
1475                 ++it ) {
1476             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1477                     debugWarning("could not getFrames(%u, %11"PRIu64") from stream processor (%p)\n",
1478                             m_period, m_time_of_transfer,*it);
1479                 retval &= false; // buffer underrun
1480             }
1481         }
1482     } else {
1483         // FIXME: in the SPM it would be nice to have system time instead of
1484         //        1394 time
1485         float rate = m_SyncSource->getTicksPerFrame();
1486
1487         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1488                 it != m_TransmitProcessors.end();
1489                 ++it ) {
1490             // this is the delay in frames between the point where a frame is received and
1491             // when it is transmitted again
1492             unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1493             int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1494    
1495             // the data we are putting into the buffer is intended to be transmitted
1496             // one ringbuffer size after it has been received
1497             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1498
1499             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1500                 debugWarning("could not putFrames(%u,%"PRIu64") to stream processor (%p)\n",
1501                         m_period, transmit_timestamp, *it);
1502                 retval &= false; // buffer underrun
1503             }
1504         }
1505     }
1506     return retval;
1507 }
1508
1509 /**
1510  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1511  *
1512  * Transfers one period of silence to the Iso side for transmit SP's
1513  * or dump one period of frames for receive SP's
1514  *
1515  * @return true if successful, false otherwise (indicates xrun).
1516  */
1517 bool StreamProcessorManager::transferSilence() {
1518     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1519     bool retval=true;
1520     // NOTE: the order here is opposite from the order in
1521     // normal operation (transmit is before receive), because
1522     // we can do that here (data=silence=available) and
1523     // it increases reliability (esp. on startup)
1524     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1525     retval &= transferSilence(StreamProcessor::ePT_Receive);
1526     return retval;
1527 }
1528
1529 /**
1530  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1531  *
1532  * Transfers one period of silence to the Iso side for transmit SP's
1533  * or dump one period of frames for receive SP's
1534  *
1535  * @param t The processor type to tranfer for (receive or transmit)
1536  * @return true if successful, false otherwise (indicates xrun).
1537  */
1538 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1539     if(m_SyncSource == NULL) return false;
1540     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1541         "transferSilence(%d) at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
1542         t, m_time_of_transfer,
1543         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1544         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1545         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1546
1547     bool retval = true;
1548     // a static cast could make sure that there is no performance
1549     // penalty for the virtual functions (to be checked)
1550     if (t==StreamProcessor::ePT_Receive) {
1551         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1552                 it != m_ReceiveProcessors.end();
1553                 ++it ) {
1554             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1555                     debugWarning("could not dropFrames(%u, %11"PRIu64") from stream processor (%p)\n",
1556                             m_period, m_time_of_transfer,*it);
1557                 retval &= false; // buffer underrun
1558             }
1559         }
1560     } else {
1561         // FIXME: in the SPM it would be nice to have system time instead of
1562         //        1394 time
1563         float rate = m_SyncSource->getTicksPerFrame();
1564
1565         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1566                 it != m_TransmitProcessors.end();
1567                 ++it ) {
1568             // this is the delay in frames between the point where a frame is received and
1569             // when it is transmitted again
1570             unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1571             int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1572    
1573             // the data we are putting into the buffer is intended to be transmitted
1574             // one ringbuffer size after it has been received
1575             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1576
1577             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1578                 debugWarning("could not putSilenceFrames(%u,%"PRIu64") to stream processor (%p)\n",
1579                         m_period, transmit_timestamp, *it);
1580                 retval &= false; // buffer underrun
1581             }
1582         }
1583     }
1584     return retval;
1585 }
1586
1587 void StreamProcessorManager::dumpInfo() {
1588     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1589     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1590     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1591     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1592
1593     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1594     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1595         it != m_ReceiveProcessors.end();
1596         ++it ) {
1597         (*it)->dumpInfo();
1598     }
1599
1600     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1601     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1602         it != m_TransmitProcessors.end();
1603         ++it ) {
1604         (*it)->dumpInfo();
1605     }
1606
1607     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1608
1609     // list port info in verbose mode
1610     debugOutputShort( DEBUG_LEVEL_VERBOSE, "Port Information\n");
1611     int nb_ports;
1612    
1613     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Playback\n");
1614     nb_ports = getPortCount(Port::E_Playback);
1615     for(int i=0; i < nb_ports; i++) {
1616         Port *p = getPortByIndex(i, Port::E_Playback);
1617         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1618         if (p) {
1619             bool disabled = p->isDisabled();
1620             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1621             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1622             debugOutputShort( DEBUG_LEVEL_VERBOSE, "%3s ", p->getName().c_str());
1623         } else {
1624             debugOutputShort( DEBUG_LEVEL_VERBOSE, "invalid ");
1625         }
1626         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1627     }
1628     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Capture\n");
1629     nb_ports = getPortCount(Port::E_Capture);
1630     for(int i=0; i < nb_ports; i++) {
1631         Port *p = getPortByIndex(i, Port::E_Capture);
1632         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1633         if (p) {
1634             bool disabled = p->isDisabled();
1635             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1636             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1637             debugOutputShort( DEBUG_LEVEL_VERBOSE, " %3s ", p->getName().c_str());
1638         } else {
1639             debugOutputShort( DEBUG_LEVEL_VERBOSE, " invalid ");
1640         }
1641         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1642     }
1643
1644     debugOutputShort( DEBUG_LEVEL_VERBOSE, "----------------------------------------------------\n");
1645
1646 }
1647
1648 void StreamProcessorManager::setVerboseLevel(int l) {
1649     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1650
1651     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1652         it != m_ReceiveProcessors.end();
1653         ++it ) {
1654         (*it)->setVerboseLevel(l);
1655     }
1656     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1657         it != m_TransmitProcessors.end();
1658         ++it ) {
1659         (*it)->setVerboseLevel(l);
1660     }
1661     setDebugLevel(l);
1662     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1663 }
1664
1665 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1666     int count=0;
1667
1668     if (direction == Port::E_Capture) {
1669         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1670             it != m_ReceiveProcessors.end();
1671             ++it ) {
1672             count += (*it)->getPortCount(type);
1673         }
1674     } else {
1675         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1676             it != m_TransmitProcessors.end();
1677             ++it ) {
1678             count += (*it)->getPortCount(type);
1679         }
1680     }
1681     return count;
1682 }
1683
1684 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1685     int count=0;
1686
1687     if (direction == Port::E_Capture) {
1688         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1689             it != m_ReceiveProcessors.end();
1690             ++it ) {
1691             count += (*it)->getPortCount();
1692         }
1693     } else {
1694         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1695             it != m_TransmitProcessors.end();
1696             ++it ) {
1697             count += (*it)->getPortCount();
1698         }
1699     }
1700     return count;
1701 }
1702
1703 void
1704 StreamProcessorManager::updateShadowLists()
1705 {
1706     debugOutput( DEBUG_LEVEL_VERBOSE, "Updating port shadow lists...\n");
1707     m_CapturePorts_shadow.clear();
1708     m_PlaybackPorts_shadow.clear();
1709
1710     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1711         it != m_ReceiveProcessors.end();
1712         ++it ) {
1713         PortManager *pm = *it;
1714         for (int i=0; i < pm->getPortCount(); i++) {
1715             Port *p = pm->getPortAtIdx(i);
1716             if (!p) {
1717                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1718                 continue;
1719             }
1720             if(p->getDirection() != Port::E_Capture) {
1721                 debugError("port at idx %d for receive SP is not a capture port!\n", i);
1722                 continue;
1723             }
1724             m_CapturePorts_shadow.push_back(p);
1725         }
1726     }
1727     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1728         it != m_TransmitProcessors.end();
1729         ++it ) {
1730         PortManager *pm = *it;
1731         for (int i=0; i < pm->getPortCount(); i++) {
1732             Port *p = pm->getPortAtIdx(i);
1733             if (!p) {
1734                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1735                 continue;
1736             }
1737             if(p->getDirection() != Port::E_Playback) {
1738                 debugError("port at idx %d for transmit SP is not a playback port!\n", i);
1739                 continue;
1740             }
1741             m_PlaybackPorts_shadow.push_back(p);
1742         }
1743     }
1744 }
1745
1746 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1747     debugOutputExtreme( DEBUG_LEVEL_ULTRA_VERBOSE, "getPortByIndex(%d, %d)...\n", idx, direction);
1748     if (direction == Port::E_Capture) {
1749         #ifdef DEBUG
1750         if(idx >= (int)m_CapturePorts_shadow.size()) {
1751             debugError("Capture port %d out of range (%zd)\n", idx, m_CapturePorts_shadow.size());
1752             return NULL;
1753         }
1754         #endif
1755         return m_CapturePorts_shadow.at(idx);
1756     } else {
1757         #ifdef DEBUG
1758         if(idx >= (int)m_PlaybackPorts_shadow.size()) {
1759             debugError("Playback port %d out of range (%zd)\n", idx, m_PlaybackPorts_shadow.size());
1760             return NULL;
1761         }
1762         #endif
1763         return m_PlaybackPorts_shadow.at(idx);
1764     }
1765     return NULL;
1766 }
1767
1768 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1769     m_thread_realtime=rt;
1770     m_thread_priority=priority;
1771     return true;
1772 }
1773
1774
1775 } // end of namespace
Note: See TracBrowser for help on using the browser.