root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 1498, 58.2 kB (checked in by ppalmers, 15 years ago)

Merge all changes from 2.0 branch into trunk (since r1361). This _should_ contain all forward merges done in the mean time. At this moment in time both branches should be in sync.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessorManager.h"
27 #include "generic/StreamProcessor.h"
28 #include "generic/Port.h"
29 #include "libieee1394/cycletimer.h"
30
31 #include "devicemanager.h"
32
33 #include "libutil/Time.h"
34
35 #include <errno.h>
36 #include <assert.h>
37 #include <math.h>
38
39 namespace Streaming {
40
41 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
42
43 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
44     : m_is_slave( false )
45     , m_SyncSource(NULL)
46     , m_parent( p )
47     , m_xrun_happened( false )
48     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
49     , m_nb_buffers( 0 )
50     , m_period( 0 )
51     , m_audio_datatype( eADT_Float )
52     , m_nominal_framerate ( 0 )
53     , m_xruns(0)
54     , m_shutdown_needed(false)
55     , m_nbperiods(0)
56     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
57 {
58     addOption(Util::OptionContainer::Option("slaveMode",false));
59     sem_init(&m_activity_semaphore, 0, 0);
60 }
61
62 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
63                                                unsigned int framerate, unsigned int nb_buffers)
64     : m_is_slave( false )
65     , m_SyncSource(NULL)
66     , m_parent( p )
67     , m_xrun_happened( false )
68     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
69     , m_nb_buffers(nb_buffers)
70     , m_period(period)
71     , m_audio_datatype( eADT_Float )
72     , m_nominal_framerate ( framerate )
73     , m_xruns(0)
74     , m_shutdown_needed(false)
75     , m_nbperiods(0)
76     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
77 {
78     addOption(Util::OptionContainer::Option("slaveMode",false));
79     sem_init(&m_activity_semaphore, 0, 0);
80 }
81
82 StreamProcessorManager::~StreamProcessorManager() {
83     sem_post(&m_activity_semaphore);
84     sem_destroy(&m_activity_semaphore);
85     delete m_WaitLock;
86 }
87
88 // void
89 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
90 // {
91 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
92 // //
93 // //     bool handled_at_least_one = false;
94 // //     // note that all receive streams are gone once a device is unplugged
95 // //
96 // //     // synchronize with the wait lock
97 // //     Util::MutexLockHelper lock(*m_WaitLock);
98 // //
99 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
100 // //     // cause all SP's to bail out
101 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
102 // //           it != m_ReceiveProcessors.end();
103 // //           ++it )
104 // //     {
105 // //         if(&s == &((*it)->getParent().get1394Service())) {
106 // //             debugOutput(DEBUG_LEVEL_NORMAL,
107 // //                         "issue busreset on receive SPM on channel %d\n",
108 // //                         (*it)->getChannel());
109 // //             (*it)->handleBusReset();
110 // //             handled_at_least_one = true;
111 // //         } else {
112 // //             debugOutput(DEBUG_LEVEL_NORMAL,
113 // //                         "skipping receive SPM on channel %d since not on service %p\n",
114 // //                         (*it)->getChannel(), &s);
115 // //         }
116 // //     }
117 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
118 // //           it != m_TransmitProcessors.end();
119 // //           ++it )
120 // //     {
121 // //         if(&s == &((*it)->getParent().get1394Service())) {
122 // //             debugOutput(DEBUG_LEVEL_NORMAL,
123 // //                         "issue busreset on transmit SPM on channel %d\n",
124 // //                         (*it)->getChannel());
125 // //             (*it)->handleBusReset();
126 // //             handled_at_least_one = true;
127 // //         } else {
128 // //             debugOutput(DEBUG_LEVEL_NORMAL,
129 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
130 // //                         (*it)->getChannel(), &s);
131 // //         }
132 // //     }
133 // //
134 // //     // FIXME: we request shutdown for now.
135 // //     m_shutdown_needed = handled_at_least_one;
136 // }
137
138 void
139 StreamProcessorManager::signalActivity()
140 {
141     sem_post(&m_activity_semaphore);
142     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
143 }
144
145 enum StreamProcessorManager::eActivityResult
146 StreamProcessorManager::waitForActivity()
147 {
148     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
149     struct timespec ts;
150     int result;
151
152     if (m_activity_wait_timeout_nsec >= 0) {
153
154         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
155             debugError("clock_gettime failed\n");
156             return eAR_Error;
157         }
158         ts.tv_nsec += m_activity_wait_timeout_nsec;
159         while(ts.tv_nsec >= 1000000000LL) {
160             ts.tv_sec += 1;
161             ts.tv_nsec -= 1000000000LL;
162         }
163     }
164
165     if (m_activity_wait_timeout_nsec >= 0) {
166         result = sem_timedwait(&m_activity_semaphore, &ts);
167     } else {
168         result = sem_wait(&m_activity_semaphore);
169     }
170
171     if(result != 0) {
172         if (errno == ETIMEDOUT) {
173             debugOutput(DEBUG_LEVEL_VERBOSE,
174                         "(%p) sem_timedwait() timed out (result=%d)\n",
175                         this, result);
176             return eAR_Timeout;
177         } else if (errno == EINTR) {
178             debugOutput(DEBUG_LEVEL_VERBOSE,
179                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
180                         this, result);
181             return eAR_Interrupted;
182         } else if (errno == EINVAL) {
183             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
184                         this, result);
185             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
186                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
187             return eAR_Error;
188         } else {
189             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
190                         this, result, errno);
191             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
192                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
193             return eAR_Error;
194         }
195     }
196
197     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
198     return eAR_Activity;
199 }
200
201 /**
202  * Registers \ref processor with this manager.
203  *
204  * also registers it with the isohandlermanager
205  *
206  * be sure to call isohandlermanager->init() first!
207  * and be sure that the processors are also ->init()'ed
208  *
209  * @param processor
210  * @return true if successfull
211  */
212 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
213 {
214     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
215     assert(processor);
216     if (processor->getType() == StreamProcessor::ePT_Receive) {
217         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
218         m_ReceiveProcessors.push_back(processor);
219         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
220                     ( this, &StreamProcessorManager::updateShadowLists, false );
221         processor->addPortManagerUpdateHandler(f);
222         updateShadowLists();
223         return true;
224     }
225     if (processor->getType() == StreamProcessor::ePT_Transmit) {
226         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
227         m_TransmitProcessors.push_back(processor);
228         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
229                     ( this, &StreamProcessorManager::updateShadowLists, false );
230         processor->addPortManagerUpdateHandler(f);
231         updateShadowLists();
232         return true;
233     }
234
235     debugFatal("Unsupported processor type!\n");
236     return false;
237 }
238
239 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
240 {
241     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
242     assert(processor);
243
244     if (processor->getType()==StreamProcessor::ePT_Receive) {
245
246         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
247               it != m_ReceiveProcessors.end();
248               ++it )
249         {
250             if ( *it == processor ) {
251                 if (*it == m_SyncSource) {
252                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
253                     m_SyncSource = NULL;
254                 }
255                 m_ReceiveProcessors.erase(it);
256                 // remove the functor
257                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
258                 if(f) {
259                     processor->remPortManagerUpdateHandler(f);
260                     delete f;
261                 }
262                 updateShadowLists();
263                 return true;
264             }
265         }
266     }
267
268     if (processor->getType()==StreamProcessor::ePT_Transmit) {
269         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
270               it != m_TransmitProcessors.end();
271               ++it )
272         {
273             if ( *it == processor ) {
274                 if (*it == m_SyncSource) {
275                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
276                     m_SyncSource = NULL;
277                 }
278                 m_TransmitProcessors.erase(it);
279                 // remove the functor
280                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
281                 if(f) {
282                     processor->remPortManagerUpdateHandler(f);
283                     delete f;
284                 }
285                 updateShadowLists();
286                 return true;
287             }
288         }
289     }
290
291     debugFatal("Processor (%p) not found!\n",processor);
292     return false; //not found
293 }
294
295 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
296     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
297     m_SyncSource = s;
298     return true;
299 }
300
301 bool StreamProcessorManager::prepare() {
302
303     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
304     m_is_slave=false;
305     if(!getOption("slaveMode", m_is_slave)) {
306         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
307     }
308
309     m_shutdown_needed=false;
310
311     // if no sync source is set, select one here
312     if(m_SyncSource == NULL) {
313        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
314     }
315
316     // FIXME: put into separate method
317     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
318           it != m_ReceiveProcessors.end();
319           ++it )
320     {
321         if(m_SyncSource == NULL) {
322             debugWarning(" => Sync Source is %p.\n", *it);
323             m_SyncSource = *it;
324         }
325     }
326     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
327           it != m_TransmitProcessors.end();
328           ++it )
329     {
330         if(m_SyncSource == NULL) {
331             debugWarning(" => Sync Source is %p.\n", *it);
332             m_SyncSource = *it;
333         }
334     }
335
336     // now do the actual preparation of the SP's
337     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
338     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
339         it != m_ReceiveProcessors.end();
340         ++it ) {
341
342         if(!(*it)->setOption("slaveMode", m_is_slave)) {
343             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
344         }
345
346         if(!(*it)->prepare()) {
347             debugFatal(  " could not prepare (%p)...\n",(*it));
348             return false;
349         }
350     }
351     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
352     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
353         it != m_TransmitProcessors.end();
354         ++it ) {
355         if(!(*it)->setOption("slaveMode", m_is_slave)) {
356             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
357         }
358         if(!(*it)->prepare()) {
359             debugFatal( " could not prepare (%p)...\n",(*it));
360             return false;
361         }
362     }
363
364     // if there are no stream processors registered,
365     // fail
366     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
367         debugFatal("No stream processors registered, can't do anything usefull\n");
368         return false;
369     }
370
371     // set the activity timeout value to two periods worth of usecs.
372     // since we can expect activity once every period, but we might have some
373     // offset, the safe value is two periods.
374     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
375     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
376     setActivityWaitTimeoutUsec(timeout_usec);
377
378     updateShadowLists();
379
380     return true;
381 }
382
383 bool
384 StreamProcessorManager::startDryRunning()
385 {
386     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
387     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
388     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
389             it != m_TransmitProcessors.end();
390             ++it ) {
391         if ((*it)->inError()) {
392             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
393             return false;
394         }
395         if (!(*it)->isDryRunning()) {
396             if(!(*it)->scheduleStartDryRunning(-1)) {
397                 debugError("Could not put SP %p into the dry-running state\n", *it);
398                 return false;
399             }
400         } else {
401             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
402         }
403     }
404     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
405             it != m_ReceiveProcessors.end();
406             ++it ) {
407         if ((*it)->inError()) {
408             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
409             return false;
410         }
411         if (!(*it)->isDryRunning()) {
412             if(!(*it)->scheduleStartDryRunning(-1)) {
413                 debugError("Could not put SP %p into the dry-running state\n", *it);
414                 return false;
415             }
416         } else {
417             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
418         }
419     }
420     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
421     // wait for the syncsource to start running.
422     // that will block the waitForPeriod call until everyone has started (theoretically)
423     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
424     bool all_dry_running = false;
425     while (!all_dry_running && cnt) {
426         all_dry_running = true;
427         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
428                 it != m_ReceiveProcessors.end();
429                 ++it ) {
430             all_dry_running &= (*it)->isDryRunning();
431         }
432         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
433                 it != m_TransmitProcessors.end();
434                 ++it ) {
435             all_dry_running &= (*it)->isDryRunning();
436         }
437
438         SleepRelativeUsec(125);
439         cnt--;
440     }
441     if(cnt==0) {
442         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
443         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
444                 it != m_ReceiveProcessors.end();
445                 ++it ) {
446             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
447                 (*it)->getTypeString(), *it, (*it)->getStateString());
448         }
449         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
450                 it != m_TransmitProcessors.end();
451                 ++it ) {
452             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
453                 (*it)->getTypeString(), *it, (*it)->getStateString());
454         }
455         return false;
456     }
457     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
458     return true;
459 }
460
461 bool StreamProcessorManager::syncStartAll() {
462     if(m_SyncSource == NULL) return false;
463
464     // get the options
465     int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
466     int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
467     int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
468     int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
469     int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
470     Util::Configuration &config = m_parent.getConfiguration();
471     config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
472     config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
473     config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
474     config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
475     config.getValueForSetting("streaming.spm.prestart_cycles_for_recv", prestart_cycles_for_recv);
476
477     // figure out when to get the SP's running.
478     // the xmit SP's should also know the base timestamp
479     // streams should be aligned here
480
481     // now find out how long we have to delay the wait operation such that
482     // the received frames will all be presented to the SP
483     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
484     int max_of_min_delay = 0;
485     int min_delay = 0;
486     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
487             it != m_ReceiveProcessors.end();
488             ++it ) {
489         min_delay = (*it)->getMaxFrameLatency();
490         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
491     }
492
493     // add some processing margin. This only shifts the time
494     // at which the buffer is transfer()'ed. This makes things somewhat
495     // more robust. It should be noted though that shifting the transfer
496     // time to a later time instant also causes the xmit buffer fill to be
497     // lower on average.
498     max_of_min_delay += signal_delay_ticks;
499
500     m_SyncSource->setSyncDelay(max_of_min_delay);
501     unsigned int syncdelay = m_SyncSource->getSyncDelay();
502     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d => %d ticks (%03us %04uc %04ut)...\n",
503         max_of_min_delay, syncdelay,
504         (unsigned int)TICKS_TO_SECS(syncdelay),
505         (unsigned int)TICKS_TO_CYCLES(syncdelay),
506         (unsigned int)TICKS_TO_OFFSET(syncdelay));
507
508     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
509     //        have aquired lock
510     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
511     //sleep(2); // FIXME: be smarter here
512
513     // make sure that we are dry-running long enough for the
514     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
515     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
516
517     unsigned int nb_sync_runs = (sync_wait_time_msec * getNominalRate());
518     nb_sync_runs /= 1000;
519     nb_sync_runs /= getPeriodSize();
520
521     int64_t time_till_next_period;
522     while(nb_sync_runs--) { // or while not sync-ed?
523         // check if we were woken up too soon
524         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
525         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
526         if(time_till_next_period > 0) {
527             // wait for the period
528             SleepRelativeUsec(time_till_next_period);
529         }
530     }
531
532     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
533     // FIXME: in the SPM it would be nice to have system time instead of
534     //        1394 time
535
536     // we now should have decent sync info on the sync source
537     // determine a point in time where the system should start
538     // figure out where we are now
539     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
540     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
541         time_of_first_sample,
542         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
543         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
544         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
545
546     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
547     // this is the time window we have to setup all SP's such that they
548     // can start wet-running correctly.
549     time_of_first_sample = addTicks(time_of_first_sample,
550                                     cycles_for_startup * TICKS_PER_CYCLE);
551
552     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
553         time_of_first_sample,
554         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
555         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
556         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
557
558     // we should start wet-running the transmit SP's some cycles in advance
559     // such that we know it is wet-running when it should output its first sample
560     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
561                                                  prestart_cycles_for_xmit * TICKS_PER_CYCLE);
562
563     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
564                                                  prestart_cycles_for_recv * TICKS_PER_CYCLE);
565     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
566         time_to_start_xmit,
567         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
568         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
569         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
570     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
571         time_to_start_recv,
572         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
573         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
574         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
575
576     // at this point the buffer head timestamp of the transmit buffers can be set
577     // this is the presentation time of the first sample in the buffer
578     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
579           it != m_TransmitProcessors.end();
580           ++it ) {
581         (*it)->setBufferHeadTimestamp(time_of_first_sample);
582     }
583
584     // switch syncsource to running state
585     uint64_t time_to_start_sync;
586     // FIXME: this is most likely not going to work for transmit sync sources
587     // but those are unsupported in this version
588     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
589         time_to_start_sync = time_to_start_recv;
590     } else {
591         time_to_start_sync = time_to_start_xmit;
592     }
593     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
594         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
595         return false;
596     }
597
598     // STEP X: switch all non-syncsource SP's over to the running state
599     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
600           it != m_ReceiveProcessors.end();
601           ++it ) {
602         if(*it != m_SyncSource) {
603             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
604                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
605                 return false;
606             }
607         }
608     }
609     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
610           it != m_TransmitProcessors.end();
611           ++it ) {
612         if(*it != m_SyncSource) {
613             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
614                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
615                 return false;
616             }
617         }
618     }
619     // wait for the syncsource to start running.
620     // that will block the waitForPeriod call until everyone has started (theoretically)
621     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
622     // so a 20 times this value should be a good timeout
623     int cnt = cycles_for_startup * 20; // by then it should have started
624     while (!m_SyncSource->isRunning() && cnt) {
625         SleepRelativeUsec(125);
626         cnt--;
627     }
628     if(cnt==0) {
629         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
630         return false;
631     }
632
633     // the sync source is running, we can now read a decent received timestamp from it
634     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
635
636     // and a (still very rough) approximation of the rate
637     float rate = m_SyncSource->getTicksPerFrame();
638     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
639     // also add the sync delay
640     delay_in_ticks += m_SyncSource->getSyncDelay();
641     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
642                 m_time_of_transfer, rate);
643
644     // then use this information to initialize the xmit handlers
645
646     //  we now set the buffer tail timestamp of the transmit buffer
647     //  to the period transfer time instant plus what's nb_buffers - 1
648     //  in ticks. This due to the fact that we (should) have received one period
649     //  worth of ticks at t=m_time_of_transfer
650     //  hence one period of frames should also have been transmitted, which means
651     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
652     //  that allows us to calculate the tail timestamp for the buffer.
653
654     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
655
656     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
657                 transmit_tail_timestamp, rate);
658
659     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
660         it != m_TransmitProcessors.end();
661         ++it ) {
662         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
663         (*it)->setTicksPerFrame(rate);
664     }
665
666     // align the received streams to be phase aligned
667     if(!alignReceivedStreams()) {
668         debugError("Could not align streams...\n");
669         return false;
670     }
671
672     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
673     return true;
674 }
675
676 bool
677 StreamProcessorManager::alignReceivedStreams()
678 {
679     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
680     unsigned int nb_sync_runs;
681     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
682     int64_t diff_between_streams[nb_rcv_sp];
683     int64_t diff;
684
685     unsigned int i;
686
687     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
688     int align_average_time_msec = STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC;
689     Util::Configuration &config = m_parent.getConfiguration();
690     config.getValueForSetting("streaming.spm.align_tries", cnt);
691     config.getValueForSetting("streaming.spm.align_average_time_msec", align_average_time_msec);
692
693     unsigned int periods_per_align_try = (align_average_time_msec * getNominalRate());
694     periods_per_align_try /= 1000;
695     periods_per_align_try /= getPeriodSize();
696     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
697
698     bool aligned = false;
699     while (!aligned && cnt--) {
700         nb_sync_runs = periods_per_align_try;
701         while(nb_sync_runs) {
702             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
703             if(!waitForPeriod()) {
704                 debugWarning("xrun while aligning streams...\n");
705                 return false;
706             }
707
708             // before we do anything else, transfer
709             if(!transferSilence()) {
710                 debugError("Could not transfer silence\n");
711                 return false;
712             }
713
714             // now calculate the stream offset
715             i = 0;
716             for ( i = 0; i < nb_rcv_sp; i++) {
717                 StreamProcessor *s = m_ReceiveProcessors.at(i);
718                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
719                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
720                     m_SyncSource, s, diff);
721                 if ( nb_sync_runs == periods_per_align_try ) {
722                     diff_between_streams[i] = diff;
723                 } else {
724                     diff_between_streams[i] += diff;
725                 }
726             }
727
728             nb_sync_runs--;
729         }
730         // calculate the average offsets
731         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
732         int diff_between_streams_frames[nb_rcv_sp];
733         aligned = true;
734         for ( i = 0; i < nb_rcv_sp; i++) {
735             StreamProcessor *s = m_ReceiveProcessors.at(i);
736
737             diff_between_streams[i] /= periods_per_align_try;
738             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
739             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
740                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
741
742             aligned &= (diff_between_streams_frames[i] == 0);
743
744             // reposition the stream
745             if(!s->shiftStream(diff_between_streams_frames[i])) {
746                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
747                 return false;
748             }
749         }
750         if (!aligned) {
751             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
752         }
753     }
754     if (cnt == 0) {
755         debugError("Align failed\n");
756         return false;
757     }
758     return true;
759 }
760
761 bool StreamProcessorManager::start() {
762     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
763
764     // start all SP's synchonized
765     bool start_result = false;
766     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
767         // put all SP's into dry-running state
768         if (!startDryRunning()) {
769             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
770             start_result = false;
771             continue;
772         }
773
774         start_result = syncStartAll();
775         if(start_result) {
776             break;
777         } else {
778             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
779             if(m_shutdown_needed) {
780                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
781                 return false;
782             }
783         }
784     }
785     if (!start_result) {
786         debugFatal("Could not syncStartAll...\n");
787         return false;
788     }
789     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
790     return true;
791 }
792
793 bool StreamProcessorManager::stop() {
794     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
795
796     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
797     // switch SP's over to the dry-running state
798     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
799           it != m_ReceiveProcessors.end();
800           ++it ) {
801         if((*it)->isRunning()) {
802             if(!(*it)->scheduleStopRunning(-1)) {
803                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
804                 return false;
805             }
806         }
807     }
808     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
809           it != m_TransmitProcessors.end();
810           ++it ) {
811         if((*it)->isRunning()) {
812             if(!(*it)->scheduleStopRunning(-1)) {
813                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
814                 return false;
815             }
816         }
817     }
818     // wait for the SP's to get into the dry-running/stopped state
819     int cnt = 8000;
820     bool ready = false;
821     while (!ready && cnt) {
822         ready = true;
823         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
824             it != m_ReceiveProcessors.end();
825             ++it ) {
826             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
827         }
828         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
829             it != m_TransmitProcessors.end();
830             ++it ) {
831             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
832         }
833         SleepRelativeUsec(125);
834         cnt--;
835     }
836     if(cnt==0) {
837         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
838         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
839             it != m_ReceiveProcessors.end();
840             ++it ) {
841             (*it)->dumpInfo();
842         }
843         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
844             it != m_TransmitProcessors.end();
845             ++it ) {
846             (*it)->dumpInfo();
847         }
848         return false;
849     }
850
851     // switch SP's over to the stopped state
852     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
853           it != m_ReceiveProcessors.end();
854           ++it ) {
855         if ((*it)->inError()) {
856             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
857         } else if(!(*it)->scheduleStopDryRunning(-1)) {
858             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
859             return false;
860         }
861     }
862     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
863           it != m_TransmitProcessors.end();
864           ++it ) {
865         if ((*it)->inError()) {
866             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
867         } else if(!(*it)->scheduleStopDryRunning(-1)) {
868             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
869             return false;
870         }
871     }
872     // wait for the SP's to get into the stopped state
873     cnt = 8000;
874     ready = false;
875     while (!ready && cnt) {
876         ready = true;
877         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
878             it != m_ReceiveProcessors.end();
879             ++it ) {
880             ready &= ((*it)->isStopped() || (*it)->inError());
881         }
882         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
883             it != m_TransmitProcessors.end();
884             ++it ) {
885             ready &= ((*it)->isStopped() || (*it)->inError());
886         }
887         SleepRelativeUsec(125);
888         cnt--;
889     }
890     if(cnt==0) {
891         debugWarning(" Timeout waiting for the SP's to stop\n");
892         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
893             it != m_ReceiveProcessors.end();
894             ++it ) {
895             (*it)->dumpInfo();
896         }
897         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
898             it != m_TransmitProcessors.end();
899             ++it ) {
900             (*it)->dumpInfo();
901         }
902         return false;
903     }
904     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
905     return true;
906 }
907
908 /**
909  * Called upon Xrun events. This brings all StreamProcessors back
910  * into their starting state, and then carries on streaming. This should
911  * have the same effect as restarting the whole thing.
912  *
913  * @return true if successful, false otherwise
914  */
915 bool StreamProcessorManager::handleXrun() {
916
917     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
918
919     dumpInfo();
920
921     /*
922      * Reset means:
923      * 1) Disabling the SP's, so that they don't process any packets
924      *    note: the isomanager does keep on delivering/requesting them
925      * 2) Bringing all buffers & streamprocessors into a know state
926      *    - Clear all capture buffers
927      *    - Put nb_periods*period_size of null frames into the playback buffers
928      * 3) Re-enable the SP's
929      */
930
931     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
932     // start all SP's synchonized
933     bool start_result = false;
934     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
935         if(m_shutdown_needed) {
936             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
937             return true;
938         }
939         // put all SP's into dry-running state
940         if (!startDryRunning()) {
941             debugShowBackLog();
942             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
943             start_result = false;
944             continue;
945         }
946
947         start_result = syncStartAll();
948         if(start_result) {
949             break;
950         } else {
951             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
952         }
953     }
954     if (!start_result) {
955         debugFatal("Could not syncStartAll...\n");
956         return false;
957     }
958     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
959
960     return true;
961 }
962
963 /**
964  * @brief Waits until the next period of samples is ready
965  *
966  * This function does not return until a full period of samples is (or should be)
967  * ready to be transferred.
968  *
969  * @return true if the period is ready, false if not
970  */
971 bool StreamProcessorManager::waitForPeriod() {
972     if(m_SyncSource == NULL) return false;
973     if(m_shutdown_needed) return false;
974     bool xrun_occurred = false;
975     bool in_error = false;
976
977     // grab the wait lock
978     // this ensures that bus reset handling doesn't interfere
979     Util::MutexLockHelper lock(*m_WaitLock);
980     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
981                         "waiting for period (%d frames in buffer)...\n",
982                         m_SyncSource->getBufferFill());
983     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
984     uint64_t ticks_at_period_margin = ticks_at_period + m_SyncSource->getSyncDelay();
985     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
986
987     #ifdef DEBUG
988     int64_t now = Util::SystemTimeSource::getCurrentTime();
989     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
990     #endif
991
992     // wait until it's time to transfer
993     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
994
995     #ifdef DEBUG
996     now = Util::SystemTimeSource::getCurrentTime();
997     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
998     #endif
999
1000     // the period should be ready now
1001
1002     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
1003     // HACK: we force wait until every SP is ready. this is needed
1004     // since the raw1394 interface provides no control over interrupts
1005     // resulting in very bad predictability on when the data is present.
1006     bool period_not_ready = true;
1007     while(period_not_ready) {
1008         period_not_ready = false;
1009         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1010             it != m_ReceiveProcessors.end();
1011             ++it ) {
1012             bool this_sp_period_ready = (*it)->canConsumePeriod();
1013             if (!this_sp_period_ready) {
1014                 period_not_ready = true;
1015             }
1016         }
1017         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1018             it != m_TransmitProcessors.end();
1019             ++it ) {
1020             bool this_sp_period_ready = (*it)->canProducePeriod();
1021             if (!this_sp_period_ready) {
1022                 period_not_ready = true;
1023             }
1024         }
1025
1026         if (period_not_ready) {
1027             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
1028             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
1029         }
1030
1031         // check for underruns/errors on the ISO side,
1032         // those should make us bail out of the wait loop
1033         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1034             it != m_ReceiveProcessors.end();
1035             ++it ) {
1036             // a xrun has occurred on the Iso side
1037             xrun_occurred |= (*it)->xrunOccurred();
1038             in_error |= (*it)->inError();
1039         }
1040         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1041             it != m_TransmitProcessors.end();
1042             ++it ) {
1043             // a xrun has occurred on the Iso side
1044             xrun_occurred |= (*it)->xrunOccurred();
1045             in_error |= (*it)->inError();
1046         }
1047         if(xrun_occurred | in_error | m_shutdown_needed) break;
1048     }
1049     #else
1050     // check for underruns/errors on the ISO side,
1051     // those should make us bail out of the wait loop
1052     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1053         it != m_ReceiveProcessors.end();
1054         ++it ) {
1055         // xrun on data buffer side
1056         if (!(*it)->canConsumePeriod()) {
1057             xrun_occurred = true;
1058         }
1059         // a xrun has occurred on the Iso side
1060         xrun_occurred |= (*it)->xrunOccurred();
1061         in_error |= (*it)->inError();
1062     }
1063     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1064         it != m_TransmitProcessors.end();
1065         ++it ) {
1066         // xrun on data buffer side
1067         if (!(*it)->canProducePeriod()) {
1068             xrun_occurred = true;
1069         }
1070         // a xrun has occurred on the Iso side
1071         xrun_occurred |= (*it)->xrunOccurred();
1072         in_error |= (*it)->inError();
1073     }
1074     #endif
1075
1076     if(xrun_occurred) {
1077         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1078     }
1079     if(in_error) {
1080         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1081         m_shutdown_needed = true;
1082     }
1083
1084     // we save the 'ideal' time of the transfer at this point,
1085     // because we can have interleaved read - process - write
1086     // cycles making that we modify a receiving stream's buffer
1087     // before we get to writing.
1088     // NOTE: before waitForPeriod() is called again, both the transmit
1089     //       and the receive processors should have done their transfer.
1090     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1091    
1092     #ifdef DEBUG
1093     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
1094    
1095     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1096     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
1097     // display message if the difference between two successive tick
1098     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1099     // so 50 ticks = 10%, which is a rather large jitter value.
1100     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
1101         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
1102                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1103     }
1104     m_time_of_transfer2 = m_time_of_transfer;
1105     #endif
1106
1107     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1108                         "transfer period %d at %llu ticks...\n",
1109                         m_nbperiods, m_time_of_transfer);
1110
1111     // this is to notify the client of the delay that we introduced by waiting
1112     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
1113     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1114                         "delayed for %d usecs...\n",
1115                         m_delayed_usecs);
1116
1117 #ifdef DEBUG
1118     int rcv_bf=0, xmt_bf=0;
1119     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1120         it != m_ReceiveProcessors.end();
1121         ++it ) {
1122         rcv_bf = (*it)->getBufferFill();
1123     }
1124     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1125         it != m_TransmitProcessors.end();
1126         ++it ) {
1127         xmt_bf = (*it)->getBufferFill();
1128     }
1129     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1130                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1131                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1132
1133     // check if xruns occurred on the Iso side.
1134     // also check if xruns will occur should we transfer() now
1135     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1136           it != m_ReceiveProcessors.end();
1137           ++it ) {
1138
1139         if ((*it)->xrunOccurred()) {
1140             debugOutput(DEBUG_LEVEL_NORMAL,
1141                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1142             (*it)->dumpInfo();
1143         }
1144         if (!((*it)->canClientTransferFrames(m_period))) {
1145             debugOutput(DEBUG_LEVEL_NORMAL,
1146                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1147             (*it)->dumpInfo();
1148         }
1149     }
1150     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1151           it != m_TransmitProcessors.end();
1152           ++it ) {
1153         if ((*it)->xrunOccurred()) {
1154             debugOutput(DEBUG_LEVEL_NORMAL,
1155                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1156         }
1157         if (!((*it)->canClientTransferFrames(m_period))) {
1158             debugOutput(DEBUG_LEVEL_NORMAL,
1159                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1160         }
1161     }
1162 #endif
1163     m_nbperiods++;
1164     // now we can signal the client that we are (should be) ready
1165     return !xrun_occurred;
1166 }
1167
1168 /**
1169  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1170  *
1171  * Transfers one period of frames from the client side to the Iso side and vice versa.
1172  *
1173  * @return true if successful, false otherwise (indicates xrun).
1174  */
1175 bool StreamProcessorManager::transfer() {
1176     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1177     bool retval=true;
1178     retval &= transfer(StreamProcessor::ePT_Receive);
1179     retval &= transfer(StreamProcessor::ePT_Transmit);
1180     return retval;
1181 }
1182
1183 /**
1184  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1185  *
1186  * Transfers one period of frames from the client side to the Iso side or vice versa.
1187  *
1188  * @param t The processor type to tranfer for (receive or transmit)
1189  * @return true if successful, false otherwise (indicates xrun).
1190  */
1191 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1192     if(m_SyncSource == NULL) return false;
1193     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1194         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1195         t, m_time_of_transfer,
1196         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1197         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1198         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1199
1200     bool retval = true;
1201     // a static cast could make sure that there is no performance
1202     // penalty for the virtual functions (to be checked)
1203     if (t==StreamProcessor::ePT_Receive) {
1204         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1205                 it != m_ReceiveProcessors.end();
1206                 ++it ) {
1207             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1208                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1209                             m_period, m_time_of_transfer,*it);
1210                 retval &= false; // buffer underrun
1211             }
1212         }
1213     } else {
1214         // FIXME: in the SPM it would be nice to have system time instead of
1215         //        1394 time
1216         float rate = m_SyncSource->getTicksPerFrame();
1217         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1218
1219         // the data we are putting into the buffer is intended to be transmitted
1220         // one ringbuffer size after it has been received
1221
1222         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1223         // postponed.
1224         int syncdelay = m_SyncSource->getSyncDelay();
1225         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1226
1227         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1228                 it != m_TransmitProcessors.end();
1229                 ++it ) {
1230             // FIXME: in the SPM it would be nice to have system time instead of
1231             //        1394 time
1232             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1233                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1234                         m_period, transmit_timestamp, *it);
1235                 retval &= false; // buffer underrun
1236             }
1237         }
1238     }
1239     return retval;
1240 }
1241
1242 /**
1243  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1244  *
1245  * Transfers one period of silence to the Iso side for transmit SP's
1246  * or dump one period of frames for receive SP's
1247  *
1248  * @return true if successful, false otherwise (indicates xrun).
1249  */
1250 bool StreamProcessorManager::transferSilence() {
1251     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1252     bool retval=true;
1253     // NOTE: the order here is opposite from the order in
1254     // normal operation (transmit is before receive), because
1255     // we can do that here (data=silence=available) and
1256     // it increases reliability (esp. on startup)
1257     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1258     retval &= transferSilence(StreamProcessor::ePT_Receive);
1259     return retval;
1260 }
1261
1262 /**
1263  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1264  *
1265  * Transfers one period of silence to the Iso side for transmit SP's
1266  * or dump one period of frames for receive SP's
1267  *
1268  * @param t The processor type to tranfer for (receive or transmit)
1269  * @return true if successful, false otherwise (indicates xrun).
1270  */
1271 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1272     if(m_SyncSource == NULL) return false;
1273     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1274         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1275         t, m_time_of_transfer,
1276         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1277         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1278         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1279
1280     bool retval = true;
1281     // a static cast could make sure that there is no performance
1282     // penalty for the virtual functions (to be checked)
1283     if (t==StreamProcessor::ePT_Receive) {
1284         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1285                 it != m_ReceiveProcessors.end();
1286                 ++it ) {
1287             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1288                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1289                             m_period, m_time_of_transfer,*it);
1290                 retval &= false; // buffer underrun
1291             }
1292         }
1293     } else {
1294         // FIXME: in the SPM it would be nice to have system time instead of
1295         //        1394 time
1296         float rate = m_SyncSource->getTicksPerFrame();
1297         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1298
1299         // the data we are putting into the buffer is intended to be transmitted
1300         // one ringbuffer size after it has been received
1301         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1302         // postponed.
1303         int syncdelay = m_SyncSource->getSyncDelay();
1304         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1305
1306         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1307                 it != m_TransmitProcessors.end();
1308                 ++it ) {
1309             // FIXME: in the SPM it would be nice to have system time instead of
1310             //        1394 time
1311             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1312                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1313                         m_period, transmit_timestamp, *it);
1314                 retval &= false; // buffer underrun
1315             }
1316         }
1317     }
1318     return retval;
1319 }
1320
1321 void StreamProcessorManager::dumpInfo() {
1322     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1323     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1324     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1325     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1326
1327     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1328     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1329         it != m_ReceiveProcessors.end();
1330         ++it ) {
1331         (*it)->dumpInfo();
1332     }
1333
1334     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1335     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1336         it != m_TransmitProcessors.end();
1337         ++it ) {
1338         (*it)->dumpInfo();
1339     }
1340
1341     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1342
1343     // list port info in verbose mode
1344     debugOutputShort( DEBUG_LEVEL_VERBOSE, "Port Information\n");
1345     int nb_ports;
1346    
1347     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Playback\n");
1348     nb_ports = getPortCount(Port::E_Playback);
1349     for(int i=0; i < nb_ports; i++) {
1350         Port *p = getPortByIndex(i, Port::E_Playback);
1351         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1352         if (p) {
1353             bool disabled = p->isDisabled();
1354             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1355             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1356             debugOutputShort( DEBUG_LEVEL_VERBOSE, "%3s ", p->getName().c_str());
1357         } else {
1358             debugOutputShort( DEBUG_LEVEL_VERBOSE, "invalid ");
1359         }
1360         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1361     }
1362     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Capture\n");
1363     nb_ports = getPortCount(Port::E_Capture);
1364     for(int i=0; i < nb_ports; i++) {
1365         Port *p = getPortByIndex(i, Port::E_Capture);
1366         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1367         if (p) {
1368             bool disabled = p->isDisabled();
1369             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1370             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1371             debugOutputShort( DEBUG_LEVEL_VERBOSE, " %3s ", p->getName().c_str());
1372         } else {
1373             debugOutputShort( DEBUG_LEVEL_VERBOSE, " invalid ");
1374         }
1375         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1376     }
1377
1378     debugOutputShort( DEBUG_LEVEL_VERBOSE, "----------------------------------------------------\n");
1379
1380 }
1381
1382 void StreamProcessorManager::setVerboseLevel(int l) {
1383     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1384
1385     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1386         it != m_ReceiveProcessors.end();
1387         ++it ) {
1388         (*it)->setVerboseLevel(l);
1389     }
1390     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1391         it != m_TransmitProcessors.end();
1392         ++it ) {
1393         (*it)->setVerboseLevel(l);
1394     }
1395     setDebugLevel(l);
1396     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1397 }
1398
1399 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1400     int count=0;
1401
1402     if (direction == Port::E_Capture) {
1403         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1404             it != m_ReceiveProcessors.end();
1405             ++it ) {
1406             count += (*it)->getPortCount(type);
1407         }
1408     } else {
1409         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1410             it != m_TransmitProcessors.end();
1411             ++it ) {
1412             count += (*it)->getPortCount(type);
1413         }
1414     }
1415     return count;
1416 }
1417
1418 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1419     int count=0;
1420
1421     if (direction == Port::E_Capture) {
1422         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1423             it != m_ReceiveProcessors.end();
1424             ++it ) {
1425             count += (*it)->getPortCount();
1426         }
1427     } else {
1428         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1429             it != m_TransmitProcessors.end();
1430             ++it ) {
1431             count += (*it)->getPortCount();
1432         }
1433     }
1434     return count;
1435 }
1436
1437 void
1438 StreamProcessorManager::updateShadowLists()
1439 {
1440     debugOutput( DEBUG_LEVEL_VERBOSE, "Updating port shadow lists...\n");
1441     m_CapturePorts_shadow.clear();
1442     m_PlaybackPorts_shadow.clear();
1443
1444     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1445         it != m_ReceiveProcessors.end();
1446         ++it ) {
1447         PortManager *pm = *it;
1448         for (int i=0; i < pm->getPortCount(); i++) {
1449             Port *p = pm->getPortAtIdx(i);
1450             if (!p) {
1451                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1452                 continue;
1453             }
1454             if(p->getDirection() != Port::E_Capture) {
1455                 debugError("port at idx %d for receive SP is not a capture port!\n", i);
1456                 continue;
1457             }
1458             m_CapturePorts_shadow.push_back(p);
1459         }
1460     }
1461     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1462         it != m_TransmitProcessors.end();
1463         ++it ) {
1464         PortManager *pm = *it;
1465         for (int i=0; i < pm->getPortCount(); i++) {
1466             Port *p = pm->getPortAtIdx(i);
1467             if (!p) {
1468                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1469                 continue;
1470             }
1471             if(p->getDirection() != Port::E_Playback) {
1472                 debugError("port at idx %d for transmit SP is not a playback port!\n", i);
1473                 continue;
1474             }
1475             m_PlaybackPorts_shadow.push_back(p);
1476         }
1477     }
1478 }
1479
1480 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1481     debugOutputExtreme( DEBUG_LEVEL_ULTRA_VERBOSE, "getPortByIndex(%d, %d)...\n", idx, direction);
1482     if (direction == Port::E_Capture) {
1483         #ifdef DEBUG
1484         if(idx >= (int)m_CapturePorts_shadow.size()) {
1485             debugError("Capture port %d out of range (%d)\n", idx, m_CapturePorts_shadow.size());
1486             return NULL;
1487         }
1488         #endif
1489         return m_CapturePorts_shadow.at(idx);
1490     } else {
1491         #ifdef DEBUG
1492         if(idx >= (int)m_PlaybackPorts_shadow.size()) {
1493             debugError("Playback port %d out of range (%d)\n", idx, m_PlaybackPorts_shadow.size());
1494             return NULL;
1495         }
1496         #endif
1497         return m_PlaybackPorts_shadow.at(idx);
1498     }
1499     return NULL;
1500 }
1501
1502 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1503     m_thread_realtime=rt;
1504     m_thread_priority=priority;
1505     return true;
1506 }
1507
1508
1509 } // end of namespace
Note: See TracBrowser for help on using the browser.