root/branches/libffado-2.0/src/libstreaming/StreamProcessorManager.cpp

Revision 1373, 54.0 kB (checked in by ppalmers, 15 years ago)

make IsoManager? and SPM settings from config.h overridable by the user/distro config file.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "devicemanager.h"
31
32 #include "libutil/Time.h"
33
34 #include <errno.h>
35 #include <assert.h>
36 #include <math.h>
37
38 namespace Streaming {
39
40 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
41
42 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
43     : m_is_slave( false )
44     , m_SyncSource(NULL)
45     , m_parent( p )
46     , m_xrun_happened( false )
47     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
48     , m_nb_buffers( 0 )
49     , m_period( 0 )
50     , m_audio_datatype( eADT_Float )
51     , m_nominal_framerate ( 0 )
52     , m_xruns(0)
53     , m_shutdown_needed(false)
54     , m_nbperiods(0)
55     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
56 {
57     addOption(Util::OptionContainer::Option("slaveMode",false));
58     sem_init(&m_activity_semaphore, 0, 0);
59 }
60
61 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
62                                                unsigned int framerate, unsigned int nb_buffers)
63     : m_is_slave( false )
64     , m_SyncSource(NULL)
65     , m_parent( p )
66     , m_xrun_happened( false )
67     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
68     , m_nb_buffers(nb_buffers)
69     , m_period(period)
70     , m_audio_datatype( eADT_Float )
71     , m_nominal_framerate ( framerate )
72     , m_xruns(0)
73     , m_shutdown_needed(false)
74     , m_nbperiods(0)
75     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
76 {
77     addOption(Util::OptionContainer::Option("slaveMode",false));
78     sem_init(&m_activity_semaphore, 0, 0);
79 }
80
81 StreamProcessorManager::~StreamProcessorManager() {
82     sem_post(&m_activity_semaphore);
83     sem_destroy(&m_activity_semaphore);
84     delete m_WaitLock;
85 }
86
87 // void
88 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
89 // {
90 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
91 // //
92 // //     bool handled_at_least_one = false;
93 // //     // note that all receive streams are gone once a device is unplugged
94 // //
95 // //     // synchronize with the wait lock
96 // //     Util::MutexLockHelper lock(*m_WaitLock);
97 // //
98 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
99 // //     // cause all SP's to bail out
100 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
101 // //           it != m_ReceiveProcessors.end();
102 // //           ++it )
103 // //     {
104 // //         if(&s == &((*it)->getParent().get1394Service())) {
105 // //             debugOutput(DEBUG_LEVEL_NORMAL,
106 // //                         "issue busreset on receive SPM on channel %d\n",
107 // //                         (*it)->getChannel());
108 // //             (*it)->handleBusReset();
109 // //             handled_at_least_one = true;
110 // //         } else {
111 // //             debugOutput(DEBUG_LEVEL_NORMAL,
112 // //                         "skipping receive SPM on channel %d since not on service %p\n",
113 // //                         (*it)->getChannel(), &s);
114 // //         }
115 // //     }
116 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
117 // //           it != m_TransmitProcessors.end();
118 // //           ++it )
119 // //     {
120 // //         if(&s == &((*it)->getParent().get1394Service())) {
121 // //             debugOutput(DEBUG_LEVEL_NORMAL,
122 // //                         "issue busreset on transmit SPM on channel %d\n",
123 // //                         (*it)->getChannel());
124 // //             (*it)->handleBusReset();
125 // //             handled_at_least_one = true;
126 // //         } else {
127 // //             debugOutput(DEBUG_LEVEL_NORMAL,
128 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
129 // //                         (*it)->getChannel(), &s);
130 // //         }
131 // //     }
132 // //
133 // //     // FIXME: we request shutdown for now.
134 // //     m_shutdown_needed = handled_at_least_one;
135 // }
136
137 void
138 StreamProcessorManager::signalActivity()
139 {
140     sem_post(&m_activity_semaphore);
141     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
142 }
143
144 enum StreamProcessorManager::eActivityResult
145 StreamProcessorManager::waitForActivity()
146 {
147     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
148     struct timespec ts;
149     int result;
150
151     if (m_activity_wait_timeout_nsec >= 0) {
152
153         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
154             debugError("clock_gettime failed\n");
155             return eAR_Error;
156         }
157         ts.tv_nsec += m_activity_wait_timeout_nsec;
158         while(ts.tv_nsec >= 1000000000LL) {
159             ts.tv_sec += 1;
160             ts.tv_nsec -= 1000000000LL;
161         }
162     }
163
164     if (m_activity_wait_timeout_nsec >= 0) {
165         result = sem_timedwait(&m_activity_semaphore, &ts);
166     } else {
167         result = sem_wait(&m_activity_semaphore);
168     }
169
170     if(result != 0) {
171         if (errno == ETIMEDOUT) {
172             debugOutput(DEBUG_LEVEL_VERBOSE,
173                         "(%p) sem_timedwait() timed out (result=%d)\n",
174                         this, result);
175             return eAR_Timeout;
176         } else if (errno == EINTR) {
177             debugOutput(DEBUG_LEVEL_VERBOSE,
178                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
179                         this, result);
180             return eAR_Interrupted;
181         } else if (errno == EINVAL) {
182             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
183                         this, result);
184             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
185                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
186             return eAR_Error;
187         } else {
188             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
189                         this, result, errno);
190             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
191                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
192             return eAR_Error;
193         }
194     }
195
196     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
197     return eAR_Activity;
198 }
199
200 /**
201  * Registers \ref processor with this manager.
202  *
203  * also registers it with the isohandlermanager
204  *
205  * be sure to call isohandlermanager->init() first!
206  * and be sure that the processors are also ->init()'ed
207  *
208  * @param processor
209  * @return true if successfull
210  */
211 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
212 {
213     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
214     assert(processor);
215     if (processor->getType() == StreamProcessor::ePT_Receive) {
216         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
217         m_ReceiveProcessors.push_back(processor);
218         return true;
219     }
220
221     if (processor->getType() == StreamProcessor::ePT_Transmit) {
222         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
223         m_TransmitProcessors.push_back(processor);
224         return true;
225     }
226
227     debugFatal("Unsupported processor type!\n");
228     return false;
229 }
230
231 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
232 {
233     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
234     assert(processor);
235
236     if (processor->getType()==StreamProcessor::ePT_Receive) {
237
238         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
239               it != m_ReceiveProcessors.end();
240               ++it )
241         {
242             if ( *it == processor ) {
243                 if (*it == m_SyncSource) {
244                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
245                     m_SyncSource = NULL;
246                 }
247                 m_ReceiveProcessors.erase(it);
248                 return true;
249             }
250         }
251     }
252
253     if (processor->getType()==StreamProcessor::ePT_Transmit) {
254         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
255               it != m_TransmitProcessors.end();
256               ++it )
257         {
258             if ( *it == processor ) {
259                 if (*it == m_SyncSource) {
260                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
261                     m_SyncSource = NULL;
262                 }
263                 m_TransmitProcessors.erase(it);
264                 return true;
265             }
266         }
267     }
268
269     debugFatal("Processor (%p) not found!\n",processor);
270     return false; //not found
271 }
272
273 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
274     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
275     m_SyncSource=s;
276     return true;
277 }
278
279 bool StreamProcessorManager::prepare() {
280
281     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
282     m_is_slave=false;
283     if(!getOption("slaveMode", m_is_slave)) {
284         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
285     }
286
287     m_shutdown_needed=false;
288
289     // if no sync source is set, select one here
290     if(m_SyncSource == NULL) {
291        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
292     }
293
294     // FIXME: put into separate method
295     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
296           it != m_ReceiveProcessors.end();
297           ++it )
298     {
299         if(m_SyncSource == NULL) {
300             debugWarning(" => Sync Source is %p.\n", *it);
301             m_SyncSource = *it;
302         }
303     }
304     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
305           it != m_TransmitProcessors.end();
306           ++it )
307     {
308         if(m_SyncSource == NULL) {
309             debugWarning(" => Sync Source is %p.\n", *it);
310             m_SyncSource = *it;
311         }
312     }
313
314     // now do the actual preparation of the SP's
315     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
316     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
317         it != m_ReceiveProcessors.end();
318         ++it ) {
319
320         if(!(*it)->setOption("slaveMode", m_is_slave)) {
321             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
322         }
323
324         if(!(*it)->prepare()) {
325             debugFatal(  " could not prepare (%p)...\n",(*it));
326             return false;
327         }
328     }
329     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
330     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
331         it != m_TransmitProcessors.end();
332         ++it ) {
333         if(!(*it)->setOption("slaveMode", m_is_slave)) {
334             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
335         }
336         if(!(*it)->prepare()) {
337             debugFatal( " could not prepare (%p)...\n",(*it));
338             return false;
339         }
340     }
341
342     // if there are no stream processors registered,
343     // fail
344     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
345         debugFatal("No stream processors registered, can't do anything usefull\n");
346         return false;
347     }
348
349     // set the activity timeout value to two periods worth of usecs.
350     // since we can expect activity once every period, but we might have some
351     // offset, the safe value is two periods.
352     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
353     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
354     setActivityWaitTimeoutUsec(timeout_usec);
355
356     return true;
357 }
358
359 bool
360 StreamProcessorManager::startDryRunning()
361 {
362     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
363     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
364     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
365             it != m_TransmitProcessors.end();
366             ++it ) {
367         if ((*it)->inError()) {
368             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
369             return false;
370         }
371         if (!(*it)->isDryRunning()) {
372             if(!(*it)->scheduleStartDryRunning(-1)) {
373                 debugError("Could not put SP %p into the dry-running state\n", *it);
374                 return false;
375             }
376         } else {
377             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
378         }
379     }
380     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
381             it != m_ReceiveProcessors.end();
382             ++it ) {
383         if ((*it)->inError()) {
384             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
385             return false;
386         }
387         if (!(*it)->isDryRunning()) {
388             if(!(*it)->scheduleStartDryRunning(-1)) {
389                 debugError("Could not put SP %p into the dry-running state\n", *it);
390                 return false;
391             }
392         } else {
393             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
394         }
395     }
396     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
397     // wait for the syncsource to start running.
398     // that will block the waitForPeriod call until everyone has started (theoretically)
399     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
400     bool all_dry_running = false;
401     while (!all_dry_running && cnt) {
402         all_dry_running = true;
403         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
404                 it != m_ReceiveProcessors.end();
405                 ++it ) {
406             all_dry_running &= (*it)->isDryRunning();
407         }
408         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
409                 it != m_TransmitProcessors.end();
410                 ++it ) {
411             all_dry_running &= (*it)->isDryRunning();
412         }
413
414         SleepRelativeUsec(125);
415         cnt--;
416     }
417     if(cnt==0) {
418         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
419         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
420                 it != m_ReceiveProcessors.end();
421                 ++it ) {
422             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
423                 (*it)->getTypeString(), *it, (*it)->getStateString());
424         }
425         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
426                 it != m_TransmitProcessors.end();
427                 ++it ) {
428             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
429                 (*it)->getTypeString(), *it, (*it)->getStateString());
430         }
431         return false;
432     }
433     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
434     return true;
435 }
436
437 bool StreamProcessorManager::syncStartAll() {
438     if(m_SyncSource == NULL) return false;
439
440     // get the options
441     int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
442     int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
443     int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
444     int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
445     int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
446     Util::Configuration &config = m_parent.getConfiguration();
447     config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
448     config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
449     config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
450     config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
451     config.getValueForSetting("streaming.spm.prestart_cycles_for_recv", prestart_cycles_for_recv);
452
453     // figure out when to get the SP's running.
454     // the xmit SP's should also know the base timestamp
455     // streams should be aligned here
456
457     // now find out how long we have to delay the wait operation such that
458     // the received frames will all be presented to the SP
459     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
460     int max_of_min_delay = 0;
461     int min_delay = 0;
462     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
463             it != m_ReceiveProcessors.end();
464             ++it ) {
465         min_delay = (*it)->getMaxFrameLatency();
466         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
467     }
468
469     // add some processing margin. This only shifts the time
470     // at which the buffer is transfer()'ed. This makes things somewhat
471     // more robust. It should be noted though that shifting the transfer
472     // time to a later time instant also causes the xmit buffer fill to be
473     // lower on average.
474     max_of_min_delay += signal_delay_ticks;
475
476     m_SyncSource->setSyncDelay(max_of_min_delay);
477     unsigned int syncdelay = m_SyncSource->getSyncDelay();
478     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d => %d ticks (%03us %04uc %04ut)...\n",
479         max_of_min_delay, syncdelay,
480         (unsigned int)TICKS_TO_SECS(syncdelay),
481         (unsigned int)TICKS_TO_CYCLES(syncdelay),
482         (unsigned int)TICKS_TO_OFFSET(syncdelay));
483
484     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
485     //        have aquired lock
486     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
487     //sleep(2); // FIXME: be smarter here
488
489     // make sure that we are dry-running long enough for the
490     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
491     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
492
493     unsigned int nb_sync_runs = (sync_wait_time_msec * getNominalRate());
494     nb_sync_runs /= 1000;
495     nb_sync_runs /= getPeriodSize();
496
497     int64_t time_till_next_period;
498     while(nb_sync_runs--) { // or while not sync-ed?
499         // check if we were woken up too soon
500         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
501         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
502         if(time_till_next_period > 0) {
503             // wait for the period
504             SleepRelativeUsec(time_till_next_period);
505         }
506     }
507
508     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
509     // FIXME: in the SPM it would be nice to have system time instead of
510     //        1394 time
511
512     // we now should have decent sync info on the sync source
513     // determine a point in time where the system should start
514     // figure out where we are now
515     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
516     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
517         time_of_first_sample,
518         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
519         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
520         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
521
522     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
523     // this is the time window we have to setup all SP's such that they
524     // can start wet-running correctly.
525     time_of_first_sample = addTicks(time_of_first_sample,
526                                     cycles_for_startup * TICKS_PER_CYCLE);
527
528     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
529         time_of_first_sample,
530         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
531         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
532         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
533
534     // we should start wet-running the transmit SP's some cycles in advance
535     // such that we know it is wet-running when it should output its first sample
536     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
537                                                  prestart_cycles_for_xmit * TICKS_PER_CYCLE);
538
539     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
540                                                  prestart_cycles_for_recv * TICKS_PER_CYCLE);
541     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
542         time_to_start_xmit,
543         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
544         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
545         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
546     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
547         time_to_start_recv,
548         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
549         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
550         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
551
552     // at this point the buffer head timestamp of the transmit buffers can be set
553     // this is the presentation time of the first sample in the buffer
554     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
555           it != m_TransmitProcessors.end();
556           ++it ) {
557         (*it)->setBufferHeadTimestamp(time_of_first_sample);
558     }
559
560     // switch syncsource to running state
561     uint64_t time_to_start_sync;
562     // FIXME: this is most likely not going to work for transmit sync sources
563     // but those are unsupported in this version
564     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
565         time_to_start_sync = time_to_start_recv;
566     } else {
567         time_to_start_sync = time_to_start_xmit;
568     }
569     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
570         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
571         return false;
572     }
573
574     // STEP X: switch all non-syncsource SP's over to the running state
575     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
576           it != m_ReceiveProcessors.end();
577           ++it ) {
578         if(*it != m_SyncSource) {
579             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
580                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
581                 return false;
582             }
583         }
584     }
585     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
586           it != m_TransmitProcessors.end();
587           ++it ) {
588         if(*it != m_SyncSource) {
589             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
590                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
591                 return false;
592             }
593         }
594     }
595     // wait for the syncsource to start running.
596     // that will block the waitForPeriod call until everyone has started (theoretically)
597     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
598     // so a 20 times this value should be a good timeout
599     int cnt = cycles_for_startup * 20; // by then it should have started
600     while (!m_SyncSource->isRunning() && cnt) {
601         SleepRelativeUsec(125);
602         cnt--;
603     }
604     if(cnt==0) {
605         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
606         return false;
607     }
608
609     // the sync source is running, we can now read a decent received timestamp from it
610     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
611
612     // and a (still very rough) approximation of the rate
613     float rate = m_SyncSource->getTicksPerFrame();
614     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
615     // also add the sync delay
616     delay_in_ticks += m_SyncSource->getSyncDelay();
617     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
618                 m_time_of_transfer, rate);
619
620     // then use this information to initialize the xmit handlers
621
622     //  we now set the buffer tail timestamp of the transmit buffer
623     //  to the period transfer time instant plus what's nb_buffers - 1
624     //  in ticks. This due to the fact that we (should) have received one period
625     //  worth of ticks at t=m_time_of_transfer
626     //  hence one period of frames should also have been transmitted, which means
627     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
628     //  that allows us to calculate the tail timestamp for the buffer.
629
630     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
631
632     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
633                 transmit_tail_timestamp, rate);
634
635     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
636         it != m_TransmitProcessors.end();
637         ++it ) {
638         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
639         (*it)->setTicksPerFrame(rate);
640     }
641
642     // align the received streams to be phase aligned
643     if(!alignReceivedStreams()) {
644         debugError("Could not align streams...\n");
645         return false;
646     }
647
648     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
649     return true;
650 }
651
652 bool
653 StreamProcessorManager::alignReceivedStreams()
654 {
655     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
656     unsigned int nb_sync_runs;
657     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
658     int64_t diff_between_streams[nb_rcv_sp];
659     int64_t diff;
660
661     unsigned int i;
662
663     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
664     int align_average_time_msec = STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC;
665     Util::Configuration &config = m_parent.getConfiguration();
666     config.getValueForSetting("streaming.spm.align_tries", cnt);
667     config.getValueForSetting("streaming.spm.align_average_time_msec", align_average_time_msec);
668
669     unsigned int periods_per_align_try = (align_average_time_msec * getNominalRate());
670     periods_per_align_try /= 1000;
671     periods_per_align_try /= getPeriodSize();
672     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
673
674     bool aligned = false;
675     while (!aligned && cnt--) {
676         nb_sync_runs = periods_per_align_try;
677         while(nb_sync_runs) {
678             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
679             if(!waitForPeriod()) {
680                 debugWarning("xrun while aligning streams...\n");
681                 return false;
682             }
683
684             // before we do anything else, transfer
685             if(!transferSilence()) {
686                 debugError("Could not transfer silence\n");
687                 return false;
688             }
689
690             // now calculate the stream offset
691             i = 0;
692             for ( i = 0; i < nb_rcv_sp; i++) {
693                 StreamProcessor *s = m_ReceiveProcessors.at(i);
694                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
695                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
696                     m_SyncSource, s, diff);
697                 if ( nb_sync_runs == periods_per_align_try ) {
698                     diff_between_streams[i] = diff;
699                 } else {
700                     diff_between_streams[i] += diff;
701                 }
702             }
703
704             nb_sync_runs--;
705         }
706         // calculate the average offsets
707         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
708         int diff_between_streams_frames[nb_rcv_sp];
709         aligned = true;
710         for ( i = 0; i < nb_rcv_sp; i++) {
711             StreamProcessor *s = m_ReceiveProcessors.at(i);
712
713             diff_between_streams[i] /= periods_per_align_try;
714             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
715             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
716                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
717
718             aligned &= (diff_between_streams_frames[i] == 0);
719
720             // reposition the stream
721             if(!s->shiftStream(diff_between_streams_frames[i])) {
722                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
723                 return false;
724             }
725         }
726         if (!aligned) {
727             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
728         }
729     }
730     if (cnt == 0) {
731         debugError("Align failed\n");
732         return false;
733     }
734     return true;
735 }
736
737 bool StreamProcessorManager::start() {
738     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
739
740     // start all SP's synchonized
741     bool start_result = false;
742     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
743         // put all SP's into dry-running state
744         if (!startDryRunning()) {
745             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
746             start_result = false;
747             continue;
748         }
749
750         start_result = syncStartAll();
751         if(start_result) {
752             break;
753         } else {
754             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
755             if(m_shutdown_needed) {
756                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
757                 return false;
758             }
759         }
760     }
761     if (!start_result) {
762         debugFatal("Could not syncStartAll...\n");
763         return false;
764     }
765     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
766     return true;
767 }
768
769 bool StreamProcessorManager::stop() {
770     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
771
772     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
773     // switch SP's over to the dry-running state
774     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
775           it != m_ReceiveProcessors.end();
776           ++it ) {
777         if((*it)->isRunning()) {
778             if(!(*it)->scheduleStopRunning(-1)) {
779                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
780                 return false;
781             }
782         }
783     }
784     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
785           it != m_TransmitProcessors.end();
786           ++it ) {
787         if((*it)->isRunning()) {
788             if(!(*it)->scheduleStopRunning(-1)) {
789                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
790                 return false;
791             }
792         }
793     }
794     // wait for the SP's to get into the dry-running/stopped state
795     int cnt = 8000;
796     bool ready = false;
797     while (!ready && cnt) {
798         ready = true;
799         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
800             it != m_ReceiveProcessors.end();
801             ++it ) {
802             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
803         }
804         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
805             it != m_TransmitProcessors.end();
806             ++it ) {
807             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
808         }
809         SleepRelativeUsec(125);
810         cnt--;
811     }
812     if(cnt==0) {
813         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
814         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
815             it != m_ReceiveProcessors.end();
816             ++it ) {
817             (*it)->dumpInfo();
818         }
819         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
820             it != m_TransmitProcessors.end();
821             ++it ) {
822             (*it)->dumpInfo();
823         }
824         return false;
825     }
826
827     // switch SP's over to the stopped state
828     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
829           it != m_ReceiveProcessors.end();
830           ++it ) {
831         if ((*it)->inError()) {
832             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
833         } else if(!(*it)->scheduleStopDryRunning(-1)) {
834             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
835             return false;
836         }
837     }
838     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
839           it != m_TransmitProcessors.end();
840           ++it ) {
841         if ((*it)->inError()) {
842             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
843         } else if(!(*it)->scheduleStopDryRunning(-1)) {
844             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
845             return false;
846         }
847     }
848     // wait for the SP's to get into the stopped state
849     cnt = 8000;
850     ready = false;
851     while (!ready && cnt) {
852         ready = true;
853         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
854             it != m_ReceiveProcessors.end();
855             ++it ) {
856             ready &= ((*it)->isStopped() || (*it)->inError());
857         }
858         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
859             it != m_TransmitProcessors.end();
860             ++it ) {
861             ready &= ((*it)->isStopped() || (*it)->inError());
862         }
863         SleepRelativeUsec(125);
864         cnt--;
865     }
866     if(cnt==0) {
867         debugWarning(" Timeout waiting for the SP's to stop\n");
868         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
869             it != m_ReceiveProcessors.end();
870             ++it ) {
871             (*it)->dumpInfo();
872         }
873         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
874             it != m_TransmitProcessors.end();
875             ++it ) {
876             (*it)->dumpInfo();
877         }
878         return false;
879     }
880     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
881     return true;
882 }
883
884 /**
885  * Called upon Xrun events. This brings all StreamProcessors back
886  * into their starting state, and then carries on streaming. This should
887  * have the same effect as restarting the whole thing.
888  *
889  * @return true if successful, false otherwise
890  */
891 bool StreamProcessorManager::handleXrun() {
892
893     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
894
895     dumpInfo();
896
897     /*
898      * Reset means:
899      * 1) Disabling the SP's, so that they don't process any packets
900      *    note: the isomanager does keep on delivering/requesting them
901      * 2) Bringing all buffers & streamprocessors into a know state
902      *    - Clear all capture buffers
903      *    - Put nb_periods*period_size of null frames into the playback buffers
904      * 3) Re-enable the SP's
905      */
906
907     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
908     // start all SP's synchonized
909     bool start_result = false;
910     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
911         if(m_shutdown_needed) {
912             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
913             return true;
914         }
915         // put all SP's into dry-running state
916         if (!startDryRunning()) {
917             debugShowBackLog();
918             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
919             start_result = false;
920             continue;
921         }
922
923         start_result = syncStartAll();
924         if(start_result) {
925             break;
926         } else {
927             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
928         }
929     }
930     if (!start_result) {
931         debugFatal("Could not syncStartAll...\n");
932         return false;
933     }
934     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
935
936     return true;
937 }
938
939 /**
940  * @brief Waits until the next period of samples is ready
941  *
942  * This function does not return until a full period of samples is (or should be)
943  * ready to be transferred.
944  *
945  * @return true if the period is ready, false if not
946  */
947 bool StreamProcessorManager::waitForPeriod() {
948     if(m_SyncSource == NULL) return false;
949     if(m_shutdown_needed) return false;
950     bool xrun_occurred = false;
951     bool in_error = false;
952
953     // grab the wait lock
954     // this ensures that bus reset handling doesn't interfere
955     Util::MutexLockHelper lock(*m_WaitLock);
956     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
957                         "waiting for period (%d frames in buffer)...\n",
958                         m_SyncSource->getBufferFill());
959     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
960     uint64_t ticks_at_period_margin = ticks_at_period + m_SyncSource->getSyncDelay();
961     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
962
963     #ifdef DEBUG
964     int64_t now = Util::SystemTimeSource::getCurrentTime();
965     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
966     #endif
967
968     // wait until it's time to transfer
969     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
970
971     #ifdef DEBUG
972     now = Util::SystemTimeSource::getCurrentTime();
973     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
974     #endif
975
976     // the period should be ready now
977
978     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
979     // HACK: we force wait until every SP is ready. this is needed
980     // since the raw1394 interface provides no control over interrupts
981     // resulting in very bad predictability on when the data is present.
982     bool period_not_ready = true;
983     while(period_not_ready) {
984         period_not_ready = false;
985         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
986             it != m_ReceiveProcessors.end();
987             ++it ) {
988             bool this_sp_period_ready = (*it)->canConsumePeriod();
989             if (!this_sp_period_ready) {
990                 period_not_ready = true;
991             }
992         }
993         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
994             it != m_TransmitProcessors.end();
995             ++it ) {
996             bool this_sp_period_ready = (*it)->canProducePeriod();
997             if (!this_sp_period_ready) {
998                 period_not_ready = true;
999             }
1000         }
1001
1002         if (period_not_ready) {
1003             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
1004             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
1005         }
1006
1007         // check for underruns/errors on the ISO side,
1008         // those should make us bail out of the wait loop
1009         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1010             it != m_ReceiveProcessors.end();
1011             ++it ) {
1012             // a xrun has occurred on the Iso side
1013             xrun_occurred |= (*it)->xrunOccurred();
1014             in_error |= (*it)->inError();
1015         }
1016         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1017             it != m_TransmitProcessors.end();
1018             ++it ) {
1019             // a xrun has occurred on the Iso side
1020             xrun_occurred |= (*it)->xrunOccurred();
1021             in_error |= (*it)->inError();
1022         }
1023         if(xrun_occurred | in_error | m_shutdown_needed) break;
1024     }
1025     #else
1026     // check for underruns/errors on the ISO side,
1027     // those should make us bail out of the wait loop
1028     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1029         it != m_ReceiveProcessors.end();
1030         ++it ) {
1031         // xrun on data buffer side
1032         if (!(*it)->canConsumePeriod()) {
1033             xrun_occurred = true;
1034         }
1035         // a xrun has occurred on the Iso side
1036         xrun_occurred |= (*it)->xrunOccurred();
1037         in_error |= (*it)->inError();
1038     }
1039     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1040         it != m_TransmitProcessors.end();
1041         ++it ) {
1042         // xrun on data buffer side
1043         if (!(*it)->canProducePeriod()) {
1044             xrun_occurred = true;
1045         }
1046         // a xrun has occurred on the Iso side
1047         xrun_occurred |= (*it)->xrunOccurred();
1048         in_error |= (*it)->inError();
1049     }
1050     #endif
1051
1052     if(xrun_occurred) {
1053         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1054     }
1055     if(in_error) {
1056         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1057         m_shutdown_needed = true;
1058     }
1059
1060     // we save the 'ideal' time of the transfer at this point,
1061     // because we can have interleaved read - process - write
1062     // cycles making that we modify a receiving stream's buffer
1063     // before we get to writing.
1064     // NOTE: before waitForPeriod() is called again, both the transmit
1065     //       and the receive processors should have done their transfer.
1066     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1067    
1068     #ifdef DEBUG
1069     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
1070    
1071     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1072     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
1073     // display message if the difference between two successive tick
1074     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1075     // so 50 ticks = 10%, which is a rather large jitter value.
1076     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
1077         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
1078                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1079     }
1080     m_time_of_transfer2 = m_time_of_transfer;
1081     #endif
1082
1083     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1084                         "transfer period %d at %llu ticks...\n",
1085                         m_nbperiods, m_time_of_transfer);
1086
1087     // this is to notify the client of the delay that we introduced by waiting
1088     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
1089     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1090                         "delayed for %d usecs...\n",
1091                         m_delayed_usecs);
1092
1093 #ifdef DEBUG
1094     int rcv_bf=0, xmt_bf=0;
1095     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1096         it != m_ReceiveProcessors.end();
1097         ++it ) {
1098         rcv_bf = (*it)->getBufferFill();
1099     }
1100     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1101         it != m_TransmitProcessors.end();
1102         ++it ) {
1103         xmt_bf = (*it)->getBufferFill();
1104     }
1105     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1106                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1107                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1108
1109     // check if xruns occurred on the Iso side.
1110     // also check if xruns will occur should we transfer() now
1111     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1112           it != m_ReceiveProcessors.end();
1113           ++it ) {
1114
1115         if ((*it)->xrunOccurred()) {
1116             debugOutput(DEBUG_LEVEL_NORMAL,
1117                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1118             (*it)->dumpInfo();
1119         }
1120         if (!((*it)->canClientTransferFrames(m_period))) {
1121             debugOutput(DEBUG_LEVEL_NORMAL,
1122                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1123             (*it)->dumpInfo();
1124         }
1125     }
1126     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1127           it != m_TransmitProcessors.end();
1128           ++it ) {
1129         if ((*it)->xrunOccurred()) {
1130             debugOutput(DEBUG_LEVEL_NORMAL,
1131                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1132         }
1133         if (!((*it)->canClientTransferFrames(m_period))) {
1134             debugOutput(DEBUG_LEVEL_NORMAL,
1135                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1136         }
1137     }
1138 #endif
1139     m_nbperiods++;
1140     // now we can signal the client that we are (should be) ready
1141     return !xrun_occurred;
1142 }
1143
1144 /**
1145  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1146  *
1147  * Transfers one period of frames from the client side to the Iso side and vice versa.
1148  *
1149  * @return true if successful, false otherwise (indicates xrun).
1150  */
1151 bool StreamProcessorManager::transfer() {
1152     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1153     bool retval=true;
1154     retval &= transfer(StreamProcessor::ePT_Receive);
1155     retval &= transfer(StreamProcessor::ePT_Transmit);
1156     return retval;
1157 }
1158
1159 /**
1160  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1161  *
1162  * Transfers one period of frames from the client side to the Iso side or vice versa.
1163  *
1164  * @param t The processor type to tranfer for (receive or transmit)
1165  * @return true if successful, false otherwise (indicates xrun).
1166  */
1167 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1168     if(m_SyncSource == NULL) return false;
1169     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1170         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1171         t, m_time_of_transfer,
1172         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1173         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1174         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1175
1176     bool retval = true;
1177     // a static cast could make sure that there is no performance
1178     // penalty for the virtual functions (to be checked)
1179     if (t==StreamProcessor::ePT_Receive) {
1180         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1181                 it != m_ReceiveProcessors.end();
1182                 ++it ) {
1183             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1184                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1185                             m_period, m_time_of_transfer,*it);
1186                 retval &= false; // buffer underrun
1187             }
1188         }
1189     } else {
1190         // FIXME: in the SPM it would be nice to have system time instead of
1191         //        1394 time
1192         float rate = m_SyncSource->getTicksPerFrame();
1193         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1194
1195         // the data we are putting into the buffer is intended to be transmitted
1196         // one ringbuffer size after it has been received
1197
1198         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1199         // postponed.
1200         int syncdelay = m_SyncSource->getSyncDelay();
1201         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1202
1203         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1204                 it != m_TransmitProcessors.end();
1205                 ++it ) {
1206             // FIXME: in the SPM it would be nice to have system time instead of
1207             //        1394 time
1208             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1209                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1210                         m_period, transmit_timestamp, *it);
1211                 retval &= false; // buffer underrun
1212             }
1213         }
1214     }
1215     return retval;
1216 }
1217
1218 /**
1219  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1220  *
1221  * Transfers one period of silence to the Iso side for transmit SP's
1222  * or dump one period of frames for receive SP's
1223  *
1224  * @return true if successful, false otherwise (indicates xrun).
1225  */
1226 bool StreamProcessorManager::transferSilence() {
1227     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1228     bool retval=true;
1229     // NOTE: the order here is opposite from the order in
1230     // normal operation (transmit is before receive), because
1231     // we can do that here (data=silence=available) and
1232     // it increases reliability (esp. on startup)
1233     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1234     retval &= transferSilence(StreamProcessor::ePT_Receive);
1235     return retval;
1236 }
1237
1238 /**
1239  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1240  *
1241  * Transfers one period of silence to the Iso side for transmit SP's
1242  * or dump one period of frames for receive SP's
1243  *
1244  * @param t The processor type to tranfer for (receive or transmit)
1245  * @return true if successful, false otherwise (indicates xrun).
1246  */
1247 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1248     if(m_SyncSource == NULL) return false;
1249     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1250         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1251         t, m_time_of_transfer,
1252         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1253         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1254         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1255
1256     bool retval = true;
1257     // a static cast could make sure that there is no performance
1258     // penalty for the virtual functions (to be checked)
1259     if (t==StreamProcessor::ePT_Receive) {
1260         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1261                 it != m_ReceiveProcessors.end();
1262                 ++it ) {
1263             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1264                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1265                             m_period, m_time_of_transfer,*it);
1266                 retval &= false; // buffer underrun
1267             }
1268         }
1269     } else {
1270         // FIXME: in the SPM it would be nice to have system time instead of
1271         //        1394 time
1272         float rate = m_SyncSource->getTicksPerFrame();
1273         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1274
1275         // the data we are putting into the buffer is intended to be transmitted
1276         // one ringbuffer size after it has been received
1277         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1278         // postponed.
1279         int syncdelay = m_SyncSource->getSyncDelay();
1280         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1281
1282         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1283                 it != m_TransmitProcessors.end();
1284                 ++it ) {
1285             // FIXME: in the SPM it would be nice to have system time instead of
1286             //        1394 time
1287             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1288                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1289                         m_period, transmit_timestamp, *it);
1290                 retval &= false; // buffer underrun
1291             }
1292         }
1293     }
1294     return retval;
1295 }
1296
1297 void StreamProcessorManager::dumpInfo() {
1298     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1299     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1300     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1301     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1302
1303     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1304     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1305         it != m_ReceiveProcessors.end();
1306         ++it ) {
1307         (*it)->dumpInfo();
1308     }
1309
1310     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1311     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1312         it != m_TransmitProcessors.end();
1313         ++it ) {
1314         (*it)->dumpInfo();
1315     }
1316
1317     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1318
1319 }
1320
1321 void StreamProcessorManager::setVerboseLevel(int l) {
1322     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1323
1324     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1325         it != m_ReceiveProcessors.end();
1326         ++it ) {
1327         (*it)->setVerboseLevel(l);
1328     }
1329     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1330         it != m_TransmitProcessors.end();
1331         ++it ) {
1332         (*it)->setVerboseLevel(l);
1333     }
1334     setDebugLevel(l);
1335     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1336 }
1337
1338 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1339     int count=0;
1340
1341     if (direction == Port::E_Capture) {
1342         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1343             it != m_ReceiveProcessors.end();
1344             ++it ) {
1345             count += (*it)->getPortCount(type);
1346         }
1347     } else {
1348         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1349             it != m_TransmitProcessors.end();
1350             ++it ) {
1351             count += (*it)->getPortCount(type);
1352         }
1353     }
1354     return count;
1355 }
1356
1357 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1358     int count=0;
1359
1360     if (direction == Port::E_Capture) {
1361         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1362             it != m_ReceiveProcessors.end();
1363             ++it ) {
1364             count += (*it)->getPortCount();
1365         }
1366     } else {
1367         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1368             it != m_TransmitProcessors.end();
1369             ++it ) {
1370             count += (*it)->getPortCount();
1371         }
1372     }
1373     return count;
1374 }
1375
1376 // TODO: implement a port map here, instead of the loop
1377 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1378     int count=0;
1379     int prevcount=0;
1380
1381     if (direction == Port::E_Capture) {
1382         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1383             it != m_ReceiveProcessors.end();
1384             ++it ) {
1385             count += (*it)->getPortCount();
1386             if (count > idx) {
1387                 return (*it)->getPortAtIdx(idx-prevcount);
1388             }
1389             prevcount=count;
1390         }
1391     } else {
1392         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1393             it != m_TransmitProcessors.end();
1394             ++it ) {
1395             count += (*it)->getPortCount();
1396             if (count > idx) {
1397                 return (*it)->getPortAtIdx(idx-prevcount);
1398             }
1399             prevcount=count;
1400         }
1401     }
1402     return NULL;
1403 }
1404
1405 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1406     m_thread_realtime=rt;
1407     m_thread_priority=priority;
1408     return true;
1409 }
1410
1411
1412 } // end of namespace
Note: See TracBrowser for help on using the browser.