root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 1511, 59.6 kB (checked in by ppalmers, 15 years ago)

fix stream alignment

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessorManager.h"
27 #include "generic/StreamProcessor.h"
28 #include "generic/Port.h"
29 #include "libieee1394/cycletimer.h"
30
31 #include "devicemanager.h"
32
33 #include "libutil/Time.h"
34
35 #include <errno.h>
36 #include <assert.h>
37 #include <math.h>
38
39 namespace Streaming {
40
41 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
42
43 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
44     : m_is_slave( false )
45     , m_SyncSource(NULL)
46     , m_parent( p )
47     , m_xrun_happened( false )
48     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
49     , m_nb_buffers( 0 )
50     , m_period( 0 )
51     , m_audio_datatype( eADT_Float )
52     , m_nominal_framerate ( 0 )
53     , m_xruns(0)
54     , m_shutdown_needed(false)
55     , m_nbperiods(0)
56     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
57 {
58     addOption(Util::OptionContainer::Option("slaveMode",false));
59     sem_init(&m_activity_semaphore, 0, 0);
60 }
61
62 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
63                                                unsigned int framerate, unsigned int nb_buffers)
64     : m_is_slave( false )
65     , m_SyncSource(NULL)
66     , m_parent( p )
67     , m_xrun_happened( false )
68     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
69     , m_nb_buffers(nb_buffers)
70     , m_period(period)
71     , m_audio_datatype( eADT_Float )
72     , m_nominal_framerate ( framerate )
73     , m_xruns(0)
74     , m_shutdown_needed(false)
75     , m_nbperiods(0)
76     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
77 {
78     addOption(Util::OptionContainer::Option("slaveMode",false));
79     sem_init(&m_activity_semaphore, 0, 0);
80 }
81
82 StreamProcessorManager::~StreamProcessorManager() {
83     sem_post(&m_activity_semaphore);
84     sem_destroy(&m_activity_semaphore);
85     delete m_WaitLock;
86 }
87
88 // void
89 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
90 // {
91 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
92 // //
93 // //     bool handled_at_least_one = false;
94 // //     // note that all receive streams are gone once a device is unplugged
95 // //
96 // //     // synchronize with the wait lock
97 // //     Util::MutexLockHelper lock(*m_WaitLock);
98 // //
99 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
100 // //     // cause all SP's to bail out
101 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
102 // //           it != m_ReceiveProcessors.end();
103 // //           ++it )
104 // //     {
105 // //         if(&s == &((*it)->getParent().get1394Service())) {
106 // //             debugOutput(DEBUG_LEVEL_NORMAL,
107 // //                         "issue busreset on receive SPM on channel %d\n",
108 // //                         (*it)->getChannel());
109 // //             (*it)->handleBusReset();
110 // //             handled_at_least_one = true;
111 // //         } else {
112 // //             debugOutput(DEBUG_LEVEL_NORMAL,
113 // //                         "skipping receive SPM on channel %d since not on service %p\n",
114 // //                         (*it)->getChannel(), &s);
115 // //         }
116 // //     }
117 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
118 // //           it != m_TransmitProcessors.end();
119 // //           ++it )
120 // //     {
121 // //         if(&s == &((*it)->getParent().get1394Service())) {
122 // //             debugOutput(DEBUG_LEVEL_NORMAL,
123 // //                         "issue busreset on transmit SPM on channel %d\n",
124 // //                         (*it)->getChannel());
125 // //             (*it)->handleBusReset();
126 // //             handled_at_least_one = true;
127 // //         } else {
128 // //             debugOutput(DEBUG_LEVEL_NORMAL,
129 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
130 // //                         (*it)->getChannel(), &s);
131 // //         }
132 // //     }
133 // //
134 // //     // FIXME: we request shutdown for now.
135 // //     m_shutdown_needed = handled_at_least_one;
136 // }
137
138 void
139 StreamProcessorManager::signalActivity()
140 {
141     sem_post(&m_activity_semaphore);
142     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
143 }
144
145 enum StreamProcessorManager::eActivityResult
146 StreamProcessorManager::waitForActivity()
147 {
148     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
149     struct timespec ts;
150     int result;
151
152     if (m_activity_wait_timeout_nsec >= 0) {
153
154         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
155             debugError("clock_gettime failed\n");
156             return eAR_Error;
157         }
158         ts.tv_nsec += m_activity_wait_timeout_nsec;
159         while(ts.tv_nsec >= 1000000000LL) {
160             ts.tv_sec += 1;
161             ts.tv_nsec -= 1000000000LL;
162         }
163     }
164
165     if (m_activity_wait_timeout_nsec >= 0) {
166         result = sem_timedwait(&m_activity_semaphore, &ts);
167     } else {
168         result = sem_wait(&m_activity_semaphore);
169     }
170
171     if(result != 0) {
172         if (errno == ETIMEDOUT) {
173             debugOutput(DEBUG_LEVEL_VERBOSE,
174                         "(%p) sem_timedwait() timed out (result=%d)\n",
175                         this, result);
176             return eAR_Timeout;
177         } else if (errno == EINTR) {
178             debugOutput(DEBUG_LEVEL_VERBOSE,
179                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
180                         this, result);
181             return eAR_Interrupted;
182         } else if (errno == EINVAL) {
183             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
184                         this, result);
185             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
186                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
187             return eAR_Error;
188         } else {
189             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
190                         this, result, errno);
191             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
192                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
193             return eAR_Error;
194         }
195     }
196
197     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
198     return eAR_Activity;
199 }
200
201 /**
202  * Registers \ref processor with this manager.
203  *
204  * also registers it with the isohandlermanager
205  *
206  * be sure to call isohandlermanager->init() first!
207  * and be sure that the processors are also ->init()'ed
208  *
209  * @param processor
210  * @return true if successfull
211  */
212 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
213 {
214     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
215     assert(processor);
216     if (processor->getType() == StreamProcessor::ePT_Receive) {
217         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
218         m_ReceiveProcessors.push_back(processor);
219         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
220                     ( this, &StreamProcessorManager::updateShadowLists, false );
221         processor->addPortManagerUpdateHandler(f);
222         updateShadowLists();
223         return true;
224     }
225     if (processor->getType() == StreamProcessor::ePT_Transmit) {
226         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
227         m_TransmitProcessors.push_back(processor);
228         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
229                     ( this, &StreamProcessorManager::updateShadowLists, false );
230         processor->addPortManagerUpdateHandler(f);
231         updateShadowLists();
232         return true;
233     }
234
235     debugFatal("Unsupported processor type!\n");
236     return false;
237 }
238
239 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
240 {
241     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
242     assert(processor);
243
244     if (processor->getType()==StreamProcessor::ePT_Receive) {
245
246         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
247               it != m_ReceiveProcessors.end();
248               ++it )
249         {
250             if ( *it == processor ) {
251                 if (*it == m_SyncSource) {
252                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
253                     m_SyncSource = NULL;
254                 }
255                 m_ReceiveProcessors.erase(it);
256                 // remove the functor
257                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
258                 if(f) {
259                     processor->remPortManagerUpdateHandler(f);
260                     delete f;
261                 }
262                 updateShadowLists();
263                 return true;
264             }
265         }
266     }
267
268     if (processor->getType()==StreamProcessor::ePT_Transmit) {
269         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
270               it != m_TransmitProcessors.end();
271               ++it )
272         {
273             if ( *it == processor ) {
274                 if (*it == m_SyncSource) {
275                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
276                     m_SyncSource = NULL;
277                 }
278                 m_TransmitProcessors.erase(it);
279                 // remove the functor
280                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
281                 if(f) {
282                     processor->remPortManagerUpdateHandler(f);
283                     delete f;
284                 }
285                 updateShadowLists();
286                 return true;
287             }
288         }
289     }
290
291     debugFatal("Processor (%p) not found!\n",processor);
292     return false; //not found
293 }
294
295 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
296     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
297     m_SyncSource = s;
298     return true;
299 }
300
301 bool StreamProcessorManager::prepare() {
302
303     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
304     m_is_slave=false;
305     if(!getOption("slaveMode", m_is_slave)) {
306         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
307     }
308
309     m_shutdown_needed=false;
310
311     // if no sync source is set, select one here
312     if(m_SyncSource == NULL) {
313        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
314     }
315
316     // FIXME: put into separate method
317     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
318           it != m_ReceiveProcessors.end();
319           ++it )
320     {
321         if(m_SyncSource == NULL) {
322             debugWarning(" => Sync Source is %p.\n", *it);
323             m_SyncSource = *it;
324         }
325     }
326     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
327           it != m_TransmitProcessors.end();
328           ++it )
329     {
330         if(m_SyncSource == NULL) {
331             debugWarning(" => Sync Source is %p.\n", *it);
332             m_SyncSource = *it;
333         }
334     }
335
336     // now do the actual preparation of the SP's
337     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
338     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
339         it != m_ReceiveProcessors.end();
340         ++it ) {
341
342         if(!(*it)->setOption("slaveMode", m_is_slave)) {
343             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
344         }
345
346         if(!(*it)->prepare()) {
347             debugFatal(  " could not prepare (%p)...\n",(*it));
348             return false;
349         }
350     }
351     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
352     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
353         it != m_TransmitProcessors.end();
354         ++it ) {
355         if(!(*it)->setOption("slaveMode", m_is_slave)) {
356             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
357         }
358         if(!(*it)->prepare()) {
359             debugFatal( " could not prepare (%p)...\n",(*it));
360             return false;
361         }
362     }
363
364     // if there are no stream processors registered,
365     // fail
366     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
367         debugFatal("No stream processors registered, can't do anything usefull\n");
368         return false;
369     }
370
371     // set the activity timeout value to two periods worth of usecs.
372     // since we can expect activity once every period, but we might have some
373     // offset, the safe value is two periods.
374     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
375     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
376     setActivityWaitTimeoutUsec(timeout_usec);
377
378     updateShadowLists();
379
380     return true;
381 }
382
383 bool
384 StreamProcessorManager::startDryRunning()
385 {
386     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
387     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
388     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
389             it != m_TransmitProcessors.end();
390             ++it ) {
391         if ((*it)->inError()) {
392             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
393             return false;
394         }
395         if (!(*it)->isDryRunning()) {
396             if(!(*it)->scheduleStartDryRunning(-1)) {
397                 debugError("Could not put SP %p into the dry-running state\n", *it);
398                 return false;
399             }
400         } else {
401             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
402         }
403     }
404     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
405             it != m_ReceiveProcessors.end();
406             ++it ) {
407         if ((*it)->inError()) {
408             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
409             return false;
410         }
411         if (!(*it)->isDryRunning()) {
412             if(!(*it)->scheduleStartDryRunning(-1)) {
413                 debugError("Could not put SP %p into the dry-running state\n", *it);
414                 return false;
415             }
416         } else {
417             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
418         }
419     }
420     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
421     // wait for the syncsource to start running.
422     // that will block the waitForPeriod call until everyone has started (theoretically)
423     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
424     bool all_dry_running = false;
425     while (!all_dry_running && cnt) {
426         all_dry_running = true;
427         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
428                 it != m_ReceiveProcessors.end();
429                 ++it ) {
430             all_dry_running &= (*it)->isDryRunning();
431         }
432         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
433                 it != m_TransmitProcessors.end();
434                 ++it ) {
435             all_dry_running &= (*it)->isDryRunning();
436         }
437
438         SleepRelativeUsec(125);
439         cnt--;
440     }
441     if(cnt==0) {
442         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
443         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
444                 it != m_ReceiveProcessors.end();
445                 ++it ) {
446             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
447                 (*it)->getTypeString(), *it, (*it)->getStateString());
448         }
449         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
450                 it != m_TransmitProcessors.end();
451                 ++it ) {
452             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
453                 (*it)->getTypeString(), *it, (*it)->getStateString());
454         }
455         return false;
456     }
457     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
458     return true;
459 }
460
461 bool StreamProcessorManager::syncStartAll() {
462     if(m_SyncSource == NULL) return false;
463
464     // get the options
465     int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
466     int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
467     int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
468     int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
469     int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
470     Util::Configuration &config = m_parent.getConfiguration();
471     config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
472     config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
473     config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
474     config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
475     config.getValueForSetting("streaming.spm.prestart_cycles_for_recv", prestart_cycles_for_recv);
476
477     // figure out when to get the SP's running.
478     // the xmit SP's should also know the base timestamp
479     // streams should be aligned here
480
481     // now find out how long we have to delay the wait operation such that
482     // the received frames will all be presented to the SP
483     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
484     int max_of_min_delay = 0;
485     int min_delay = 0;
486     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
487             it != m_ReceiveProcessors.end();
488             ++it ) {
489         min_delay = (*it)->getMaxFrameLatency();
490         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
491     }
492
493     // add some processing margin. This only shifts the time
494     // at which the buffer is transfer()'ed. This makes things somewhat
495     // more robust. It should be noted though that shifting the transfer
496     // time to a later time instant also causes the xmit buffer fill to be
497     // lower on average.
498     max_of_min_delay += signal_delay_ticks;
499
500     m_SyncSource->setSyncDelay(max_of_min_delay);
501     unsigned int syncdelay = m_SyncSource->getSyncDelay();
502     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d => %d ticks (%03us %04uc %04ut)...\n",
503         max_of_min_delay, syncdelay,
504         (unsigned int)TICKS_TO_SECS(syncdelay),
505         (unsigned int)TICKS_TO_CYCLES(syncdelay),
506         (unsigned int)TICKS_TO_OFFSET(syncdelay));
507
508     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
509     //        have aquired lock
510     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
511     //sleep(2); // FIXME: be smarter here
512
513     // make sure that we are dry-running long enough for the
514     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
515     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
516
517     unsigned int nb_sync_runs = (sync_wait_time_msec * getNominalRate());
518     nb_sync_runs /= 1000;
519     nb_sync_runs /= getPeriodSize();
520
521     int64_t time_till_next_period;
522     while(nb_sync_runs--) { // or while not sync-ed?
523         // check if we were woken up too soon
524         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
525         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
526         if(time_till_next_period > 0) {
527             // wait for the period
528             SleepRelativeUsec(time_till_next_period);
529         }
530     }
531
532     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
533     // FIXME: in the SPM it would be nice to have system time instead of
534     //        1394 time
535
536     // we now should have decent sync info on the sync source
537     // determine a point in time where the system should start
538     // figure out where we are now
539     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
540     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
541         time_of_first_sample,
542         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
543         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
544         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
545
546     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
547     // this is the time window we have to setup all SP's such that they
548     // can start wet-running correctly.
549     time_of_first_sample = addTicks(time_of_first_sample,
550                                     cycles_for_startup * TICKS_PER_CYCLE);
551
552     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
553         time_of_first_sample,
554         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
555         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
556         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
557
558     // we should start wet-running the transmit SP's some cycles in advance
559     // such that we know it is wet-running when it should output its first sample
560     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
561                                                  prestart_cycles_for_xmit * TICKS_PER_CYCLE);
562
563     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
564                                                  prestart_cycles_for_recv * TICKS_PER_CYCLE);
565     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
566         time_to_start_xmit,
567         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
568         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
569         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
570     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
571         time_to_start_recv,
572         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
573         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
574         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
575
576     // at this point the buffer head timestamp of the transmit buffers can be set
577     // this is the presentation time of the first sample in the buffer
578     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
579           it != m_TransmitProcessors.end();
580           ++it ) {
581         (*it)->setBufferHeadTimestamp(time_of_first_sample);
582     }
583
584     // switch syncsource to running state
585     uint64_t time_to_start_sync;
586     // FIXME: this is most likely not going to work for transmit sync sources
587     // but those are unsupported in this version
588     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
589         time_to_start_sync = time_to_start_recv;
590     } else {
591         time_to_start_sync = time_to_start_xmit;
592     }
593     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
594         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
595         return false;
596     }
597
598     // STEP X: switch all non-syncsource SP's over to the running state
599     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
600           it != m_ReceiveProcessors.end();
601           ++it ) {
602         if(*it != m_SyncSource) {
603             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
604                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
605                 return false;
606             }
607         }
608     }
609     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
610           it != m_TransmitProcessors.end();
611           ++it ) {
612         if(*it != m_SyncSource) {
613             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
614                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
615                 return false;
616             }
617         }
618     }
619     // wait for the syncsource to start running.
620     // that will block the waitForPeriod call until everyone has started (theoretically)
621     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
622     // so a 20 times this value should be a good timeout
623     int cnt = cycles_for_startup * 20; // by then it should have started
624     while (!m_SyncSource->isRunning() && cnt) {
625         SleepRelativeUsec(125);
626         cnt--;
627     }
628     if(cnt==0) {
629         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
630         return false;
631     }
632
633     // the sync source is running, we can now read a decent received timestamp from it
634     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
635
636     // and a (still very rough) approximation of the rate
637     float rate = m_SyncSource->getTicksPerFrame();
638     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
639     // also add the sync delay
640     delay_in_ticks += m_SyncSource->getSyncDelay();
641     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
642                 m_time_of_transfer, rate);
643
644     // then use this information to initialize the xmit handlers
645
646     //  we now set the buffer tail timestamp of the transmit buffer
647     //  to the period transfer time instant plus what's nb_buffers - 1
648     //  in ticks. This due to the fact that we (should) have received one period
649     //  worth of ticks at t=m_time_of_transfer
650     //  hence one period of frames should also have been transmitted, which means
651     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
652     //  that allows us to calculate the tail timestamp for the buffer.
653
654     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
655
656     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
657                 transmit_tail_timestamp, rate);
658
659     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
660         it != m_TransmitProcessors.end();
661         ++it ) {
662         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
663         (*it)->setTicksPerFrame(rate);
664     }
665
666     // align the received streams to be phase aligned
667     if(!alignReceivedStreams()) {
668         debugError("Could not align streams...\n");
669         return false;
670     }
671
672     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
673     return true;
674 }
675
676 bool
677 StreamProcessorManager::alignReceivedStreams()
678 {
679     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
680     unsigned int nb_sync_runs;
681     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
682     int64_t diff_between_streams[nb_rcv_sp];
683     int64_t diff;
684
685     unsigned int i;
686
687     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
688     int align_average_time_msec = STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC;
689     Util::Configuration &config = m_parent.getConfiguration();
690     config.getValueForSetting("streaming.spm.align_tries", cnt);
691     config.getValueForSetting("streaming.spm.align_average_time_msec", align_average_time_msec);
692
693     unsigned int periods_per_align_try = (align_average_time_msec * getNominalRate());
694     periods_per_align_try /= 1000;
695     periods_per_align_try /= getPeriodSize();
696     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
697
698     bool aligned = false;
699     while (!aligned && cnt--) {
700         nb_sync_runs = periods_per_align_try;
701         while(nb_sync_runs) {
702             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
703             if(!waitForPeriod()) {
704                 debugWarning("xrun while aligning streams...\n");
705                 return false;
706             }
707
708             // before we do anything else, transfer
709             if(!transferSilence()) {
710                 debugError("Could not transfer silence\n");
711                 return false;
712             }
713
714             // now calculate the stream offset
715             i = 0;
716             for ( i = 0; i < nb_rcv_sp; i++) {
717                 StreamProcessor *s = m_ReceiveProcessors.at(i);
718                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
719                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
720                     m_SyncSource, s, diff);
721                 if ( nb_sync_runs == periods_per_align_try ) {
722                     diff_between_streams[i] = diff;
723                 } else {
724                     diff_between_streams[i] += diff;
725                 }
726             }
727
728             nb_sync_runs--;
729         }
730         // calculate the average offsets
731         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
732         int diff_between_streams_frames[nb_rcv_sp];
733         aligned = true;
734
735         // first find whether the streams are aligned and what their offset is
736         for ( i = 0; i < nb_rcv_sp; i++) {
737             StreamProcessor *s = m_ReceiveProcessors.at(i);
738
739             diff_between_streams[i] /= periods_per_align_try;
740             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
741             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
742                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
743
744             aligned &= (diff_between_streams_frames[i] == 0);
745         }
746
747         // if required, align the streams
748         int frames_to_shift_stream[nb_rcv_sp];
749         int min_shift = 9999;
750         if (!aligned) {
751             // find the minimum value (= earliest stream)
752             for ( i = 0; i < nb_rcv_sp; i++) {
753                 if (diff_between_streams_frames[i] < min_shift) {
754                     min_shift = diff_between_streams_frames[i];
755                 }
756             }
757             debugOutput( DEBUG_LEVEL_VERBOSE, " correcting shift with %d frames\n", min_shift);
758             // ensure that the streams are shifted only in the 'positive' direction
759             // i.e. that frames are only dropped, not added since that results
760             // in multiple writers for the data ringbuffer
761             // this also results in 'minimal shift' (not that it's required since the
762             // sync SP is part of the SP set)
763             for ( i = 0; i < nb_rcv_sp; i++) {
764                 frames_to_shift_stream[i] = diff_between_streams_frames[i] - min_shift;
765                 debugOutput(DEBUG_LEVEL_VERBOSE,
766                             "  going to drop %03d frames from stream %d\n",
767                             frames_to_shift_stream[i], i);
768             }
769             // perform the actual shift
770             for ( i = 0; i < nb_rcv_sp; i++) {
771                 StreamProcessor *s = m_ReceiveProcessors.at(i);
772                 // reposition the stream
773                 if(!s->shiftStream(frames_to_shift_stream[i])) {
774                     debugError("Could not shift SP %p %d frames\n", s, frames_to_shift_stream[i]);
775                     return false;
776                 }
777             }
778         }
779
780         if (!aligned) {
781             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
782         }
783     }
784     if (cnt == 0) {
785         debugError("Align failed\n");
786         return false;
787     }
788     return true;
789 }
790
791 bool StreamProcessorManager::start() {
792     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
793
794     // start all SP's synchonized
795     bool start_result = false;
796     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
797         // put all SP's into dry-running state
798         if (!startDryRunning()) {
799             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
800             start_result = false;
801             continue;
802         }
803
804         start_result = syncStartAll();
805         if(start_result) {
806             break;
807         } else {
808             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
809             if(m_shutdown_needed) {
810                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
811                 return false;
812             }
813         }
814     }
815     if (!start_result) {
816         debugFatal("Could not syncStartAll...\n");
817         return false;
818     }
819     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
820     return true;
821 }
822
823 bool StreamProcessorManager::stop() {
824     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
825
826     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
827     // switch SP's over to the dry-running state
828     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
829           it != m_ReceiveProcessors.end();
830           ++it ) {
831         if((*it)->isRunning()) {
832             if(!(*it)->scheduleStopRunning(-1)) {
833                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
834                 return false;
835             }
836         }
837     }
838     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
839           it != m_TransmitProcessors.end();
840           ++it ) {
841         if((*it)->isRunning()) {
842             if(!(*it)->scheduleStopRunning(-1)) {
843                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
844                 return false;
845             }
846         }
847     }
848     // wait for the SP's to get into the dry-running/stopped state
849     int cnt = 8000;
850     bool ready = false;
851     while (!ready && cnt) {
852         ready = true;
853         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
854             it != m_ReceiveProcessors.end();
855             ++it ) {
856             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
857         }
858         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
859             it != m_TransmitProcessors.end();
860             ++it ) {
861             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
862         }
863         SleepRelativeUsec(125);
864         cnt--;
865     }
866     if(cnt==0) {
867         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
868         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
869             it != m_ReceiveProcessors.end();
870             ++it ) {
871             (*it)->dumpInfo();
872         }
873         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
874             it != m_TransmitProcessors.end();
875             ++it ) {
876             (*it)->dumpInfo();
877         }
878         return false;
879     }
880
881     // switch SP's over to the stopped state
882     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
883           it != m_ReceiveProcessors.end();
884           ++it ) {
885         if ((*it)->inError()) {
886             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
887         } else if(!(*it)->scheduleStopDryRunning(-1)) {
888             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
889             return false;
890         }
891     }
892     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
893           it != m_TransmitProcessors.end();
894           ++it ) {
895         if ((*it)->inError()) {
896             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
897         } else if(!(*it)->scheduleStopDryRunning(-1)) {
898             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
899             return false;
900         }
901     }
902     // wait for the SP's to get into the stopped state
903     cnt = 8000;
904     ready = false;
905     while (!ready && cnt) {
906         ready = true;
907         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
908             it != m_ReceiveProcessors.end();
909             ++it ) {
910             ready &= ((*it)->isStopped() || (*it)->inError());
911         }
912         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
913             it != m_TransmitProcessors.end();
914             ++it ) {
915             ready &= ((*it)->isStopped() || (*it)->inError());
916         }
917         SleepRelativeUsec(125);
918         cnt--;
919     }
920     if(cnt==0) {
921         debugWarning(" Timeout waiting for the SP's to stop\n");
922         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
923             it != m_ReceiveProcessors.end();
924             ++it ) {
925             (*it)->dumpInfo();
926         }
927         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
928             it != m_TransmitProcessors.end();
929             ++it ) {
930             (*it)->dumpInfo();
931         }
932         return false;
933     }
934     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
935     return true;
936 }
937
938 /**
939  * Called upon Xrun events. This brings all StreamProcessors back
940  * into their starting state, and then carries on streaming. This should
941  * have the same effect as restarting the whole thing.
942  *
943  * @return true if successful, false otherwise
944  */
945 bool StreamProcessorManager::handleXrun() {
946
947     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
948
949     dumpInfo();
950
951     /*
952      * Reset means:
953      * 1) Disabling the SP's, so that they don't process any packets
954      *    note: the isomanager does keep on delivering/requesting them
955      * 2) Bringing all buffers & streamprocessors into a know state
956      *    - Clear all capture buffers
957      *    - Put nb_periods*period_size of null frames into the playback buffers
958      * 3) Re-enable the SP's
959      */
960
961     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
962     // start all SP's synchonized
963     bool start_result = false;
964     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
965         if(m_shutdown_needed) {
966             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
967             return true;
968         }
969         // put all SP's into dry-running state
970         if (!startDryRunning()) {
971             debugShowBackLog();
972             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
973             start_result = false;
974             continue;
975         }
976
977         start_result = syncStartAll();
978         if(start_result) {
979             break;
980         } else {
981             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
982         }
983     }
984     if (!start_result) {
985         debugFatal("Could not syncStartAll...\n");
986         return false;
987     }
988     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
989
990     return true;
991 }
992
993 /**
994  * @brief Waits until the next period of samples is ready
995  *
996  * This function does not return until a full period of samples is (or should be)
997  * ready to be transferred.
998  *
999  * @return true if the period is ready, false if not
1000  */
1001 bool StreamProcessorManager::waitForPeriod() {
1002     if(m_SyncSource == NULL) return false;
1003     if(m_shutdown_needed) return false;
1004     bool xrun_occurred = false;
1005     bool in_error = false;
1006
1007     // grab the wait lock
1008     // this ensures that bus reset handling doesn't interfere
1009     Util::MutexLockHelper lock(*m_WaitLock);
1010     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1011                         "waiting for period (%d frames in buffer)...\n",
1012                         m_SyncSource->getBufferFill());
1013     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
1014     uint64_t ticks_at_period_margin = ticks_at_period + m_SyncSource->getSyncDelay();
1015     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
1016
1017     #ifdef DEBUG
1018     int64_t now = Util::SystemTimeSource::getCurrentTime();
1019     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
1020     #endif
1021
1022     // wait until it's time to transfer
1023     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
1024
1025     #ifdef DEBUG
1026     now = Util::SystemTimeSource::getCurrentTime();
1027     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
1028     #endif
1029
1030     // the period should be ready now
1031
1032     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
1033     // HACK: we force wait until every SP is ready. this is needed
1034     // since the raw1394 interface provides no control over interrupts
1035     // resulting in very bad predictability on when the data is present.
1036     bool period_not_ready = true;
1037     while(period_not_ready) {
1038         period_not_ready = false;
1039         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1040             it != m_ReceiveProcessors.end();
1041             ++it ) {
1042             bool this_sp_period_ready = (*it)->canConsumePeriod();
1043             if (!this_sp_period_ready) {
1044                 period_not_ready = true;
1045             }
1046         }
1047         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1048             it != m_TransmitProcessors.end();
1049             ++it ) {
1050             bool this_sp_period_ready = (*it)->canProducePeriod();
1051             if (!this_sp_period_ready) {
1052                 period_not_ready = true;
1053             }
1054         }
1055
1056         if (period_not_ready) {
1057             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
1058             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
1059         }
1060
1061         // check for underruns/errors on the ISO side,
1062         // those should make us bail out of the wait loop
1063         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1064             it != m_ReceiveProcessors.end();
1065             ++it ) {
1066             // a xrun has occurred on the Iso side
1067             xrun_occurred |= (*it)->xrunOccurred();
1068             in_error |= (*it)->inError();
1069         }
1070         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1071             it != m_TransmitProcessors.end();
1072             ++it ) {
1073             // a xrun has occurred on the Iso side
1074             xrun_occurred |= (*it)->xrunOccurred();
1075             in_error |= (*it)->inError();
1076         }
1077         if(xrun_occurred | in_error | m_shutdown_needed) break;
1078     }
1079     #else
1080     // check for underruns/errors on the ISO side,
1081     // those should make us bail out of the wait loop
1082     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1083         it != m_ReceiveProcessors.end();
1084         ++it ) {
1085         // xrun on data buffer side
1086         if (!(*it)->canConsumePeriod()) {
1087             xrun_occurred = true;
1088         }
1089         // a xrun has occurred on the Iso side
1090         xrun_occurred |= (*it)->xrunOccurred();
1091         in_error |= (*it)->inError();
1092     }
1093     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1094         it != m_TransmitProcessors.end();
1095         ++it ) {
1096         // xrun on data buffer side
1097         if (!(*it)->canProducePeriod()) {
1098             xrun_occurred = true;
1099         }
1100         // a xrun has occurred on the Iso side
1101         xrun_occurred |= (*it)->xrunOccurred();
1102         in_error |= (*it)->inError();
1103     }
1104     #endif
1105
1106     if(xrun_occurred) {
1107         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1108     }
1109     if(in_error) {
1110         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1111         m_shutdown_needed = true;
1112     }
1113
1114     // we save the 'ideal' time of the transfer at this point,
1115     // because we can have interleaved read - process - write
1116     // cycles making that we modify a receiving stream's buffer
1117     // before we get to writing.
1118     // NOTE: before waitForPeriod() is called again, both the transmit
1119     //       and the receive processors should have done their transfer.
1120     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1121    
1122     #ifdef DEBUG
1123     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
1124    
1125     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1126     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
1127     // display message if the difference between two successive tick
1128     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1129     // so 50 ticks = 10%, which is a rather large jitter value.
1130     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
1131         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
1132                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1133     }
1134     m_time_of_transfer2 = m_time_of_transfer;
1135     #endif
1136
1137     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1138                         "transfer period %d at %llu ticks...\n",
1139                         m_nbperiods, m_time_of_transfer);
1140
1141     // this is to notify the client of the delay that we introduced by waiting
1142     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
1143     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1144                         "delayed for %d usecs...\n",
1145                         m_delayed_usecs);
1146
1147 #ifdef DEBUG
1148     int rcv_bf=0, xmt_bf=0;
1149     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1150         it != m_ReceiveProcessors.end();
1151         ++it ) {
1152         rcv_bf = (*it)->getBufferFill();
1153     }
1154     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1155         it != m_TransmitProcessors.end();
1156         ++it ) {
1157         xmt_bf = (*it)->getBufferFill();
1158     }
1159     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1160                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1161                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1162
1163     // check if xruns occurred on the Iso side.
1164     // also check if xruns will occur should we transfer() now
1165     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1166           it != m_ReceiveProcessors.end();
1167           ++it ) {
1168
1169         if ((*it)->xrunOccurred()) {
1170             debugOutput(DEBUG_LEVEL_NORMAL,
1171                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1172             (*it)->dumpInfo();
1173         }
1174         if (!((*it)->canClientTransferFrames(m_period))) {
1175             debugOutput(DEBUG_LEVEL_NORMAL,
1176                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1177             (*it)->dumpInfo();
1178         }
1179     }
1180     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1181           it != m_TransmitProcessors.end();
1182           ++it ) {
1183         if ((*it)->xrunOccurred()) {
1184             debugOutput(DEBUG_LEVEL_NORMAL,
1185                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1186         }
1187         if (!((*it)->canClientTransferFrames(m_period))) {
1188             debugOutput(DEBUG_LEVEL_NORMAL,
1189                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1190         }
1191     }
1192 #endif
1193     m_nbperiods++;
1194     // now we can signal the client that we are (should be) ready
1195     return !xrun_occurred;
1196 }
1197
1198 /**
1199  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1200  *
1201  * Transfers one period of frames from the client side to the Iso side and vice versa.
1202  *
1203  * @return true if successful, false otherwise (indicates xrun).
1204  */
1205 bool StreamProcessorManager::transfer() {
1206     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1207     bool retval=true;
1208     retval &= transfer(StreamProcessor::ePT_Receive);
1209     retval &= transfer(StreamProcessor::ePT_Transmit);
1210     return retval;
1211 }
1212
1213 /**
1214  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1215  *
1216  * Transfers one period of frames from the client side to the Iso side or vice versa.
1217  *
1218  * @param t The processor type to tranfer for (receive or transmit)
1219  * @return true if successful, false otherwise (indicates xrun).
1220  */
1221 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1222     if(m_SyncSource == NULL) return false;
1223     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1224         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1225         t, m_time_of_transfer,
1226         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1227         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1228         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1229
1230     bool retval = true;
1231     // a static cast could make sure that there is no performance
1232     // penalty for the virtual functions (to be checked)
1233     if (t==StreamProcessor::ePT_Receive) {
1234         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1235                 it != m_ReceiveProcessors.end();
1236                 ++it ) {
1237             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1238                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1239                             m_period, m_time_of_transfer,*it);
1240                 retval &= false; // buffer underrun
1241             }
1242         }
1243     } else {
1244         // FIXME: in the SPM it would be nice to have system time instead of
1245         //        1394 time
1246         float rate = m_SyncSource->getTicksPerFrame();
1247         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1248
1249         // the data we are putting into the buffer is intended to be transmitted
1250         // one ringbuffer size after it has been received
1251
1252         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1253         // postponed.
1254         int syncdelay = m_SyncSource->getSyncDelay();
1255         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1256
1257         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1258                 it != m_TransmitProcessors.end();
1259                 ++it ) {
1260             // FIXME: in the SPM it would be nice to have system time instead of
1261             //        1394 time
1262             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1263                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1264                         m_period, transmit_timestamp, *it);
1265                 retval &= false; // buffer underrun
1266             }
1267         }
1268     }
1269     return retval;
1270 }
1271
1272 /**
1273  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1274  *
1275  * Transfers one period of silence to the Iso side for transmit SP's
1276  * or dump one period of frames for receive SP's
1277  *
1278  * @return true if successful, false otherwise (indicates xrun).
1279  */
1280 bool StreamProcessorManager::transferSilence() {
1281     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1282     bool retval=true;
1283     // NOTE: the order here is opposite from the order in
1284     // normal operation (transmit is before receive), because
1285     // we can do that here (data=silence=available) and
1286     // it increases reliability (esp. on startup)
1287     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1288     retval &= transferSilence(StreamProcessor::ePT_Receive);
1289     return retval;
1290 }
1291
1292 /**
1293  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1294  *
1295  * Transfers one period of silence to the Iso side for transmit SP's
1296  * or dump one period of frames for receive SP's
1297  *
1298  * @param t The processor type to tranfer for (receive or transmit)
1299  * @return true if successful, false otherwise (indicates xrun).
1300  */
1301 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1302     if(m_SyncSource == NULL) return false;
1303     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1304         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1305         t, m_time_of_transfer,
1306         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1307         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1308         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1309
1310     bool retval = true;
1311     // a static cast could make sure that there is no performance
1312     // penalty for the virtual functions (to be checked)
1313     if (t==StreamProcessor::ePT_Receive) {
1314         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1315                 it != m_ReceiveProcessors.end();
1316                 ++it ) {
1317             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1318                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1319                             m_period, m_time_of_transfer,*it);
1320                 retval &= false; // buffer underrun
1321             }
1322         }
1323     } else {
1324         // FIXME: in the SPM it would be nice to have system time instead of
1325         //        1394 time
1326         float rate = m_SyncSource->getTicksPerFrame();
1327         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1328
1329         // the data we are putting into the buffer is intended to be transmitted
1330         // one ringbuffer size after it has been received
1331         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1332         // postponed.
1333         int syncdelay = m_SyncSource->getSyncDelay();
1334         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1335
1336         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1337                 it != m_TransmitProcessors.end();
1338                 ++it ) {
1339             // FIXME: in the SPM it would be nice to have system time instead of
1340             //        1394 time
1341             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1342                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1343                         m_period, transmit_timestamp, *it);
1344                 retval &= false; // buffer underrun
1345             }
1346         }
1347     }
1348     return retval;
1349 }
1350
1351 void StreamProcessorManager::dumpInfo() {
1352     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1353     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1354     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1355     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1356
1357     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1358     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1359         it != m_ReceiveProcessors.end();
1360         ++it ) {
1361         (*it)->dumpInfo();
1362     }
1363
1364     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1365     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1366         it != m_TransmitProcessors.end();
1367         ++it ) {
1368         (*it)->dumpInfo();
1369     }
1370
1371     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1372
1373     // list port info in verbose mode
1374     debugOutputShort( DEBUG_LEVEL_VERBOSE, "Port Information\n");
1375     int nb_ports;
1376    
1377     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Playback\n");
1378     nb_ports = getPortCount(Port::E_Playback);
1379     for(int i=0; i < nb_ports; i++) {
1380         Port *p = getPortByIndex(i, Port::E_Playback);
1381         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1382         if (p) {
1383             bool disabled = p->isDisabled();
1384             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1385             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1386             debugOutputShort( DEBUG_LEVEL_VERBOSE, "%3s ", p->getName().c_str());
1387         } else {
1388             debugOutputShort( DEBUG_LEVEL_VERBOSE, "invalid ");
1389         }
1390         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1391     }
1392     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Capture\n");
1393     nb_ports = getPortCount(Port::E_Capture);
1394     for(int i=0; i < nb_ports; i++) {
1395         Port *p = getPortByIndex(i, Port::E_Capture);
1396         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1397         if (p) {
1398             bool disabled = p->isDisabled();
1399             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1400             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1401             debugOutputShort( DEBUG_LEVEL_VERBOSE, " %3s ", p->getName().c_str());
1402         } else {
1403             debugOutputShort( DEBUG_LEVEL_VERBOSE, " invalid ");
1404         }
1405         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1406     }
1407
1408     debugOutputShort( DEBUG_LEVEL_VERBOSE, "----------------------------------------------------\n");
1409
1410 }
1411
1412 void StreamProcessorManager::setVerboseLevel(int l) {
1413     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1414
1415     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1416         it != m_ReceiveProcessors.end();
1417         ++it ) {
1418         (*it)->setVerboseLevel(l);
1419     }
1420     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1421         it != m_TransmitProcessors.end();
1422         ++it ) {
1423         (*it)->setVerboseLevel(l);
1424     }
1425     setDebugLevel(l);
1426     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1427 }
1428
1429 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1430     int count=0;
1431
1432     if (direction == Port::E_Capture) {
1433         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1434             it != m_ReceiveProcessors.end();
1435             ++it ) {
1436             count += (*it)->getPortCount(type);
1437         }
1438     } else {
1439         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1440             it != m_TransmitProcessors.end();
1441             ++it ) {
1442             count += (*it)->getPortCount(type);
1443         }
1444     }
1445     return count;
1446 }
1447
1448 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1449     int count=0;
1450
1451     if (direction == Port::E_Capture) {
1452         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1453             it != m_ReceiveProcessors.end();
1454             ++it ) {
1455             count += (*it)->getPortCount();
1456         }
1457     } else {
1458         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1459             it != m_TransmitProcessors.end();
1460             ++it ) {
1461             count += (*it)->getPortCount();
1462         }
1463     }
1464     return count;
1465 }
1466
1467 void
1468 StreamProcessorManager::updateShadowLists()
1469 {
1470     debugOutput( DEBUG_LEVEL_VERBOSE, "Updating port shadow lists...\n");
1471     m_CapturePorts_shadow.clear();
1472     m_PlaybackPorts_shadow.clear();
1473
1474     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1475         it != m_ReceiveProcessors.end();
1476         ++it ) {
1477         PortManager *pm = *it;
1478         for (int i=0; i < pm->getPortCount(); i++) {
1479             Port *p = pm->getPortAtIdx(i);
1480             if (!p) {
1481                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1482                 continue;
1483             }
1484             if(p->getDirection() != Port::E_Capture) {
1485                 debugError("port at idx %d for receive SP is not a capture port!\n", i);
1486                 continue;
1487             }
1488             m_CapturePorts_shadow.push_back(p);
1489         }
1490     }
1491     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1492         it != m_TransmitProcessors.end();
1493         ++it ) {
1494         PortManager *pm = *it;
1495         for (int i=0; i < pm->getPortCount(); i++) {
1496             Port *p = pm->getPortAtIdx(i);
1497             if (!p) {
1498                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1499                 continue;
1500             }
1501             if(p->getDirection() != Port::E_Playback) {
1502                 debugError("port at idx %d for transmit SP is not a playback port!\n", i);
1503                 continue;
1504             }
1505             m_PlaybackPorts_shadow.push_back(p);
1506         }
1507     }
1508 }
1509
1510 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1511     debugOutputExtreme( DEBUG_LEVEL_ULTRA_VERBOSE, "getPortByIndex(%d, %d)...\n", idx, direction);
1512     if (direction == Port::E_Capture) {
1513         #ifdef DEBUG
1514         if(idx >= (int)m_CapturePorts_shadow.size()) {
1515             debugError("Capture port %d out of range (%d)\n", idx, m_CapturePorts_shadow.size());
1516             return NULL;
1517         }
1518         #endif
1519         return m_CapturePorts_shadow.at(idx);
1520     } else {
1521         #ifdef DEBUG
1522         if(idx >= (int)m_PlaybackPorts_shadow.size()) {
1523             debugError("Playback port %d out of range (%d)\n", idx, m_PlaybackPorts_shadow.size());
1524             return NULL;
1525         }
1526         #endif
1527         return m_PlaybackPorts_shadow.at(idx);
1528     }
1529     return NULL;
1530 }
1531
1532 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1533     m_thread_realtime=rt;
1534     m_thread_priority=priority;
1535     return true;
1536 }
1537
1538
1539 } // end of namespace
Note: See TracBrowser for help on using the browser.