root/branches/libffado-2.0/src/libstreaming/StreamProcessorManager.cpp

Revision 1528, 59.1 kB (checked in by ppalmers, 15 years ago)

cosmetic changes

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "devicemanager.h"
31
32 #include "libutil/Time.h"
33
34 #include <errno.h>
35 #include <assert.h>
36 #include <math.h>
37
38 namespace Streaming {
39
40 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
41
42 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
43     : m_time_of_transfer ( 0 )
44     #ifdef DEBUG
45     , m_time_of_transfer2 ( 0 )
46     #endif
47     , m_is_slave( false )
48     , m_SyncSource(NULL)
49     , m_parent( p )
50     , m_xrun_happened( false )
51     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
52     , m_nb_buffers( 0 )
53     , m_period( 0 )
54     , m_audio_datatype( eADT_Float )
55     , m_nominal_framerate ( 0 )
56     , m_xruns(0)
57     , m_shutdown_needed(false)
58     , m_nbperiods(0)
59     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
60 {
61     addOption(Util::OptionContainer::Option("slaveMode",false));
62     sem_init(&m_activity_semaphore, 0, 0);
63 }
64
65 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
66                                                unsigned int framerate, unsigned int nb_buffers)
67     : m_time_of_transfer ( 0 )
68     #ifdef DEBUG
69     , m_time_of_transfer2 ( 0 )
70     #endif
71     , m_is_slave( false )
72     , m_SyncSource(NULL)
73     , m_parent( p )
74     , m_xrun_happened( false )
75     , m_activity_wait_timeout_nsec( 0 ) // dynamically set
76     , m_nb_buffers(nb_buffers)
77     , m_period(period)
78     , m_audio_datatype( eADT_Float )
79     , m_nominal_framerate ( framerate )
80     , m_xruns(0)
81     , m_shutdown_needed(false)
82     , m_nbperiods(0)
83     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
84 {
85     addOption(Util::OptionContainer::Option("slaveMode",false));
86     sem_init(&m_activity_semaphore, 0, 0);
87 }
88
89 StreamProcessorManager::~StreamProcessorManager() {
90     sem_post(&m_activity_semaphore);
91     sem_destroy(&m_activity_semaphore);
92     delete m_WaitLock;
93 }
94
95 // void
96 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
97 // {
98 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
99 // //
100 // //     bool handled_at_least_one = false;
101 // //     // note that all receive streams are gone once a device is unplugged
102 // //
103 // //     // synchronize with the wait lock
104 // //     Util::MutexLockHelper lock(*m_WaitLock);
105 // //
106 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
107 // //     // cause all SP's to bail out
108 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
109 // //           it != m_ReceiveProcessors.end();
110 // //           ++it )
111 // //     {
112 // //         if(&s == &((*it)->getParent().get1394Service())) {
113 // //             debugOutput(DEBUG_LEVEL_NORMAL,
114 // //                         "issue busreset on receive SPM on channel %d\n",
115 // //                         (*it)->getChannel());
116 // //             (*it)->handleBusReset();
117 // //             handled_at_least_one = true;
118 // //         } else {
119 // //             debugOutput(DEBUG_LEVEL_NORMAL,
120 // //                         "skipping receive SPM on channel %d since not on service %p\n",
121 // //                         (*it)->getChannel(), &s);
122 // //         }
123 // //     }
124 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
125 // //           it != m_TransmitProcessors.end();
126 // //           ++it )
127 // //     {
128 // //         if(&s == &((*it)->getParent().get1394Service())) {
129 // //             debugOutput(DEBUG_LEVEL_NORMAL,
130 // //                         "issue busreset on transmit SPM on channel %d\n",
131 // //                         (*it)->getChannel());
132 // //             (*it)->handleBusReset();
133 // //             handled_at_least_one = true;
134 // //         } else {
135 // //             debugOutput(DEBUG_LEVEL_NORMAL,
136 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
137 // //                         (*it)->getChannel(), &s);
138 // //         }
139 // //     }
140 // //
141 // //     // FIXME: we request shutdown for now.
142 // //     m_shutdown_needed = handled_at_least_one;
143 // }
144
145 void
146 StreamProcessorManager::signalActivity()
147 {
148     sem_post(&m_activity_semaphore);
149     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
150 }
151
152 enum StreamProcessorManager::eActivityResult
153 StreamProcessorManager::waitForActivity()
154 {
155     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
156     struct timespec ts;
157     int result;
158
159     if (m_activity_wait_timeout_nsec >= 0) {
160
161         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
162             debugError("clock_gettime failed\n");
163             return eAR_Error;
164         }
165         ts.tv_nsec += m_activity_wait_timeout_nsec;
166         while(ts.tv_nsec >= 1000000000LL) {
167             ts.tv_sec += 1;
168             ts.tv_nsec -= 1000000000LL;
169         }
170     }
171
172     if (m_activity_wait_timeout_nsec >= 0) {
173         result = sem_timedwait(&m_activity_semaphore, &ts);
174     } else {
175         result = sem_wait(&m_activity_semaphore);
176     }
177
178     if(result != 0) {
179         if (errno == ETIMEDOUT) {
180             debugOutput(DEBUG_LEVEL_VERBOSE,
181                         "(%p) sem_timedwait() timed out (result=%d)\n",
182                         this, result);
183             return eAR_Timeout;
184         } else if (errno == EINTR) {
185             debugOutput(DEBUG_LEVEL_VERBOSE,
186                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
187                         this, result);
188             return eAR_Interrupted;
189         } else if (errno == EINVAL) {
190             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
191                         this, result);
192             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
193                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
194             return eAR_Error;
195         } else {
196             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
197                         this, result, errno);
198             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
199                        this, m_activity_wait_timeout_nsec, ts.tv_sec, ts.tv_nsec);
200             return eAR_Error;
201         }
202     }
203
204     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
205     return eAR_Activity;
206 }
207
208 /**
209  * Registers \ref processor with this manager.
210  *
211  * also registers it with the isohandlermanager
212  *
213  * be sure to call isohandlermanager->init() first!
214  * and be sure that the processors are also ->init()'ed
215  *
216  * @param processor
217  * @return true if successfull
218  */
219 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
220 {
221     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
222     assert(processor);
223     if (processor->getType() == StreamProcessor::ePT_Receive) {
224         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
225         m_ReceiveProcessors.push_back(processor);
226         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
227                     ( this, &StreamProcessorManager::updateShadowLists, false );
228         processor->addPortManagerUpdateHandler(f);
229         updateShadowLists();
230         return true;
231     }
232     if (processor->getType() == StreamProcessor::ePT_Transmit) {
233         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
234         m_TransmitProcessors.push_back(processor);
235         Util::Functor* f = new Util::MemberFunctor0< StreamProcessorManager*, void (StreamProcessorManager::*)() >
236                     ( this, &StreamProcessorManager::updateShadowLists, false );
237         processor->addPortManagerUpdateHandler(f);
238         updateShadowLists();
239         return true;
240     }
241
242     debugFatal("Unsupported processor type!\n");
243     return false;
244 }
245
246 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
247 {
248     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
249     assert(processor);
250
251     if (processor->getType()==StreamProcessor::ePT_Receive) {
252
253         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
254               it != m_ReceiveProcessors.end();
255               ++it )
256         {
257             if ( *it == processor ) {
258                 if (*it == m_SyncSource) {
259                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
260                     m_SyncSource = NULL;
261                 }
262                 m_ReceiveProcessors.erase(it);
263                 // remove the functor
264                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
265                 if(f) {
266                     processor->remPortManagerUpdateHandler(f);
267                     delete f;
268                 }
269                 updateShadowLists();
270                 return true;
271             }
272         }
273     }
274
275     if (processor->getType()==StreamProcessor::ePT_Transmit) {
276         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
277               it != m_TransmitProcessors.end();
278               ++it )
279         {
280             if ( *it == processor ) {
281                 if (*it == m_SyncSource) {
282                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
283                     m_SyncSource = NULL;
284                 }
285                 m_TransmitProcessors.erase(it);
286                 // remove the functor
287                 Util::Functor * f = processor->getUpdateHandlerForPtr(this);
288                 if(f) {
289                     processor->remPortManagerUpdateHandler(f);
290                     delete f;
291                 }
292                 updateShadowLists();
293                 return true;
294             }
295         }
296     }
297
298     debugFatal("Processor (%p) not found!\n",processor);
299     return false; //not found
300 }
301
302 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
303     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
304     m_SyncSource = s;
305     return true;
306 }
307
308 bool StreamProcessorManager::prepare() {
309
310     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
311     m_is_slave=false;
312     if(!getOption("slaveMode", m_is_slave)) {
313         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
314     }
315
316     m_shutdown_needed=false;
317
318     // if no sync source is set, select one here
319     if(m_SyncSource == NULL) {
320        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
321     }
322
323     // FIXME: put into separate method
324     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
325           it != m_ReceiveProcessors.end();
326           ++it )
327     {
328         if(m_SyncSource == NULL) {
329             debugWarning(" => Sync Source is %p.\n", *it);
330             m_SyncSource = *it;
331         }
332     }
333     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
334           it != m_TransmitProcessors.end();
335           ++it )
336     {
337         if(m_SyncSource == NULL) {
338             debugWarning(" => Sync Source is %p.\n", *it);
339             m_SyncSource = *it;
340         }
341     }
342
343     // now do the actual preparation of the SP's
344     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
345     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
346         it != m_ReceiveProcessors.end();
347         ++it ) {
348
349         if(!(*it)->setOption("slaveMode", m_is_slave)) {
350             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
351         }
352
353         if(!(*it)->prepare()) {
354             debugFatal(  " could not prepare (%p)...\n",(*it));
355             return false;
356         }
357     }
358     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
359     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
360         it != m_TransmitProcessors.end();
361         ++it ) {
362         if(!(*it)->setOption("slaveMode", m_is_slave)) {
363             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
364         }
365         if(!(*it)->prepare()) {
366             debugFatal( " could not prepare (%p)...\n",(*it));
367             return false;
368         }
369     }
370
371     // if there are no stream processors registered,
372     // fail
373     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
374         debugFatal("No stream processors registered, can't do anything usefull\n");
375         return false;
376     }
377
378     // set the activity timeout value to two periods worth of usecs.
379     // since we can expect activity once every period, but we might have some
380     // offset, the safe value is two periods.
381     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
382     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
383     setActivityWaitTimeoutUsec(timeout_usec);
384
385     updateShadowLists();
386
387     return true;
388 }
389
390 bool
391 StreamProcessorManager::startDryRunning()
392 {
393     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
394     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
395     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
396             it != m_TransmitProcessors.end();
397             ++it ) {
398         if ((*it)->inError()) {
399             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
400             return false;
401         }
402         if (!(*it)->isDryRunning()) {
403             if(!(*it)->scheduleStartDryRunning(-1)) {
404                 debugError("Could not put SP %p into the dry-running state\n", *it);
405                 return false;
406             }
407         } else {
408             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
409         }
410     }
411     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
412             it != m_ReceiveProcessors.end();
413             ++it ) {
414         if ((*it)->inError()) {
415             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
416             return false;
417         }
418         if (!(*it)->isDryRunning()) {
419             if(!(*it)->scheduleStartDryRunning(-1)) {
420                 debugError("Could not put SP %p into the dry-running state\n", *it);
421                 return false;
422             }
423         } else {
424             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
425         }
426     }
427     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
428     // wait for the syncsource to start running.
429     // that will block the waitForPeriod call until everyone has started (theoretically)
430     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
431     bool all_dry_running = false;
432     while (!all_dry_running && cnt) {
433         all_dry_running = true;
434         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
435                 it != m_ReceiveProcessors.end();
436                 ++it ) {
437             all_dry_running &= (*it)->isDryRunning();
438         }
439         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
440                 it != m_TransmitProcessors.end();
441                 ++it ) {
442             all_dry_running &= (*it)->isDryRunning();
443         }
444
445         SleepRelativeUsec(125);
446         cnt--;
447     }
448     if(cnt==0) {
449         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
450         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
451                 it != m_ReceiveProcessors.end();
452                 ++it ) {
453             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
454                 (*it)->getTypeString(), *it, (*it)->getStateString());
455         }
456         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
457                 it != m_TransmitProcessors.end();
458                 ++it ) {
459             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
460                 (*it)->getTypeString(), *it, (*it)->getStateString());
461         }
462         return false;
463     }
464     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
465     return true;
466 }
467
468 bool StreamProcessorManager::syncStartAll() {
469     if(m_SyncSource == NULL) return false;
470
471     // get the options
472     int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
473     int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
474     int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
475     int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
476     int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
477     Util::Configuration &config = m_parent.getConfiguration();
478     config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
479     config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
480     config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
481     config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
482     config.getValueForSetting("streaming.spm.prestart_cycles_for_recv", prestart_cycles_for_recv);
483
484     // figure out when to get the SP's running.
485     // the xmit SP's should also know the base timestamp
486     // streams should be aligned here
487
488     // now find out how long we have to delay the wait operation such that
489     // the received frames will all be presented to the SP
490     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
491     int max_of_min_delay = 0;
492     int min_delay = 0;
493     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
494             it != m_ReceiveProcessors.end();
495             ++it ) {
496         min_delay = (*it)->getMaxFrameLatency();
497         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
498     }
499
500     // add some processing margin. This only shifts the time
501     // at which the buffer is transfer()'ed. This makes things somewhat
502     // more robust. It should be noted though that shifting the transfer
503     // time to a later time instant also causes the xmit buffer fill to be
504     // lower on average.
505     max_of_min_delay += signal_delay_ticks;
506
507     m_SyncSource->setSyncDelay(max_of_min_delay);
508     unsigned int syncdelay = m_SyncSource->getSyncDelay();
509     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d => %d ticks (%03us %04uc %04ut)...\n",
510         max_of_min_delay, syncdelay,
511         (unsigned int)TICKS_TO_SECS(syncdelay),
512         (unsigned int)TICKS_TO_CYCLES(syncdelay),
513         (unsigned int)TICKS_TO_OFFSET(syncdelay));
514
515     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
516     //        have aquired lock
517     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
518     //sleep(2); // FIXME: be smarter here
519
520     // make sure that we are dry-running long enough for the
521     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
522     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
523
524     unsigned int nb_sync_runs = (sync_wait_time_msec * getNominalRate());
525     nb_sync_runs /= 1000;
526     nb_sync_runs /= getPeriodSize();
527
528     int64_t time_till_next_period;
529     while(nb_sync_runs--) { // or while not sync-ed?
530         // check if we were woken up too soon
531         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
532         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
533         if(time_till_next_period > 0) {
534             // wait for the period
535             SleepRelativeUsec(time_till_next_period);
536         }
537     }
538
539     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
540     // FIXME: in the SPM it would be nice to have system time instead of
541     //        1394 time
542
543     float syncrate = 0.0;
544     float tpf = m_SyncSource->getTicksPerFrame();
545     if (tpf > 0.0) {
546         syncrate = 24576000.0/tpf;
547     } else {
548         debugWarning("tpf <= 0? %f\n", tpf);
549     }
550     debugOutput( DEBUG_LEVEL_VERBOSE, " sync source frame rate: %f fps (%f tpf)\n", syncrate, tpf);
551
552     // we now should have decent sync info on the sync source
553     // determine a point in time where the system should start
554     // figure out where we are now
555     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
556     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
557         time_of_first_sample,
558         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
559         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
560         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
561
562     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
563     // this is the time window we have to setup all SP's such that they
564     // can start wet-running correctly.
565     time_of_first_sample = addTicks(time_of_first_sample,
566                                     cycles_for_startup * TICKS_PER_CYCLE);
567
568     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
569         time_of_first_sample,
570         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
571         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
572         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
573
574     #ifdef DEBUG
575     // the time at which the previous period would have passed is the
576     // time of the first sample received, minus one frame.
577     //m_time_of_transfer2 = substractTicks(time_of_first_sample, (uint64_t)m_SyncSource->getTicksPerFrame());
578     m_time_of_transfer2 = time_of_first_sample;
579     #endif
580
581     // we should start wet-running the transmit SP's some cycles in advance
582     // such that we know it is wet-running when it should output its first sample
583     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
584                                                  prestart_cycles_for_xmit * TICKS_PER_CYCLE);
585
586     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
587                                                  prestart_cycles_for_recv * TICKS_PER_CYCLE);
588     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
589         time_to_start_xmit,
590         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
591         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
592         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
593     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
594         time_to_start_recv,
595         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
596         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
597         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
598
599     // at this point the buffer head timestamp of the transmit buffers can be set
600     // this is the presentation time of the first sample in the buffer
601     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
602           it != m_TransmitProcessors.end();
603           ++it ) {
604         (*it)->setBufferHeadTimestamp(time_of_first_sample);
605     }
606
607     // switch syncsource to running state
608     uint64_t time_to_start_sync;
609     // FIXME: this is most likely not going to work for transmit sync sources
610     // but those are unsupported in this version
611     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
612         time_to_start_sync = time_to_start_recv;
613     } else {
614         time_to_start_sync = time_to_start_xmit;
615     }
616     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
617         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
618         return false;
619     }
620
621     // STEP X: switch all non-syncsource SP's over to the running state
622     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
623           it != m_ReceiveProcessors.end();
624           ++it ) {
625         if(*it != m_SyncSource) {
626             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
627                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
628                 return false;
629             }
630         }
631     }
632     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
633           it != m_TransmitProcessors.end();
634           ++it ) {
635         if(*it != m_SyncSource) {
636             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
637                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
638                 return false;
639             }
640         }
641     }
642     // wait for the syncsource to start running.
643     // that will block the waitForPeriod call until everyone has started (theoretically)
644     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
645     // so a 20 times this value should be a good timeout
646     //int cnt = cycles_for_startup * 20; // by then it should have started
647     // or maybe we just have to use 1 second, as this wraps the cycle counter
648     int cnt = 8000;
649     while (!m_SyncSource->isRunning() && cnt) {
650         SleepRelativeUsec(125);
651         cnt--;
652     }
653     if(cnt==0) {
654         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
655         return false;
656     }
657
658     // the sync source is running, we can now read a decent received timestamp from it
659     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
660
661     // and a (still very rough) approximation of the rate
662     float rate = m_SyncSource->getTicksPerFrame();
663     int64_t delay_in_ticks = (int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
664     // also add the sync delay
665     delay_in_ticks = addTicks(delay_in_ticks, m_SyncSource->getSyncDelay());
666     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
667                 m_time_of_transfer, rate);
668
669
670     // then use this information to initialize the xmit handlers
671
672     //  we now set the buffer tail timestamp of the transmit buffer
673     //  to the period transfer time instant plus what's nb_buffers - 1
674     //  in ticks. This due to the fact that we (should) have received one period
675     //  worth of ticks at t=m_time_of_transfer
676     //  hence one period of frames should also have been transmitted, which means
677     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
678     //  that allows us to calculate the tail timestamp for the buffer.
679
680     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
681
682     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
683                 transmit_tail_timestamp, rate);
684
685     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
686         it != m_TransmitProcessors.end();
687         ++it ) {
688         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
689         (*it)->setTicksPerFrame(rate);
690     }
691
692     // align the received streams to be phase aligned
693     if(!alignReceivedStreams()) {
694         debugError("Could not align streams...\n");
695         return false;
696     }
697
698     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
699     return true;
700 }
701
702 bool
703 StreamProcessorManager::alignReceivedStreams()
704 {
705     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
706     unsigned int nb_sync_runs;
707     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
708     int64_t diff_between_streams[nb_rcv_sp];
709     int64_t diff;
710
711     unsigned int i;
712
713     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
714     int align_average_time_msec = STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC;
715     Util::Configuration &config = m_parent.getConfiguration();
716     config.getValueForSetting("streaming.spm.align_tries", cnt);
717     config.getValueForSetting("streaming.spm.align_average_time_msec", align_average_time_msec);
718
719     unsigned int periods_per_align_try = (align_average_time_msec * getNominalRate());
720     periods_per_align_try /= 1000;
721     periods_per_align_try /= getPeriodSize();
722     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
723
724     bool aligned = false;
725     while (!aligned && cnt--) {
726         nb_sync_runs = periods_per_align_try;
727         while(nb_sync_runs) {
728             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
729             if(!waitForPeriod()) {
730                 debugWarning("xrun while aligning streams...\n");
731                 return false;
732             }
733
734             // before we do anything else, transfer
735             if(!transferSilence()) {
736                 debugError("Could not transfer silence\n");
737                 return false;
738             }
739
740             // now calculate the stream offset
741             i = 0;
742             for ( i = 0; i < nb_rcv_sp; i++) {
743                 StreamProcessor *s = m_ReceiveProcessors.at(i);
744                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
745                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
746                     m_SyncSource, s, diff);
747                 if ( nb_sync_runs == periods_per_align_try ) {
748                     diff_between_streams[i] = diff;
749                 } else {
750                     diff_between_streams[i] += diff;
751                 }
752             }
753
754             nb_sync_runs--;
755         }
756         // calculate the average offsets
757         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
758         int diff_between_streams_frames[nb_rcv_sp];
759         aligned = true;
760         for ( i = 0; i < nb_rcv_sp; i++) {
761             StreamProcessor *s = m_ReceiveProcessors.at(i);
762
763             diff_between_streams[i] /= periods_per_align_try;
764             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
765             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
766                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
767
768             aligned &= (diff_between_streams_frames[i] == 0);
769
770             // reposition the stream
771             if(!s->shiftStream(diff_between_streams_frames[i])) {
772                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
773                 return false;
774             }
775         }
776         if (!aligned) {
777             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
778         }
779     }
780     if (cnt == 0) {
781         debugError("Align failed\n");
782         return false;
783     }
784     return true;
785 }
786
787 bool StreamProcessorManager::start() {
788     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
789
790     // start all SP's synchonized
791     bool start_result = false;
792     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
793         // put all SP's into dry-running state
794         if (!startDryRunning()) {
795             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
796             start_result = false;
797             continue;
798         }
799
800         start_result = syncStartAll();
801         if(start_result) {
802             break;
803         } else {
804             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
805             if(m_shutdown_needed) {
806                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
807                 return false;
808             }
809         }
810     }
811     if (!start_result) {
812         debugFatal("Could not syncStartAll...\n");
813         return false;
814     }
815     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
816     return true;
817 }
818
819 bool StreamProcessorManager::stop() {
820     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
821
822     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
823     // switch SP's over to the dry-running state
824     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
825           it != m_ReceiveProcessors.end();
826           ++it ) {
827         if((*it)->isRunning()) {
828             if(!(*it)->scheduleStopRunning(-1)) {
829                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
830                 return false;
831             }
832         }
833     }
834     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
835           it != m_TransmitProcessors.end();
836           ++it ) {
837         if((*it)->isRunning()) {
838             if(!(*it)->scheduleStopRunning(-1)) {
839                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
840                 return false;
841             }
842         }
843     }
844     // wait for the SP's to get into the dry-running/stopped state
845     int cnt = 8000;
846     bool ready = false;
847     while (!ready && cnt) {
848         ready = true;
849         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
850             it != m_ReceiveProcessors.end();
851             ++it ) {
852             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
853         }
854         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
855             it != m_TransmitProcessors.end();
856             ++it ) {
857             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
858         }
859         SleepRelativeUsec(125);
860         cnt--;
861     }
862     if(cnt==0) {
863         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
864         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
865             it != m_ReceiveProcessors.end();
866             ++it ) {
867             (*it)->dumpInfo();
868         }
869         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
870             it != m_TransmitProcessors.end();
871             ++it ) {
872             (*it)->dumpInfo();
873         }
874         return false;
875     }
876
877     // switch SP's over to the stopped state
878     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
879           it != m_ReceiveProcessors.end();
880           ++it ) {
881         if ((*it)->inError()) {
882             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
883         } else if(!(*it)->scheduleStopDryRunning(-1)) {
884             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
885             return false;
886         }
887     }
888     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
889           it != m_TransmitProcessors.end();
890           ++it ) {
891         if ((*it)->inError()) {
892             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
893         } else if(!(*it)->scheduleStopDryRunning(-1)) {
894             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
895             return false;
896         }
897     }
898     // wait for the SP's to get into the stopped state
899     cnt = 8000;
900     ready = false;
901     while (!ready && cnt) {
902         ready = true;
903         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
904             it != m_ReceiveProcessors.end();
905             ++it ) {
906             ready &= ((*it)->isStopped() || (*it)->inError());
907         }
908         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
909             it != m_TransmitProcessors.end();
910             ++it ) {
911             ready &= ((*it)->isStopped() || (*it)->inError());
912         }
913         SleepRelativeUsec(125);
914         cnt--;
915     }
916     if(cnt==0) {
917         debugWarning(" Timeout waiting for the SP's to stop\n");
918         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
919             it != m_ReceiveProcessors.end();
920             ++it ) {
921             (*it)->dumpInfo();
922         }
923         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
924             it != m_TransmitProcessors.end();
925             ++it ) {
926             (*it)->dumpInfo();
927         }
928         return false;
929     }
930     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
931     return true;
932 }
933
934 /**
935  * Called upon Xrun events. This brings all StreamProcessors back
936  * into their starting state, and then carries on streaming. This should
937  * have the same effect as restarting the whole thing.
938  *
939  * @return true if successful, false otherwise
940  */
941 bool StreamProcessorManager::handleXrun() {
942
943     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
944
945     dumpInfo();
946
947     /*
948      * Reset means:
949      * 1) Disabling the SP's, so that they don't process any packets
950      *    note: the isomanager does keep on delivering/requesting them
951      * 2) Bringing all buffers & streamprocessors into a know state
952      *    - Clear all capture buffers
953      *    - Put nb_periods*period_size of null frames into the playback buffers
954      * 3) Re-enable the SP's
955      */
956
957     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
958     // start all SP's synchonized
959     bool start_result = false;
960     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
961         if(m_shutdown_needed) {
962             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
963             return true;
964         }
965         // put all SP's into dry-running state
966         if (!startDryRunning()) {
967             debugShowBackLog();
968             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
969             start_result = false;
970             continue;
971         }
972
973         start_result = syncStartAll();
974         if(start_result) {
975             break;
976         } else {
977             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
978         }
979     }
980     if (!start_result) {
981         debugFatal("Could not syncStartAll...\n");
982         return false;
983     }
984     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
985
986     return true;
987 }
988
989 /**
990  * @brief Waits until the next period of samples is ready
991  *
992  * This function does not return until a full period of samples is (or should be)
993  * ready to be transferred.
994  *
995  * @return true if the period is ready, false if not
996  */
997 bool StreamProcessorManager::waitForPeriod() {
998     if(m_SyncSource == NULL) return false;
999     if(m_shutdown_needed) return false;
1000     bool xrun_occurred = false;
1001     bool in_error = false;
1002
1003     // grab the wait lock
1004     // this ensures that bus reset handling doesn't interfere
1005     Util::MutexLockHelper lock(*m_WaitLock);
1006     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1007                         "waiting for period (%d frames in buffer)...\n",
1008                         m_SyncSource->getBufferFill());
1009     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
1010     uint64_t ticks_at_period_margin = ticks_at_period + m_SyncSource->getSyncDelay();
1011     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
1012
1013     #ifdef DEBUG
1014     int64_t now = Util::SystemTimeSource::getCurrentTime();
1015     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
1016     #endif
1017
1018     // wait until it's time to transfer
1019     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
1020
1021     #ifdef DEBUG
1022     now = Util::SystemTimeSource::getCurrentTime();
1023     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
1024     #endif
1025
1026     // the period should be ready now
1027
1028     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
1029     // HACK: we force wait until every SP is ready. this is needed
1030     // since the raw1394 interface provides no control over interrupts
1031     // resulting in very bad predictability on when the data is present.
1032     bool period_not_ready = true;
1033     while(period_not_ready) {
1034         period_not_ready = false;
1035         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1036             it != m_ReceiveProcessors.end();
1037             ++it ) {
1038             bool this_sp_period_ready = (*it)->canConsumePeriod();
1039             if (!this_sp_period_ready) {
1040                 period_not_ready = true;
1041             }
1042         }
1043         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1044             it != m_TransmitProcessors.end();
1045             ++it ) {
1046             bool this_sp_period_ready = (*it)->canProducePeriod();
1047             if (!this_sp_period_ready) {
1048                 period_not_ready = true;
1049             }
1050         }
1051
1052         if (period_not_ready) {
1053             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
1054             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
1055         }
1056
1057         // check for underruns/errors on the ISO side,
1058         // those should make us bail out of the wait loop
1059         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1060             it != m_ReceiveProcessors.end();
1061             ++it ) {
1062             // a xrun has occurred on the Iso side
1063             xrun_occurred |= (*it)->xrunOccurred();
1064             in_error |= (*it)->inError();
1065         }
1066         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1067             it != m_TransmitProcessors.end();
1068             ++it ) {
1069             // a xrun has occurred on the Iso side
1070             xrun_occurred |= (*it)->xrunOccurred();
1071             in_error |= (*it)->inError();
1072         }
1073         if(xrun_occurred | in_error | m_shutdown_needed) break;
1074     }
1075     #else
1076     // check for underruns/errors on the ISO side,
1077     // those should make us bail out of the wait loop
1078     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1079         it != m_ReceiveProcessors.end();
1080         ++it ) {
1081         // xrun on data buffer side
1082         if (!(*it)->canConsumePeriod()) {
1083             xrun_occurred = true;
1084         }
1085         // a xrun has occurred on the Iso side
1086         xrun_occurred |= (*it)->xrunOccurred();
1087         in_error |= (*it)->inError();
1088     }
1089     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1090         it != m_TransmitProcessors.end();
1091         ++it ) {
1092         // xrun on data buffer side
1093         if (!(*it)->canProducePeriod()) {
1094             xrun_occurred = true;
1095         }
1096         // a xrun has occurred on the Iso side
1097         xrun_occurred |= (*it)->xrunOccurred();
1098         in_error |= (*it)->inError();
1099     }
1100     #endif
1101
1102     if(xrun_occurred) {
1103         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1104     }
1105     if(in_error) {
1106         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1107         m_shutdown_needed = true;
1108     }
1109
1110     // we save the 'ideal' time of the transfer at this point,
1111     // because we can have interleaved read - process - write
1112     // cycles making that we modify a receiving stream's buffer
1113     // before we get to writing.
1114     // NOTE: before waitForPeriod() is called again, both the transmit
1115     //       and the receive processors should have done their transfer.
1116     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1117    
1118     #ifdef DEBUG
1119     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1120    
1121     int diff = diffTicks(m_time_of_transfer, m_time_of_transfer2);
1122     // display message if the difference between two successive tick
1123     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1124     // so 50 ticks = 10%, which is a rather large jitter value.
1125     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
1126         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
1127                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1128     }
1129     m_time_of_transfer2 = m_time_of_transfer;
1130     #endif
1131
1132     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1133                         "transfer period %d at %llu ticks...\n",
1134                         m_nbperiods, m_time_of_transfer);
1135
1136     // this is to notify the client of the delay that we introduced by waiting
1137     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
1138     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1139                         "delayed for %d usecs...\n",
1140                         m_delayed_usecs);
1141
1142 #ifdef DEBUG
1143     int rcv_bf=0, xmt_bf=0;
1144     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1145         it != m_ReceiveProcessors.end();
1146         ++it ) {
1147         rcv_bf = (*it)->getBufferFill();
1148     }
1149     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1150         it != m_TransmitProcessors.end();
1151         ++it ) {
1152         xmt_bf = (*it)->getBufferFill();
1153     }
1154     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1155                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1156                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1157
1158     // check if xruns occurred on the Iso side.
1159     // also check if xruns will occur should we transfer() now
1160     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1161           it != m_ReceiveProcessors.end();
1162           ++it ) {
1163
1164         if ((*it)->xrunOccurred()) {
1165             debugOutput(DEBUG_LEVEL_NORMAL,
1166                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1167             (*it)->dumpInfo();
1168         }
1169         if (!((*it)->canClientTransferFrames(m_period))) {
1170             debugOutput(DEBUG_LEVEL_NORMAL,
1171                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1172             (*it)->dumpInfo();
1173         }
1174     }
1175     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1176           it != m_TransmitProcessors.end();
1177           ++it ) {
1178         if ((*it)->xrunOccurred()) {
1179             debugOutput(DEBUG_LEVEL_NORMAL,
1180                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1181         }
1182         if (!((*it)->canClientTransferFrames(m_period))) {
1183             debugOutput(DEBUG_LEVEL_NORMAL,
1184                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1185         }
1186     }
1187 #endif
1188     m_nbperiods++;
1189     // now we can signal the client that we are (should be) ready
1190     return !xrun_occurred;
1191 }
1192
1193 /**
1194  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1195  *
1196  * Transfers one period of frames from the client side to the Iso side and vice versa.
1197  *
1198  * @return true if successful, false otherwise (indicates xrun).
1199  */
1200 bool StreamProcessorManager::transfer() {
1201     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1202     bool retval=true;
1203     retval &= transfer(StreamProcessor::ePT_Receive);
1204     retval &= transfer(StreamProcessor::ePT_Transmit);
1205     return retval;
1206 }
1207
1208 /**
1209  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1210  *
1211  * Transfers one period of frames from the client side to the Iso side or vice versa.
1212  *
1213  * @param t The processor type to tranfer for (receive or transmit)
1214  * @return true if successful, false otherwise (indicates xrun).
1215  */
1216 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1217     if(m_SyncSource == NULL) return false;
1218     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1219         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1220         t, m_time_of_transfer,
1221         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1222         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1223         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1224
1225     bool retval = true;
1226     // a static cast could make sure that there is no performance
1227     // penalty for the virtual functions (to be checked)
1228     if (t==StreamProcessor::ePT_Receive) {
1229         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1230                 it != m_ReceiveProcessors.end();
1231                 ++it ) {
1232             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1233                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1234                             m_period, m_time_of_transfer,*it);
1235                 retval &= false; // buffer underrun
1236             }
1237         }
1238     } else {
1239         // FIXME: in the SPM it would be nice to have system time instead of
1240         //        1394 time
1241         float rate = m_SyncSource->getTicksPerFrame();
1242         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1243
1244         // the data we are putting into the buffer is intended to be transmitted
1245         // one ringbuffer size after it has been received
1246
1247         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1248         // postponed.
1249         int syncdelay = m_SyncSource->getSyncDelay();
1250         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1251
1252         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1253                 it != m_TransmitProcessors.end();
1254                 ++it ) {
1255             // FIXME: in the SPM it would be nice to have system time instead of
1256             //        1394 time
1257             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1258                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1259                         m_period, transmit_timestamp, *it);
1260                 retval &= false; // buffer underrun
1261             }
1262         }
1263     }
1264     return retval;
1265 }
1266
1267 /**
1268  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1269  *
1270  * Transfers one period of silence to the Iso side for transmit SP's
1271  * or dump one period of frames for receive SP's
1272  *
1273  * @return true if successful, false otherwise (indicates xrun).
1274  */
1275 bool StreamProcessorManager::transferSilence() {
1276     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1277     bool retval=true;
1278     // NOTE: the order here is opposite from the order in
1279     // normal operation (transmit is before receive), because
1280     // we can do that here (data=silence=available) and
1281     // it increases reliability (esp. on startup)
1282     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1283     retval &= transferSilence(StreamProcessor::ePT_Receive);
1284     return retval;
1285 }
1286
1287 /**
1288  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1289  *
1290  * Transfers one period of silence to the Iso side for transmit SP's
1291  * or dump one period of frames for receive SP's
1292  *
1293  * @param t The processor type to tranfer for (receive or transmit)
1294  * @return true if successful, false otherwise (indicates xrun).
1295  */
1296 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1297     if(m_SyncSource == NULL) return false;
1298     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1299         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1300         t, m_time_of_transfer,
1301         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1302         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1303         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1304
1305     bool retval = true;
1306     // a static cast could make sure that there is no performance
1307     // penalty for the virtual functions (to be checked)
1308     if (t==StreamProcessor::ePT_Receive) {
1309         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1310                 it != m_ReceiveProcessors.end();
1311                 ++it ) {
1312             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1313                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1314                             m_period, m_time_of_transfer,*it);
1315                 retval &= false; // buffer underrun
1316             }
1317         }
1318     } else {
1319         // FIXME: in the SPM it would be nice to have system time instead of
1320         //        1394 time
1321         float rate = m_SyncSource->getTicksPerFrame();
1322         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1323
1324         // the data we are putting into the buffer is intended to be transmitted
1325         // one ringbuffer size after it has been received
1326         // we also add one syncdelay as a safety margin, since that's the amount of time we can get
1327         // postponed.
1328         int syncdelay = m_SyncSource->getSyncDelay();
1329         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1330
1331         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1332                 it != m_TransmitProcessors.end();
1333                 ++it ) {
1334             // FIXME: in the SPM it would be nice to have system time instead of
1335             //        1394 time
1336             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1337                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1338                         m_period, transmit_timestamp, *it);
1339                 retval &= false; // buffer underrun
1340             }
1341         }
1342     }
1343     return retval;
1344 }
1345
1346 void StreamProcessorManager::dumpInfo() {
1347     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1348     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1349     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1350     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1351
1352     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1353     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1354         it != m_ReceiveProcessors.end();
1355         ++it ) {
1356         (*it)->dumpInfo();
1357     }
1358
1359     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1360     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1361         it != m_TransmitProcessors.end();
1362         ++it ) {
1363         (*it)->dumpInfo();
1364     }
1365
1366     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1367
1368     // list port info in verbose mode
1369     debugOutputShort( DEBUG_LEVEL_VERBOSE, "Port Information\n");
1370     int nb_ports;
1371    
1372     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Playback\n");
1373     nb_ports = getPortCount(Port::E_Playback);
1374     for(int i=0; i < nb_ports; i++) {
1375         Port *p = getPortByIndex(i, Port::E_Playback);
1376         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1377         if (p) {
1378             bool disabled = p->isDisabled();
1379             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1380             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1381             debugOutputShort( DEBUG_LEVEL_VERBOSE, "%3s ", p->getName().c_str());
1382         } else {
1383             debugOutputShort( DEBUG_LEVEL_VERBOSE, "invalid ");
1384         }
1385         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1386     }
1387     debugOutputShort( DEBUG_LEVEL_VERBOSE, " Capture\n");
1388     nb_ports = getPortCount(Port::E_Capture);
1389     for(int i=0; i < nb_ports; i++) {
1390         Port *p = getPortByIndex(i, Port::E_Capture);
1391         debugOutputShort( DEBUG_LEVEL_VERBOSE, "  %3d (%p): ", i, p);
1392         if (p) {
1393             bool disabled = p->isDisabled();
1394             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%p] [%3s] ", &p->getManager(), (disabled?"off":"on"));
1395             debugOutputShort( DEBUG_LEVEL_VERBOSE, "[%7s] ", p->getPortTypeName().c_str());
1396             debugOutputShort( DEBUG_LEVEL_VERBOSE, " %3s ", p->getName().c_str());
1397         } else {
1398             debugOutputShort( DEBUG_LEVEL_VERBOSE, " invalid ");
1399         }
1400         debugOutputShort( DEBUG_LEVEL_VERBOSE, "\n");
1401     }
1402
1403     debugOutputShort( DEBUG_LEVEL_VERBOSE, "----------------------------------------------------\n");
1404
1405 }
1406
1407 void StreamProcessorManager::setVerboseLevel(int l) {
1408     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1409
1410     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1411         it != m_ReceiveProcessors.end();
1412         ++it ) {
1413         (*it)->setVerboseLevel(l);
1414     }
1415     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1416         it != m_TransmitProcessors.end();
1417         ++it ) {
1418         (*it)->setVerboseLevel(l);
1419     }
1420     setDebugLevel(l);
1421     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1422 }
1423
1424 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1425     int count=0;
1426
1427     if (direction == Port::E_Capture) {
1428         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1429             it != m_ReceiveProcessors.end();
1430             ++it ) {
1431             count += (*it)->getPortCount(type);
1432         }
1433     } else {
1434         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1435             it != m_TransmitProcessors.end();
1436             ++it ) {
1437             count += (*it)->getPortCount(type);
1438         }
1439     }
1440     return count;
1441 }
1442
1443 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1444     int count=0;
1445
1446     if (direction == Port::E_Capture) {
1447         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1448             it != m_ReceiveProcessors.end();
1449             ++it ) {
1450             count += (*it)->getPortCount();
1451         }
1452     } else {
1453         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1454             it != m_TransmitProcessors.end();
1455             ++it ) {
1456             count += (*it)->getPortCount();
1457         }
1458     }
1459     return count;
1460 }
1461
1462 void
1463 StreamProcessorManager::updateShadowLists()
1464 {
1465     debugOutput( DEBUG_LEVEL_VERBOSE, "Updating port shadow lists...\n");
1466     m_CapturePorts_shadow.clear();
1467     m_PlaybackPorts_shadow.clear();
1468
1469     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1470         it != m_ReceiveProcessors.end();
1471         ++it ) {
1472         PortManager *pm = *it;
1473         for (int i=0; i < pm->getPortCount(); i++) {
1474             Port *p = pm->getPortAtIdx(i);
1475             if (!p) {
1476                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1477                 continue;
1478             }
1479             if(p->getDirection() != Port::E_Capture) {
1480                 debugError("port at idx %d for receive SP is not a capture port!\n", i);
1481                 continue;
1482             }
1483             m_CapturePorts_shadow.push_back(p);
1484         }
1485     }
1486     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1487         it != m_TransmitProcessors.end();
1488         ++it ) {
1489         PortManager *pm = *it;
1490         for (int i=0; i < pm->getPortCount(); i++) {
1491             Port *p = pm->getPortAtIdx(i);
1492             if (!p) {
1493                 debugError("getPortAtIdx(%d) returned NULL\n", i);
1494                 continue;
1495             }
1496             if(p->getDirection() != Port::E_Playback) {
1497                 debugError("port at idx %d for transmit SP is not a playback port!\n", i);
1498                 continue;
1499             }
1500             m_PlaybackPorts_shadow.push_back(p);
1501         }
1502     }
1503 }
1504
1505 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1506     debugOutputExtreme( DEBUG_LEVEL_ULTRA_VERBOSE, "getPortByIndex(%d, %d)...\n", idx, direction);
1507     if (direction == Port::E_Capture) {
1508         #ifdef DEBUG
1509         if(idx >= (int)m_CapturePorts_shadow.size()) {
1510             debugError("Capture port %d out of range (%d)\n", idx, m_CapturePorts_shadow.size());
1511             return NULL;
1512         }
1513         #endif
1514         return m_CapturePorts_shadow.at(idx);
1515     } else {
1516         #ifdef DEBUG
1517         if(idx >= (int)m_PlaybackPorts_shadow.size()) {
1518             debugError("Playback port %d out of range (%d)\n", idx, m_PlaybackPorts_shadow.size());
1519             return NULL;
1520         }
1521         #endif
1522         return m_PlaybackPorts_shadow.at(idx);
1523     }
1524     return NULL;
1525 }
1526
1527 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1528     m_thread_realtime=rt;
1529     m_thread_priority=priority;
1530     return true;
1531 }
1532
1533
1534 } // end of namespace
Note: See TracBrowser for help on using the browser.