root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 2171, 70.0 kB (checked in by jwoithe, 8 years ago)

A second pass at addressing ticket #242. Define a global clock source within the SystemTimeSource object and use this whenever clock_gettime() is called. On systems which support the new raw1394_read_cycle_timer_and_clock() libraw1394 call and CLOCK_MONOTONIC_RAW, these changes should ensure that all timing-sensitive parts of FFADO are using the same clock source. System tests under tests/systemtests/ have not been converted to use this new framework because they exist for different purposes and are not using the FFADO streaming infrastructure.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
#include "config.h"

#include "StreamProcessorManager.h"
#include "generic/StreamProcessor.h"
#include "generic/Port.h"
#include "libieee1394/cycletimer.h"

#include "devicemanager.h"

#include "libutil/Time.h"

#include <assert.h>
#include <errno.h>
#include <math.h>
#include <time.h>
38
39 namespace Streaming {
40
41 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
42
43 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
44     : m_time_of_transfer ( 0 )
45     #ifdef DEBUG
46     , m_time_of_transfer2 ( 0 )
47     #endif
48     , m_is_slave( false )
49     , m_SyncSource(NULL)
50     , m_parent( p )
51     , m_xrun_happened( false )
52     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
53     , m_nb_buffers( 0 )
54     , m_period( 0 )
55     , m_sync_delay( 0 )
56     , m_audio_datatype( eADT_Float )
57     , m_nominal_framerate ( 0 )
58     , m_xruns(0)
59     , m_shutdown_needed(false)
60     , m_nbperiods(0)
61     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
62     , m_max_diff_ticks( 50 )
63 {
64     addOption(Util::OptionContainer::Option("slaveMode",false));
65     sem_init(&m_activity_semaphore, 0, 0);
66 }
67
68 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
69                                                unsigned int framerate, unsigned int nb_buffers)
70     : m_time_of_transfer ( 0 )
71     #ifdef DEBUG
72     , m_time_of_transfer2 ( 0 )
73     #endif
74     , m_is_slave( false )
75     , m_SyncSource(NULL)
76     , m_parent( p )
77     , m_xrun_happened( false )
78     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
79     , m_nb_buffers(nb_buffers)
80     , m_period(period)
81     , m_sync_delay( 0 )
82     , m_audio_datatype( eADT_Float )
83     , m_nominal_framerate ( framerate )
84     , m_xruns(0)
85     , m_shutdown_needed(false)
86     , m_nbperiods(0)
87     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
88     , m_max_diff_ticks( 50 )
89 {
90     addOption(Util::OptionContainer::Option("slaveMode",false));
91     sem_init(&m_activity_semaphore, 0, 0);
92 }
93
94 StreamProcessorManager::~StreamProcessorManager() {
95     sem_post(&m_activity_semaphore);
96     sem_destroy(&m_activity_semaphore);
97     delete m_WaitLock;
98 }
99
100 // void
101 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
102 // {
103 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
104 // //
105 // //     bool handled_at_least_one = false;
106 // //     // note that all receive streams are gone once a device is unplugged
107 // //
108 // //     // synchronize with the wait lock
109 // //     Util::MutexLockHelper lock(*m_WaitLock);
110 // //
111 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
112 // //     // cause all SP's to bail out
113 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
114 // //           it != m_ReceiveProcessors.end();
115 // //           ++it )
116 // //     {
117 // //         if(&s == &((*it)->getParent().get1394Service())) {
118 // //             debugOutput(DEBUG_LEVEL_NORMAL,
119 // //                         "issue busreset on receive SPM on channel %d\n",
120 // //                         (*it)->getChannel());
121 // //             (*it)->handleBusReset();
122 // //             handled_at_least_one = true;
123 // //         } else {
124 // //             debugOutput(DEBUG_LEVEL_NORMAL,
125 // //                         "skipping receive SPM on channel %d since not on service %p\n",
126 // //                         (*it)->getChannel(), &s);
127 // //         }
128 // //     }
129 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
130 // //           it != m_TransmitProcessors.end();
131 // //           ++it )
132 // //     {
133 // //         if(&s == &((*it)->getParent().get1394Service())) {
134 // //             debugOutput(DEBUG_LEVEL_NORMAL,
135 // //                         "issue busreset on transmit SPM on channel %d\n",
136 // //                         (*it)->getChannel());
137 // //             (*it)->handleBusReset();
138 // //             handled_at_least_one = true;
139 // //         } else {
140 // //             debugOutput(DEBUG_LEVEL_NORMAL,
141 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
142 // //                         (*it)->getChannel(), &s);
143 // //         }
144 // //     }
145 // //
146 // //     // FIXME: we request shutdown for now.
147 // //     m_shutdown_needed = handled_at_least_one;
148 // }
149
150 void
151 StreamProcessorManager::signalActivity()
152 {
153     sem_post(&m_activity_semaphore);
154     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
155 }
156
157 enum StreamProcessorManager::eActivityResult
158 StreamProcessorManager::waitForActivity()
159 {
160     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
161     struct timespec ts;
162     int result;
163
164     if (m_activity_wait_timeout_nsec >= 0) {
165
166         if (Util::SystemTimeSource::clockGettime(&ts) == -1) {
167             debugError("clock_gettime failed\n");
168             return eAR_Error;
169         }
170         ts.tv_nsec += m_activity_wait_timeout_nsec;
171         while(ts.tv_nsec >= 1000000000LL) {
172             ts.tv_sec += 1;
173             ts.tv_nsec -= 1000000000LL;
174         }
175     }
176
177     if (m_activity_wait_timeout_nsec >= 0) {
178         result = sem_timedwait(&m_activity_semaphore, &ts);
179     } else {
180         result = sem_wait(&m_activity_semaphore);
181     }
182
183     if(result != 0) {
184         if (errno == ETIMEDOUT) {
185             debugOutput(DEBUG_LEVEL_VERBOSE,
186                         "(%p) sem_timedwait() timed out (result=%d)\n",
187                         this, result);
188             return eAR_Timeout;
189         } else if (errno == EINTR) {
190             debugOutput(DEBUG_LEVEL_VERBOSE,
191                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
192                         this, result);
193             return eAR_Interrupted;
194         } else if (errno == EINVAL) {
195             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
196                         this, result);
197             debugError("(%p) timeout_nsec=%"PRId64" ts.sec=%"PRId64" ts.nsec=%"PRId64"\n",
198                        this, m_activity_wait_timeout_nsec,
199                        (int64_t)ts.tv_sec, (int64_t)ts.tv_nsec);
200             return eAR_Error;
201         } else {
202             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
203                         this, result, errno);
204             debugError("(%p) timeout_nsec=%"PRId64" ts.sec=%"PRId64" ts.nsec=%"PRId64"\n",
205                        this, m_activity_wait_timeout_nsec,
206                        (int64_t)ts.tv_sec, (int64_t)ts.tv_nsec);
207             return eAR_Error;
208         }
209     }
210
211     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
212     return eAR_Activity;
213 }
214
215 /**
216  * Registers \ref processor with this manager.
217  *
218  * also registers it with the isohandlermanager
219  *
220  * be sure to call isohandlermanager->init() first!
221  * and be sure that the processors are also ->init()'ed
222  *
223  * @param processor
224  * @return true if successfull
225  */
226 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
227 {
228     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
229     assert(processor);
230     if (processor->getType() == StreamProcessor::ePT_Receive) {
231         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
232         m_ReceiveProcessors.push_back(processor);
233         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
234                     ( this, &StreamProcessorManager::updateShadowLists, false );
235         processor->addPortManagerUpdateHandler(f);
236         updateShadowLists();
237         return true;
238     }
239     if (processor->getType() == StreamProcessor::ePT_Transmit) {
240         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
241         m_TransmitProcessors.push_back(processor);
242         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
243                     ( this, &StreamProcessorManager::updateShadowLists, false );
244         processor->addPortManagerUpdateHandler(f);
245         updateShadowLists();
246         return true;
247     }
248
249     debugFatal("Unsupported processor type!\n");
250     return false;
251 }
252
253 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
254 {
255     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
256     assert(processor);
257
258     if (processor->getType()==StreamProcessor::ePT_Receive) {
259
260         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
261               it != m_ReceiveProcessors.end();
262               ++it )
263         {
264             if ( *it == processor ) {
265                 if (*it == m_SyncSource) {
266                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
267                     m_SyncSource = NULL;
268                 }
269                 m_ReceiveProcessors.erase(it);
270                 // remove the functor
271                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
272                 if(f) {
273                     processor->remPortManagerUpdateHandler(f);
274                     delete f;
275                 }
276                 updateShadowLists();
277                 return true;
278             }
279         }
280     }
281
282     if (processor->getType()==StreamProcessor::ePT_Transmit) {
283         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
284               it != m_TransmitProcessors.end();
285               ++it )
286         {
287             if ( *it == processor ) {
288                 if (*it == m_SyncSource) {
289                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
290                     m_SyncSource = NULL;
291                 }
292                 m_TransmitProcessors.erase(it);
293                 // remove the functor
294                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
295                 if(f) {
296                     processor->remPortManagerUpdateHandler(f);
297                     delete f;
298                 }
299                 updateShadowLists();
300                 return true;
301             }
302         }
303     }
304
305     debugFatal("Processor (%p) not found!\n",processor);
306     return false; //not found
307 }
308
309 bool StreamProcessorManager::streamingParamsOk(signed int period, signed int rate, signed int n_buffers)
310 {
311     // Return true if the given parameter combination is valid.  If any
312     // parameter is set to -1 the currently set value is used in the test.
313     signed int min_period;
314
315     if (period < 0)
316         period = m_period;
317     if (rate < 0)
318         rate = m_nominal_framerate;
319     if (n_buffers < 0)
320         n_buffers = m_nb_buffers;
321
322     // For most interfaces data is transmitted with 8/16/32 samples per
323     // packet (at 1x, 2x and 4x rates respectively).  This more or less
324     // places a lower limit on the size of the period.  Furthermore, the
325     // current FFADO architecture dictates that m_nb_buffers can be no lower
326     // than 2.
327
328     if (n_buffers < 2) {
329         printMessage("FFADO requires at least 2 buffers\n");
330         return false;
331     }
332
333     // The boundary between 1x, 2x and 4x speed is taken from the RME driver
334     // since this seems to be the device with the widest available sampling
335     // rate range.
336     if (rate < 56000) {
337         // 1x speed
338         min_period = 8;
339     } else
340     if (rate < 112000) {
341         // 2x speed
342         min_period = 16;
343     } else {
344         // 4x speed
345         min_period = 32;
346     }
347
348     if (period < min_period) {
349         printMessage("At a rate of %d Hz, FFADO requires a buffer size of at least %d samples\n",
350             rate, min_period);
351         return false;
352     }
353     return true;
354 }
355
356 void StreamProcessorManager::setPeriodSize(unsigned int period) {
357     // This method is called early in the initialisation sequence to set the
358     // initial period size.  However, at that point in time the stream
359     // processors haven't been registered so they won't have their buffers
360     // configured from here.  The initial allocation of the stream processor
361     // (SP) buffers happens from within the SP prepare() method.
362     //
363     // SP period size changes will normally only be acted on from here
364     // if the change comes about due to a runtime change in the buffer size,
365     // as happens via jack's setbufsize facility for example.
366
367     if (period == m_period)
368         return;
369
370     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting period size to %d (was %d)\n", period, m_period);
371     m_period = period;
372
373     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
374           it != m_ReceiveProcessors.end();
375           ++it )
376     {
377         if ((*it)->periodSizeChanged(period) == false)
378             debugWarning("receive stream processor %p couldn't set period size\n", *it);
379     }
380     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
381           it != m_TransmitProcessors.end();
382           ++it )
383     {
384         if ((*it)->periodSizeChanged(period) == false)
385             debugWarning("transmit stream processor %p couldn't set period size\n", *it);
386     }
387
388     // Keep the activity timeout in sync with the new period size.  See
389     // also comments about this in prepare().
390     if (m_nominal_framerate > 0) {
391         int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
392         debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
393         setActivityWaitTimeoutUsec(timeout_usec);
394     }
395 }
396
397 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
398     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
399     m_SyncSource = s;
400     return true;
401 }
402
403 bool StreamProcessorManager::prepare() {
404
405     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
406     m_is_slave=false;
407     if(!getOption("slaveMode", m_is_slave)) {
408         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
409     }
410
411     m_shutdown_needed=false;
412
413     // if no sync source is set, select one here
414     if(m_SyncSource == NULL) {
415        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
416     }
417
418     // FIXME: put into separate method
419     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
420           it != m_ReceiveProcessors.end();
421           ++it )
422     {
423         if(m_SyncSource == NULL) {
424             debugWarning(" => Sync Source is %p.\n", *it);
425             m_SyncSource = *it;
426         }
427     }
428     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
429           it != m_TransmitProcessors.end();
430           ++it )
431     {
432         if(m_SyncSource == NULL) {
433             debugWarning(" => Sync Source is %p.\n", *it);
434             m_SyncSource = *it;
435         }
436     }
437
438     // now do the actual preparation of the SP's
439     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
440     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
441         it != m_ReceiveProcessors.end();
442         ++it ) {
443
444         if(!(*it)->setOption("slaveMode", m_is_slave)) {
445             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
446         }
447
448         if(!(*it)->prepare()) {
449             debugFatal(  " could not prepare (%p)...\n",(*it));
450             return false;
451         }
452     }
453     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
454     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
455         it != m_TransmitProcessors.end();
456         ++it ) {
457         if(!(*it)->setOption("slaveMode", m_is_slave)) {
458             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
459         }
460         if(!(*it)->prepare()) {
461             debugFatal( " could not prepare (%p)...\n",(*it));
462             return false;
463         }
464     }
465
466     // if there are no stream processors registered,
467     // fail
468     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
469         debugFatal("No stream processors registered, can't do anything useful\n");
470         return false;
471     }
472
473     // set the activity timeout value to two periods worth of usecs.
474     // since we can expect activity once every period, but we might have some
475     // offset, the safe value is two periods.
476     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
477     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
478     setActivityWaitTimeoutUsec(timeout_usec);
479
480     updateShadowLists();
481
482     return true;
483 }
484
485 bool
486 StreamProcessorManager::startDryRunning()
487 {
488     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
489     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
490             it != m_TransmitProcessors.end();
491             ++it ) {
492         if ((*it)->inError()) {
493             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
494             return false;
495         }
496         if (!(*it)->isDryRunning()) {
497             if(!(*it)->scheduleStartDryRunning(-1)) {
498                 debugError("Could not put '%s' SP %p into the dry-running state\n", (*it)->getTypeString(), *it);
499                 return false;
500             }
501         } else {
502             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
503         }
504     }
505     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
506             it != m_ReceiveProcessors.end();
507             ++it ) {
508         if ((*it)->inError()) {
509             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
510             return false;
511         }
512         if (!(*it)->isDryRunning()) {
513             if(!(*it)->scheduleStartDryRunning(-1)) {
514                 debugError("Could not put '%s' SP %p into the dry-running state\n", (*it)->getTypeString(), *it);
515                 return false;
516             }
517         } else {
518             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
519         }
520     }
521     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
522     // wait for the syncsource to start running.
523     // that will block the waitForPeriod call until everyone has started (theoretically)
524     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
525     bool all_dry_running = false;
526     while (!all_dry_running && cnt) {
527         all_dry_running = true;
528         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
529                 it != m_ReceiveProcessors.end();
530                 ++it ) {
531             all_dry_running &= (*it)->isDryRunning();
532         }
533         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
534                 it != m_TransmitProcessors.end();
535                 ++it ) {
536             all_dry_running &= (*it)->isDryRunning();
537         }
538
539         SleepRelativeUsec(125);
540         cnt--;
541     }
542     if(cnt==0) {
543         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
544         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
545                 it != m_ReceiveProcessors.end();
546                 ++it ) {
547             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
548                 (*it)->getTypeString(), *it, (*it)->getStateString());
549         }
550         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
551                 it != m_TransmitProcessors.end();
552                 ++it ) {
553             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
554                 (*it)->getTypeString(), *it, (*it)->getStateString());
555         }
556         return false;
557     }
558     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
559     return true;
560 }
561
562 bool StreamProcessorManager::syncStartAll() {
563     if(m_SyncSource == NULL) return false;
564
565     // get the options
566     int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
567     int xmit_prebuffer_frames = STREAMPROCESSORMANAGER_XMIT_PREBUFFER_FRAMES;
568     int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
569     int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
570     int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
571     int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
572     Util::Configuration &config = m_parent.getConfiguration();
573     config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
574     config.getValueForSetting("streaming.spm.xmit_prebuffer_frames", xmit_prebuffer_frames);
575     config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
576     config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
577     config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
578     config.getValueForSetting("streaming.spm.prestart_cycles_for_recv", prestart_cycles_for_recv);
579
580     // figure out when to get the SP's running.
581     // the xmit SP's should also know the base timestamp
582     // streams should be aligned here
583
584     // now find out how long we have to delay the wait operation such that
585     // the received frames will all be presented to the SP
586     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
587     int max_of_min_delay = 0;
588     int min_delay = 0;
589     int packet_size_frames = 0;
590     int max_packet_size_frames = 0;
591
592     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
593             it != m_ReceiveProcessors.end();
594             ++it ) {
595         min_delay = (*it)->getMaxFrameLatency();
596         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
597         packet_size_frames = (*it)->getNominalFramesPerPacket();
598         if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
599     }
600     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
601             it != m_TransmitProcessors.end();
602             ++it ) {
603         packet_size_frames = (*it)->getNominalFramesPerPacket();
604         if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
605     }
606     debugOutput( DEBUG_LEVEL_VERBOSE, " max_of_min_delay = %d, max_packet_size_frames = %d...\n", max_of_min_delay, max_packet_size_frames);
607
608     // add some processing margin. This only shifts the time
609     // at which the buffer is transfer()'ed. This makes things somewhat
610     // more robust.
611     m_sync_delay = max_of_min_delay + signal_delay_ticks;
612
613     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
614     //        have aquired lock
615     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
616     //sleep(2); // FIXME: be smarter here
617
618     // make sure that we are dry-running long enough for the
619     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
620     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
621
622     unsigned int nb_sync_runs = (sync_wait_time_msec * getNominalRate());
623     nb_sync_runs /= 1000;
624     nb_sync_runs /= getPeriodSize();
625
626     while(nb_sync_runs--) { // or while not sync-ed?
627         // check if we were woken up too soon
628         uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
629         uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
630         uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
631    
632         #ifdef DEBUG
633         int64_t now = Util::SystemTimeSource::getCurrentTime();
634         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR  pred: %"PRId64", syncdelay: %"PRId64", diff: %"PRId64"\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
635         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT  pred: %"PRId64", now: %"PRId64", wait: %"PRId64"\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
636         #endif
637    
638         // wait until it's time to transfer
639         Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
640    
641         #ifdef DEBUG
642         now = Util::SystemTimeSource::getCurrentTime();
643         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %"PRId64", now: %"PRId64", excess: %"PRId64"\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
644         #endif
645     }
646
647     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
648     // FIXME: in the SPM it would be nice to have system time instead of
649     //        1394 time
650
651     float syncrate = 0.0;
652     float tpf = m_SyncSource->getTicksPerFrame();
653     if (tpf > 0.0) {
654         syncrate = 24576000.0/tpf;
655     } else {
656         debugWarning("tpf <= 0? %f\n", tpf);
657     }
658     debugOutput( DEBUG_LEVEL_VERBOSE, " sync source frame rate: %f fps (%f tpf)\n", syncrate, tpf);
659
660     // we now should have decent sync info on the sync source
661     // determine a point in time where the system should start
662     // figure out where we are now
663     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
664     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
665         time_of_first_sample,
666         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
667         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
668         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
669
670     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
671     // this is the time window we have to setup all SP's such that they
672     // can start wet-running correctly.
673     // we have to round this time to an integer number of audio packets
674     double time_for_startup_abs = (double)(cycles_for_startup * TICKS_PER_CYCLE);
675     int time_for_startup_frames = (int)(time_for_startup_abs / tpf);
676     time_for_startup_frames = ((time_for_startup_frames / max_packet_size_frames) + 1) * max_packet_size_frames;
677     uint64_t time_for_startup_ticks = (uint64_t)((float)time_for_startup_frames * tpf);
678
679     time_of_first_sample = addTicks(time_of_first_sample,
680                                     time_for_startup_ticks);
681     debugOutput( DEBUG_LEVEL_VERBOSE, "  add %d frames (%011"PRIu64" ticks)...\n",
682         time_for_startup_frames, time_for_startup_ticks);
683
684     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
685         time_of_first_sample,
686         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
687         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
688         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
689
690     // we should start wet-running the transmit SP's some cycles in advance
691     // such that we know it is wet-running when it should output its first sample
692     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
693                                                  prestart_cycles_for_xmit * TICKS_PER_CYCLE);
694
695     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
696                                                  prestart_cycles_for_recv * TICKS_PER_CYCLE);
697     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
698         time_to_start_xmit,
699         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
700         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
701         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
702     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
703         time_to_start_recv,
704         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
705         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
706         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
707
708     // print the sync delay
709     int sync_delay_frames = (int)((float)m_sync_delay / m_SyncSource->getTicksPerFrame());
710     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay: %d = %d + %d ticks (%03us %04uc %04ut) [%d frames]...\n",
711         m_sync_delay, max_of_min_delay, signal_delay_ticks,
712         (unsigned int)TICKS_TO_SECS(m_sync_delay),
713         (unsigned int)TICKS_TO_CYCLES(m_sync_delay),
714         (unsigned int)TICKS_TO_OFFSET(m_sync_delay),
715         sync_delay_frames);
716
717     // the amount of prebuffer frames should be a multiple of the common block size
718     // as otherwise the position of MIDI is messed up
719     if(xmit_prebuffer_frames % max_packet_size_frames) {
720         int tmp = 0;
721         while(tmp < xmit_prebuffer_frames) {
722             tmp += max_packet_size_frames;
723         }
724         debugOutput(DEBUG_LEVEL_VERBOSE,
725                     "The number of prebuffer frames (%d) is not a multiple of the common block size (%d), increased to %d...\n",
726                     xmit_prebuffer_frames, max_packet_size_frames, tmp);
727         xmit_prebuffer_frames = tmp;
728     }
729
730     // check if this can even work.
731     // the worst case point where we can receive a period is at 1 period + sync delay
732     // this means that the number of frames in the xmit buffer has to be at least
733     // 1 period + sync delay
734     if(xmit_prebuffer_frames + m_period * m_nb_buffers < m_period + sync_delay_frames) {
735         debugWarning("The amount of transmit buffer frames (%d) is too small (< %d). "
736                      "This will most likely cause xruns.\n",
737                      xmit_prebuffer_frames + m_period * m_nb_buffers,
738                      m_period + sync_delay_frames);
739     }
740
741     // at this point the buffer head timestamp of the transmit buffers can be set
742     // this is the presentation time of the first sample in the buffer
743     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
744           it != m_TransmitProcessors.end();
745           ++it ) {
746         // set the number of prebuffer frames
747         (*it)->setExtraBufferFrames(xmit_prebuffer_frames);
748
749         // set the TSP of the first sample in the buffer
750         (*it)->setBufferHeadTimestamp(time_of_first_sample);
751         ffado_timestamp_t ts;
752         signed int fc;
753         (*it)->getBufferHeadTimestamp ( &ts, &fc );
754         debugOutput( DEBUG_LEVEL_VERBOSE, " transmit buffer tail %010"PRId64" => head TS %010"PRIu64", fc=%d...\n",
755                     time_of_first_sample, (uint64_t)ts, fc);
756     }
757
758     // the receive processors can be delayed by sync_delay ticks
759     // this means that in the worst case we have to be able to accomodate
760     // an extra sync_delay ticks worth of frames in the receive SP buffer
761     // the sync delay should be rounded to an integer amount of max_packet_size
762     int tmp = sync_delay_frames / max_packet_size_frames;
763     tmp = tmp + 1;
764     sync_delay_frames = tmp * max_packet_size_frames;
765     if (sync_delay_frames < 1024) sync_delay_frames = 1024; //HACK
766
767     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
768           it != m_ReceiveProcessors.end();
769           ++it ) {
770         // set the number of extra buffer frames
771         (*it)->setExtraBufferFrames(sync_delay_frames);
772     }
773
774     // switch syncsource to running state
775     uint64_t time_to_start_sync;
776     // FIXME: this is most likely not going to work for transmit sync sources
777     // but those are unsupported in this version
778     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
779         time_to_start_sync = time_to_start_recv;
780     } else {
781         time_to_start_sync = time_to_start_xmit;
782     }
783     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
784         debugError("m_SyncSource->scheduleStartRunning(%11"PRIu64") failed\n", time_to_start_sync);
785         return false;
786     }
787
788     // STEP X: switch all non-syncsource SP's over to the running state
789     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
790           it != m_ReceiveProcessors.end();
791           ++it ) {
792         if(*it != m_SyncSource) {
793             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
794                 debugError("%p->scheduleStartRunning(%11"PRIu64") failed\n", *it, time_to_start_recv);
795                 return false;
796             }
797         }
798     }
799     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
800           it != m_TransmitProcessors.end();
801           ++it ) {
802         if(*it != m_SyncSource) {
803             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
804                 debugError("%p->scheduleStartRunning(%11"PRIu64") failed\n", *it, time_to_start_xmit);
805                 return false;
806             }
807         }
808     }
809     // wait for the syncsource to start running.
810     // that will block the waitForPeriod call until everyone has started (theoretically)
811     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
812     // so a 20 times this value should be a good timeout
813     //int cnt = cycles_for_startup * 20; // by then it should have started
814     // or maybe we just have to use 1 second, as this wraps the cycle counter
815     int cnt = 8000;
816     while (!m_SyncSource->isRunning() && cnt) {
817         SleepRelativeUsec(125);
818         cnt--;
819     }
820     if(cnt==0) {
821         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
822         return false;
823     }
824
825     // the sync source is running, we can now read a decent received timestamp from it
826     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
827
828     // and a (rough) approximation of the rate
829     float rate = m_SyncSource->getTicksPerFrame();
830
831     #ifdef DEBUG
832     // the time at which the previous period would have passed
833     m_time_of_transfer2 = m_time_of_transfer;
834     m_time_of_transfer2 = substractTicks(m_time_of_transfer2, (uint64_t)(m_period * rate));
835     #endif
836
837     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010"PRId64", rate %f...\n",
838                 m_time_of_transfer, rate);
839
840     // FIXME: ideally we'd want the SP itself to account for the xmit_prebuffer_frames
841     // but that would also require to use a different approach to setting the initial TSP's
842     int64_t delay_in_ticks = (int64_t)(((float)((m_nb_buffers-1) * m_period + xmit_prebuffer_frames)) * rate);
843
844     // then use this information to initialize the xmit handlers
845
846     //  we now set the buffer tail timestamp of the transmit buffer
847     //  to the period transfer time instant plus what's nb_buffers - 1
848     //  in ticks. This due to the fact that we (should) have received one period
849     //  worth of ticks at t = m_time_of_transfer
850     //  hence one period of frames should also have been transmitted, which means
851     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
852     //  there are also xmit_prebuffer_frames frames extra present in the buffer
853     //  that allows us to calculate the tail timestamp for the buffer.
854
855     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
856     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010"PRId64", rate %f...\n",
857                 transmit_tail_timestamp, rate);
858
859     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
860         it != m_TransmitProcessors.end();
861         ++it ) {
862         (*it)->setTicksPerFrame(rate);
863         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
864         ffado_timestamp_t ts;
865         signed int fc;
866         (*it)->getBufferHeadTimestamp ( &ts, &fc );
867         debugOutput( DEBUG_LEVEL_VERBOSE, "   => transmit head TS %010"PRId64", fc=%d...\n",
868                     (uint64_t)ts, fc);
869     }
870
871     // align the received streams to be phase aligned
872     if(!alignReceivedStreams()) {
873         debugError("Could not align streams...\n");
874         return false;
875     }
876
877     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
878     return true;
879 }
880
881 bool
882 StreamProcessorManager::alignReceivedStreams()
883 {
884     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
885     unsigned int nb_sync_runs;
886     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
887     int64_t diff_between_streams[nb_rcv_sp];
888     int64_t diff;
889
890     unsigned int i;
891
892     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
893     int align_average_time_msec = STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC;
894     Util::Configuration &config = m_parent.getConfiguration();
895     config.getValueForSetting("streaming.spm.align_tries", cnt);
896     config.getValueForSetting("streaming.spm.align_average_time_msec", align_average_time_msec);
897
898     unsigned int periods_per_align_try = (align_average_time_msec * getNominalRate());
899     periods_per_align_try /= 1000;
900     periods_per_align_try /= getPeriodSize();
901     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
902
903     bool aligned = false;
904     while (!aligned && cnt--) {
905         nb_sync_runs = periods_per_align_try;
906         while(nb_sync_runs) {
907             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
908             if(!waitForPeriod()) {
909                 debugWarning("xrun while aligning streams...\n");
910                 return false;
911             }
912
913             // before we do anything else, transfer
914             if(!transferSilence()) {
915                 debugError("Could not transfer silence\n");
916                 return false;
917             }
918
919             // now calculate the stream offset
920             i = 0;
921             for ( i = 0; i < nb_rcv_sp; i++) {
922                 StreamProcessor *s = m_ReceiveProcessors.at(i);
923                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
924                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %"PRId64" ticks...\n",
925                     m_SyncSource, s, diff);
926                 if ( nb_sync_runs == periods_per_align_try ) {
927                     diff_between_streams[i] = diff;
928                 } else {
929                     diff_between_streams[i] += diff;
930                 }
931             }
932
933             nb_sync_runs--;
934         }
935         // calculate the average offsets
936         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
937         int diff_between_streams_frames[nb_rcv_sp];
938         aligned = true;
939
940         // first find whether the streams are aligned and what their offset is
941         for ( i = 0; i < nb_rcv_sp; i++) {
942             StreamProcessor *s = m_ReceiveProcessors.at(i);
943
944             diff_between_streams[i] /= periods_per_align_try;
945             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
946             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %"PRId64" ticks, %d frames...\n",
947                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
948
949             aligned &= (diff_between_streams_frames[i] == 0);
950         }
951
952         // if required, align the streams
953         int frames_to_shift_stream[nb_rcv_sp];
954         int min_shift = 9999;
955         if (!aligned) {
956             // find the minimum value (= earliest stream)
957             for ( i = 0; i < nb_rcv_sp; i++) {
958                 if (diff_between_streams_frames[i] < min_shift) {
959                     min_shift = diff_between_streams_frames[i];
960                 }
961             }
962             debugOutput( DEBUG_LEVEL_VERBOSE, " correcting shift with %d frames\n", min_shift);
963             // ensure that the streams are shifted only in the 'positive' direction
964             // i.e. that frames are only dropped, not added since that results
965             // in multiple writers for the data ringbuffer
966             // this also results in 'minimal shift' (not that it's required since the
967             // sync SP is part of the SP set)
968             for ( i = 0; i < nb_rcv_sp; i++) {
969                 frames_to_shift_stream[i] = diff_between_streams_frames[i] - min_shift;
970                 debugOutput(DEBUG_LEVEL_VERBOSE,
971                             "  going to drop %03d frames from stream %d\n",
972                             frames_to_shift_stream[i], i);
973             }
974             // perform the actual shift
975             for ( i = 0; i < nb_rcv_sp; i++) {
976                 StreamProcessor *s = m_ReceiveProcessors.at(i);
977                 // reposition the stream
978                 if(!s->shiftStream(frames_to_shift_stream[i])) {
979                     debugError("Could not shift SP %p %d frames\n", s, frames_to_shift_stream[i]);
980                     return false;
981                 }
982             }
983         }
984
985         if (!aligned) {
986             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
987         }
988     }
989     if (cnt == 0) {
990         debugError("Align failed\n");
991         return false;
992     }
993     return true;
994 }
995
996 bool StreamProcessorManager::start() {
997     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
998
999     // start all SP's synchonized
1000     bool start_result = false;
1001     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
1002         // put all SP's into dry-running state
1003         if (!startDryRunning()) {
1004             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
1005             start_result = false;
1006             continue;
1007         }
1008
1009         start_result = syncStartAll();
1010         if(start_result) {
1011             break;
1012         } else {
1013             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
1014             if(m_shutdown_needed) {
1015                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
1016                 return false;
1017             }
1018         }
1019     }
1020     if (!start_result) {
1021         debugFatal("Could not syncStartAll...\n");
1022         return false;
1023     }
1024     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
1025     return true;
1026 }
1027
1028 bool StreamProcessorManager::stop() {
1029     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
1030
1031     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
1032     // switch SP's over to the dry-running state
1033     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1034           it != m_ReceiveProcessors.end();
1035           ++it ) {
1036         if((*it)->isRunning()) {
1037             if(!(*it)->scheduleStopRunning(-1)) {
1038                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
1039                 return false;
1040             }
1041         }
1042     }
1043     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1044           it != m_TransmitProcessors.end();
1045           ++it ) {
1046         if((*it)->isRunning()) {
1047             if(!(*it)->scheduleStopRunning(-1)) {
1048                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
1049                 return false;
1050             }
1051         }
1052     }
1053     // wait for the SP's to get into the dry-running/stopped state
1054     int cnt = 8000;
1055     bool ready = false;
1056     while (!ready && cnt) {
1057         ready = true;
1058         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1059             it != m_ReceiveProcessors.end();
1060             ++it ) {
1061             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
1062         }
1063         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1064             it != m_TransmitProcessors.end();
1065             ++it ) {
1066             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
1067         }
1068         SleepRelativeUsec(125);
1069         cnt--;
1070     }
1071     if(cnt==0) {
1072         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
1073         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1074             it != m_ReceiveProcessors.end();
1075             ++it ) {
1076             (*it)->dumpInfo();
1077         }
1078         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1079             it != m_TransmitProcessors.end();
1080             ++it ) {
1081             (*it)->dumpInfo();
1082         }
1083         return false;
1084     }
1085
1086     // switch SP's over to the stopped state
1087     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1088           it != m_ReceiveProcessors.end();
1089           ++it ) {
1090         if ((*it)->inError()) {
1091             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
1092         } else if(!(*it)->scheduleStopDryRunning(-1)) {
1093             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
1094             return false;
1095         }
1096     }
1097     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1098           it != m_TransmitProcessors.end();
1099           ++it ) {
1100         if ((*it)->inError()) {
1101             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
1102         } else if(!(*it)->scheduleStopDryRunning(-1)) {
1103             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
1104             return false;
1105         }
1106     }
1107     // wait for the SP's to get into the stopped state
1108     cnt = 8000;
1109     ready = false;
1110     while (!ready && cnt) {
1111         ready = true;
1112         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1113             it != m_ReceiveProcessors.end();
1114             ++it ) {
1115             ready &= ((*it)->isStopped() || (*it)->inError());
1116         }
1117         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1118             it != m_TransmitProcessors.end();
1119             ++it ) {
1120             ready &= ((*it)->isStopped() || (*it)->inError());
1121         }
1122         SleepRelativeUsec(125);
1123         cnt--;
1124     }
1125     if(cnt==0) {
1126         debugWarning(" Timeout waiting for the SP's to stop\n");
1127         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1128             it != m_ReceiveProcessors.end();
1129             ++it ) {
1130             (*it)->dumpInfo();
1131         }
1132         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1133             it != m_TransmitProcessors.end();
1134             ++it ) {
1135             (*it)->dumpInfo();
1136         }
1137         return false;
1138     }
1139     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
1140     return true;
1141 }
1142
1143 /**
1144  * Called upon Xrun events. This brings all StreamProcessors back
1145  * into their starting state, and then carries on streaming. This should
1146  * have the same effect as restarting the whole thing.
1147  *
1148  * @return true if successful, false otherwise
1149  */
1150 bool StreamProcessorManager::handleXrun() {
1151
1152     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
1153
1154     dumpInfo();
1155
1156     /*
1157      * Reset means:
1158      * 1) Disabling the SP's, so that they don't process any packets
1159      *    note: the isomanager does keep on delivering/requesting them
1160      * 2) Bringing all buffers & streamprocessors into a know state
1161      *    - Clear all capture buffers
1162      *    - Put nb_periods*period_size of null frames into the playback buffers
1163      * 3) Re-enable the SP's
1164      */
1165
1166     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
1167     // start all SP's synchonized
1168     bool start_result = false;
1169     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
1170         if(m_shutdown_needed) {
1171             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
1172             return true;
1173         }
1174         // put all SP's into dry-running state
1175         if (!startDryRunning()) {
1176             debugShowBackLog();
1177             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
1178             start_result = false;
1179             continue;
1180         }
1181
1182         start_result = syncStartAll();
1183         if(start_result) {
1184             break;
1185         } else {
1186             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
1187         }
1188     }
1189     if (!start_result) {
1190         debugFatal("Could not syncStartAll...\n");
1191         return false;
1192     }
1193     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
1194
1195     return true;
1196 }
1197
1198 /**
1199  * @brief Waits until the next period of samples is ready
1200  *
1201  * This function does not return until a full period of samples is (or should be)
1202  * ready to be transferred.
1203  *
1204  * @return true if the period is ready, false if not
1205  */
1206 bool StreamProcessorManager::waitForPeriod() {
1207     if(m_SyncSource == NULL) return false;
1208     if(m_shutdown_needed) return false;
1209     bool xrun_occurred = false;
1210     bool in_error = false;
1211
1212     // grab the wait lock
1213     // this ensures that bus reset handling doesn't interfere
1214     Util::MutexLockHelper lock(*m_WaitLock);
1215     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1216                         "waiting for period (%d frames in buffer)...\n",
1217                         m_SyncSource->getBufferFill());
1218     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
1219     uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
1220     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
1221
1222     #ifdef DEBUG
1223     int64_t now = Util::SystemTimeSource::getCurrentTime();
1224     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR  pred: %"PRId64", syncdelay: %"PRId64", diff: %"PRId64"\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
1225     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT  pred: %"PRId64", now: %"PRId64", wait: %"PRId64"\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
1226     #endif
1227
1228     // wait until it's time to transfer
1229     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
1230
1231     #ifdef DEBUG
1232     now = Util::SystemTimeSource::getCurrentTime();
1233     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %"PRId64", now: %"PRId64", excess: %"PRId64"\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
1234     #endif
1235
1236     // the period should be ready now
1237     #ifdef DEBUG
1238     int rcv_fills[10];
1239     int xmt_fills[10];
1240     int i;
1241     i=0;
1242     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1243         it != m_ReceiveProcessors.end();
1244         ++it ) {
1245         rcv_fills[i] = (*it)->getBufferFill();
1246         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "RECV SP %p bufferfill: %05d\n", *it, rcv_fills[i]);
1247         i++;
1248     }
1249     i=0;
1250     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1251         it != m_TransmitProcessors.end();
1252         ++it ) {
1253         xmt_fills[i] = (*it)->getBufferFill();
1254         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "XMIT SP %p bufferfill: %05d\n", *it, xmt_fills[i]);
1255         i++;
1256     }
1257     for(i=0;i<1;i++) {
1258         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "SP %02d RECV: %05d [%05d] XMIT: %05d [%05d] DIFF: %05d\n", i,
1259                     rcv_fills[i], rcv_fills[i] - m_period,
1260                     xmt_fills[i], xmt_fills[i] - m_period,
1261                     rcv_fills[i] - xmt_fills[i]);
1262     }
1263     #endif
1264
1265     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
1266     // HACK: we force wait until every SP is ready. this is needed
1267     // since the raw1394 interface provides no control over interrupts
1268     // resulting in very bad predictability on when the data is present.
1269     bool period_not_ready = true;
1270     while(period_not_ready) {
1271         period_not_ready = false;
1272         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1273             it != m_ReceiveProcessors.end();
1274             ++it ) {
1275             bool this_sp_period_ready = (*it)->canConsumePeriod();
1276             if (!this_sp_period_ready) {
1277                 period_not_ready = true;
1278             }
1279         }
1280         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1281             it != m_TransmitProcessors.end();
1282             ++it ) {
1283             bool this_sp_period_ready = (*it)->canProducePeriod();
1284             if (!this_sp_period_ready) {
1285                 period_not_ready = true;
1286             }
1287         }
1288
1289         if (period_not_ready) {
1290             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
1291             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
1292         }
1293
1294         // check for underruns/errors on the ISO side,
1295         // those should make us bail out of the wait loop
1296         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1297             it != m_ReceiveProcessors.end();
1298             ++it ) {
1299             // a xrun has occurred on the Iso side
1300             xrun_occurred |= (*it)->xrunOccurred();
1301             in_error |= (*it)->inError();
1302         }
1303         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1304             it != m_TransmitProcessors.end();
1305             ++it ) {
1306             // a xrun has occurred on the Iso side
1307             xrun_occurred |= (*it)->xrunOccurred();
1308             in_error |= (*it)->inError();
1309         }
1310         if(xrun_occurred | in_error | m_shutdown_needed) break;
1311     }
1312     #else
1313     // check for underruns/errors on the ISO side,
1314     // those should make us bail out of the wait loop
1315     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1316         it != m_ReceiveProcessors.end();
1317         ++it ) {
1318         // xrun on data buffer side
1319         if (!(*it)->canConsumePeriod()) {
1320             xrun_occurred = true;
1321         }
1322         // a xrun has occurred on the Iso side
1323         xrun_occurred |= (*it)->xrunOccurred();
1324         in_error |= (*it)->inError();
1325     }
1326     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1327         it != m_TransmitProcessors.end();
1328         ++it ) {
1329         // xrun on data buffer side
1330         if (!(*it)->canProducePeriod()) {
1331             xrun_occurred = true;
1332         }
1333         // a xrun has occurred on the Iso side
1334         xrun_occurred |= (*it)->xrunOccurred();
1335         in_error |= (*it)->inError();
1336     }
1337     #endif
1338
1339     if(xrun_occurred) {
1340         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1341     }
1342     if(in_error) {
1343         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1344         m_shutdown_needed = true;
1345     }
1346
1347     // we save the 'ideal' time of the transfer at this point,
1348     // because we can have interleaved read - process - write
1349     // cycles making that we modify a receiving stream's buffer
1350     // before we get to writing.
1351     // NOTE: before waitForPeriod() is called again, both the transmit
1352     //       and the receive processors should have done their transfer.
1353     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1354    
1355     #ifdef DEBUG
1356     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1357    
1358     int diff = diffTicks(m_time_of_transfer, m_time_of_transfer2);
1359     // display message if the difference between two successive tick
1360     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1361     // so 50 ticks = 10%, which is a rather large jitter value.
1362     if(diff-ticks_per_period > m_max_diff_ticks || diff-ticks_per_period < -m_max_diff_ticks) {
1363         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011"PRIu64" => TS=%011"PRIu64" (%d, nom %d)\n",
1364                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1365     }
1366     m_time_of_transfer2 = m_time_of_transfer;
1367     #endif
1368
1369     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1370                         "transfer period %d at %"PRIu64" ticks...\n",
1371                         m_nbperiods, m_time_of_transfer);
1372
1373     #ifdef DEBUG
1374     int rcv_bf=0, xmt_bf=0;
1375     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1376         it != m_ReceiveProcessors.end();
1377         ++it ) {
1378         rcv_bf = (*it)->getBufferFill();
1379     }
1380     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1381         it != m_TransmitProcessors.end();
1382         ++it ) {
1383         xmt_bf = (*it)->getBufferFill();
1384     }
1385     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1386                         "XF at %011"PRIu64" ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1387                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1388
1389     // check if xruns occurred on the Iso side.
1390     // also check if xruns will occur should we transfer() now
1391     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1392           it != m_ReceiveProcessors.end();
1393           ++it ) {
1394
1395         if ((*it)->xrunOccurred()) {
1396             debugOutput(DEBUG_LEVEL_NORMAL,
1397                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1398             (*it)->dumpInfo();
1399         }
1400         if (!((*it)->canClientTransferFrames(m_period))) {
1401             debugOutput(DEBUG_LEVEL_NORMAL,
1402                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1403             (*it)->dumpInfo();
1404         }
1405     }
1406     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1407           it != m_TransmitProcessors.end();
1408           ++it ) {
1409         if ((*it)->xrunOccurred()) {
1410             debugOutput(DEBUG_LEVEL_NORMAL,
1411                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1412         }
1413         if (!((*it)->canClientTransferFrames(m_period))) {
1414             debugOutput(DEBUG_LEVEL_NORMAL,
1415                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1416         }
1417     }
1418     #endif
1419     m_nbperiods++;
1420
1421     // this is to notify the client of the delay that we introduced by waiting
1422     pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(m_time_of_transfer);
1423
1424     m_delayed_usecs = Util::SystemTimeSource::getCurrentTime() - pred_system_time_at_xfer;
1425     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1426                         "delayed for %d usecs...\n",
1427                         m_delayed_usecs);
1428
1429     // now we can signal the client that we are (should be) ready
1430     return !xrun_occurred;
1431 }
1432
1433 /**
1434  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1435  *
1436  * Transfers one period of frames from the client side to the Iso side and vice versa.
1437  *
1438  * @return true if successful, false otherwise (indicates xrun).
1439  */
1440 bool StreamProcessorManager::transfer() {
1441     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1442     bool retval=true;
1443     retval &= transfer(StreamProcessor::ePT_Receive);
1444     retval &= transfer(StreamProcessor::ePT_Transmit);
1445     return retval;
1446 }
1447
1448 /**
1449  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1450  *
1451  * Transfers one period of frames from the client side to the Iso side or vice versa.
1452  *
1453  * @param t The processor type to tranfer for (receive or transmit)
1454  * @return true if successful, false otherwise (indicates xrun).
1455  */
1456 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1457     if(m_SyncSource == NULL) return false;
1458     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1459         "transfer(%d) at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
1460         t, m_time_of_transfer,
1461         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1462         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1463         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1464
1465     bool retval = true;
1466     // a static cast could make sure that there is no performance
1467     // penalty for the virtual functions (to be checked)
1468     if (t==StreamProcessor::ePT_Receive) {
1469         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1470                 it != m_ReceiveProcessors.end();
1471                 ++it ) {
1472             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1473                     debugWarning("could not getFrames(%u, %11"PRIu64") from stream processor (%p)\n",
1474                             m_period, m_time_of_transfer,*it);
1475                 retval &= false; // buffer underrun
1476             }
1477         }
1478     } else {
1479         // FIXME: in the SPM it would be nice to have system time instead of
1480         //        1394 time
1481         float rate = m_SyncSource->getTicksPerFrame();
1482
1483         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1484                 it != m_TransmitProcessors.end();
1485                 ++it ) {
1486             // this is the delay in frames between the point where a frame is received and
1487             // when it is transmitted again
1488             unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1489             int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1490    
1491             // the data we are putting into the buffer is intended to be transmitted
1492             // one ringbuffer size after it has been received
1493             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1494
1495             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1496                 debugWarning("could not putFrames(%u,%"PRIu64") to stream processor (%p)\n",
1497                         m_period, transmit_timestamp, *it);
1498                 retval &= false; // buffer underrun
1499             }
1500         }
1501     }
1502     return retval;
1503 }
1504
1505 /**
1506  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1507  *
1508  * Transfers one period of silence to the Iso side for transmit SP's
1509  * or dump one period of frames for receive SP's
1510  *
1511  * @return true if successful, false otherwise (indicates xrun).
1512  */
1513 bool StreamProcessorManager::transferSilence() {
1514     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1515     bool retval=true;
1516     // NOTE: the order here is opposite from the order in
1517     // normal operation (transmit is before receive), because
1518     // we can do that here (data=silence=available) and
1519     // it increases reliability (esp. on startup)
1520     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1521     retval &= transferSilence(StreamProcessor::ePT_Receive);
1522     return retval;
1523 }
1524
1525 /**
1526  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1527  *
1528  * Transfers one period of silence to the Iso side for transmit SP's
1529  * or dump one period of frames for receive SP's
1530  *
1531  * @param t The processor type to tranfer for (receive or transmit)
1532  * @return true if successful, false otherwise (indicates xrun).
1533  */
1534 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1535     if(m_SyncSource == NULL) return false;
1536     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1537         "transferSilence(%d) at TS=%011"PRIu64" (%03us %04uc %04ut)...\n",
1538         t, m_time_of_transfer,
1539         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1540         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1541         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1542
1543     bool retval = true;
1544     // a static cast could make sure that there is no performance
1545     // penalty for the virtual functions (to be checked)
1546     if (t==StreamProcessor::ePT_Receive) {
1547         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1548                 it != m_ReceiveProcessors.end();
1549                 ++it ) {
1550             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1551                     debugWarning("could not dropFrames(%u, %11"PRIu64") from stream processor (%p)\n",
1552                             m_period, m_time_of_transfer,*it);
1553                 retval &= false; // buffer underrun
1554             }
1555         }
1556     } else {
1557         // FIXME: in the SPM it would be nice to have system time instead of
1558         //        1394 time
1559         float rate = m_SyncSource->getTicksPerFrame();
1560
1561         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1562                 it != m_TransmitProcessors.end();
1563                 ++it ) {
1564             // this is the delay in frames between the point where a frame is received and
1565             // when it is transmitted again
1566             unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1567             int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1568    
1569             // the data we are putting into the buffer is intended to be transmitted
1570             // one ringbuffer size after it has been received
1571             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1572
1573             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1574                 debugWarning("could not putSilenceFrames(%u,%"PRIu64") to stream processor (%p)\n",
1575                         m_period, transmit_timestamp, *it);
1576                 retval &= false; // buffer underrun
1577             }
1578         }
1579     }
1580     return retval;
1581 }
1582
1583 void StreamProcessorManager::dumpInfo() {
1584     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1585     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1586     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1587     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1588
1589     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1590     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1591         it != m_ReceiveProcessors.end();
1592         ++it ) {
1593         (*it)->dumpInfo();
1594     }
1595
1596     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1597     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1598         it != m_TransmitProcessors.end();
1599         ++it ) {
1600         (*it)->dumpInfo();
1601     }
1602
1603     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1604
1605     // list port info in verbose mode
1606     debugOutputShort( DEBUG_LEVEL_VERBOSE, "Port Information\n");
1607     int nb_ports;
1608    
1609     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Playback\n");
1610     nb_ports = getPortCount(Port::E_Playback);
1611     for(int i=0; i < nb_ports; i++) {
1612         Port *p = getPortByIndex(i, Port::E_Playback);
1613         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1614         if (p) {
1615             bool disabled = p->isDisabled();
1616             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1617             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1618             debugOutputShort( DEBUG_LEVEL_VERBOSE, "%3s ", p->getName().c_str());
1619         } else {
1620             debugOutputShort( DEBUG_LEVEL_VERBOSE, "invalid ");
1621         }
1622         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1623     }
1624     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Capture\n");
1625     nb_ports = getPortCount(Port::E_Capture);
1626     for(int i=0; i < nb_ports; i++) {
1627         Port *p = getPortByIndex(i, Port::E_Capture);
1628         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1629         if (p) {
1630             bool disabled = p->isDisabled();
1631             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1632             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1633             debugOutputShort( DEBUG_LEVEL_VERBOSE, " %3s ", p->getName().c_str());
1634         } else {
1635             debugOutputShort( DEBUG_LEVEL_VERBOSE, " invalid ");
1636         }
1637         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1638     }
1639
1640     debugOutputShort( DEBUG_LEVEL_VERBOSE, "----------------------------------------------------\n");
1641
1642 }
1643
1644 void StreamProcessorManager::setVerboseLevel(int l) {
1645     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1646
1647     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1648         it != m_ReceiveProcessors.end();
1649         ++it ) {
1650         (*it)->setVerboseLevel(l);
1651     }
1652     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1653         it != m_TransmitProcessors.end();
1654         ++it ) {
1655         (*it)->setVerboseLevel(l);
1656     }
1657     setDebugLevel(l);
1658     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1659 }
1660
1661 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1662     int count=0;
1663
1664     if (direction == Port::E_Capture) {
1665         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1666             it != m_ReceiveProcessors.end();
1667             ++it ) {
1668             count += (*it)->getPortCount(type);
1669         }
1670     } else {
1671         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1672             it != m_TransmitProcessors.end();
1673             ++it ) {
1674             count += (*it)->getPortCount(type);
1675         }
1676     }
1677     return count;
1678 }
1679
1680 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1681     int count=0;
1682
1683     if (direction == Port::E_Capture) {
1684         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1685             it != m_ReceiveProcessors.end();
1686             ++it ) {
1687             count += (*it)->getPortCount();
1688         }
1689     } else {
1690         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1691             it != m_TransmitProcessors.end();
1692             ++it ) {
1693             count += (*it)->getPortCount();
1694         }
1695     }
1696     return count;
1697 }
1698
1699 void
1700 StreamProcessorManager::updateShadowLists()
1701 {
1702     debugOutput( DEBUG_LEVEL_VERBOSE, "Updating port shadow lists...\n");
1703     m_CapturePorts_shadow.clear();
1704     m_PlaybackPorts_shadow.clear();
1705
1706     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1707         it != m_ReceiveProcessors.end();
1708         ++it ) {
1709         PortManager *pm = *it;
1710         for (int i=0; i < pm->getPortCount(); i++) {
1711             Port *p = pm->getPortAtIdx(i);
1712             if (!p) {
1713                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1714                 continue;
1715             }
1716             if(p->getDirection() != Port::E_Capture) {
1717                 debugError("port at idx %d for receive SP is not a capture port!\n", i);
1718                 continue;
1719             }
1720             m_CapturePorts_shadow.push_back(p);
1721         }
1722     }
1723     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1724         it != m_TransmitProcessors.end();
1725         ++it ) {
1726         PortManager *pm = *it;
1727         for (int i=0; i < pm->getPortCount(); i++) {
1728             Port *p = pm->getPortAtIdx(i);
1729             if (!p) {
1730                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1731                 continue;
1732             }
1733             if(p->getDirection() != Port::E_Playback) {
1734                 debugError("port at idx %d for transmit SP is not a playback port!\n", i);
1735                 continue;
1736             }
1737             m_PlaybackPorts_shadow.push_back(p);
1738         }
1739     }
1740 }
1741
1742 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1743     debugOutputExtreme( DEBUG_LEVEL_ULTRA_VERBOSE, "getPortByIndex(%d, %d)...\n", idx, direction);
1744     if (direction == Port::E_Capture) {
1745         #ifdef DEBUG
1746         if(idx >= (int)m_CapturePorts_shadow.size()) {
1747             debugError("Capture port %d out of range (%zd)\n", idx, m_CapturePorts_shadow.size());
1748             return NULL;
1749         }
1750         #endif
1751         return m_CapturePorts_shadow.at(idx);
1752     } else {
1753         #ifdef DEBUG
1754         if(idx >= (int)m_PlaybackPorts_shadow.size()) {
1755             debugError("Playback port %d out of range (%zd)\n", idx, m_PlaybackPorts_shadow.size());
1756             return NULL;
1757         }
1758         #endif
1759         return m_PlaybackPorts_shadow.at(idx);
1760     }
1761     return NULL;
1762 }
1763
1764 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1765     m_thread_realtime=rt;
1766     m_thread_priority=priority;
1767     return true;
1768 }
1769
1770
1771 } // end of namespace
Note: See TracBrowser for help on using the browser.