root/branches/libffado-2.0/src/libstreaming/StreamProcessorManager.cpp

Revision 1345, 52.5 kB (checked in by ppalmers, 15 years ago)

improve latency performance. always use packet_per_buffer mode since that's better suited for our problem

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "devicemanager.h"
31
32 #include "libutil/Time.h"
33
34 #include <errno.h>
35 #include <assert.h>
36 #include <math.h>
37
38 namespace Streaming {
39
40 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
41
42 StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
43     : m_is_slave( false )
44     , m_SyncSource(NULL)
45     , m_parent( p )
46     , m_xrun_happened( false )
47     , m_activity_wait_timeout_usec( 1000*1000 )
48     , m_nb_buffers( 0 )
49     , m_period( 0 )
50     , m_audio_datatype( eADT_Float )
51     , m_nominal_framerate ( 0 )
52     , m_xruns(0)
53     , m_shutdown_needed(false)
54     , m_nbperiods(0)
55     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
56 {
57     addOption(Util::OptionContainer::Option("slaveMode",false));
58     sem_init(&m_activity_semaphore, 0, 0);
59 }
60
61 StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
62                                                unsigned int framerate, unsigned int nb_buffers)
63     : m_is_slave( false )
64     , m_SyncSource(NULL)
65     , m_parent( p )
66     , m_xrun_happened( false )
67     , m_activity_wait_timeout_usec( 1000*1000 )
68     , m_nb_buffers(nb_buffers)
69     , m_period(period)
70     , m_audio_datatype( eADT_Float )
71     , m_nominal_framerate ( framerate )
72     , m_xruns(0)
73     , m_shutdown_needed(false)
74     , m_nbperiods(0)
75     , m_WaitLock( new Util::PosixMutex("SPMWAIT") )
76 {
77     addOption(Util::OptionContainer::Option("slaveMode",false));
78     sem_init(&m_activity_semaphore, 0, 0);
79 }
80
81 StreamProcessorManager::~StreamProcessorManager() {
82     sem_post(&m_activity_semaphore);
83     sem_destroy(&m_activity_semaphore);
84     delete m_WaitLock;
85 }
86
87 // void
88 // StreamProcessorManager::handleBusReset(Ieee1394Service &s)
89 // {
90 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset on service %p...\n", this, &s);
91 // //
92 // //     bool handled_at_least_one = false;
93 // //     // note that all receive streams are gone once a device is unplugged
94 // //
95 // //     // synchronize with the wait lock
96 // //     Util::MutexLockHelper lock(*m_WaitLock);
97 // //
98 // //     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
99 // //     // cause all SP's to bail out
100 // //     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
101 // //           it != m_ReceiveProcessors.end();
102 // //           ++it )
103 // //     {
104 // //         if(&s == &((*it)->getParent().get1394Service())) {
105 // //             debugOutput(DEBUG_LEVEL_NORMAL,
106 // //                         "issue busreset on receive SPM on channel %d\n",
107 // //                         (*it)->getChannel());
108 // //             (*it)->handleBusReset();
109 // //             handled_at_least_one = true;
110 // //         } else {
111 // //             debugOutput(DEBUG_LEVEL_NORMAL,
112 // //                         "skipping receive SPM on channel %d since not on service %p\n",
113 // //                         (*it)->getChannel(), &s);
114 // //         }
115 // //     }
116 // //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
117 // //           it != m_TransmitProcessors.end();
118 // //           ++it )
119 // //     {
120 // //         if(&s == &((*it)->getParent().get1394Service())) {
121 // //             debugOutput(DEBUG_LEVEL_NORMAL,
122 // //                         "issue busreset on transmit SPM on channel %d\n",
123 // //                         (*it)->getChannel());
124 // //             (*it)->handleBusReset();
125 // //             handled_at_least_one = true;
126 // //         } else {
127 // //             debugOutput(DEBUG_LEVEL_NORMAL,
128 // //                         "skipping transmit SPM on channel %d since not on service %p\n",
129 // //                         (*it)->getChannel(), &s);
130 // //         }
131 // //     }
132 // //
133 // //     // FIXME: we request shutdown for now.
134 // //     m_shutdown_needed = handled_at_least_one;
135 // }
136
137 void
138 StreamProcessorManager::signalActivity()
139 {
140     sem_post(&m_activity_semaphore);
141     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this);
142 }
143
144 enum StreamProcessorManager::eActivityResult
145 StreamProcessorManager::waitForActivity()
146 {
147     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this);
148     struct timespec ts;
149     int result;
150
151     long long int timeout_nsec=0;
152     if (m_activity_wait_timeout_usec >= 0) {
153         timeout_nsec = m_activity_wait_timeout_usec * 1000LL;
154
155         if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
156             debugError("clock_gettime failed\n");
157             return eAR_Error;
158         }
159         ts.tv_nsec += timeout_nsec;
160         while(ts.tv_nsec >= 1000000000LL) {
161             ts.tv_sec += 1;
162             ts.tv_nsec -= 1000000000LL;
163         }
164     }
165
166     if (m_activity_wait_timeout_usec >= 0) {
167         result = sem_timedwait(&m_activity_semaphore, &ts);
168     } else {
169         result = sem_wait(&m_activity_semaphore);
170     }
171
172     if(result != 0) {
173         if (errno == ETIMEDOUT) {
174             debugOutput(DEBUG_LEVEL_VERBOSE,
175                         "(%p) sem_timedwait() timed out (result=%d)\n",
176                         this, result);
177             return eAR_Timeout;
178         } else if (errno == EINTR) {
179             debugOutput(DEBUG_LEVEL_VERBOSE,
180                         "(%p) sem_[timed]wait() interrupted by signal (result=%d)\n",
181                         this, result);
182             return eAR_Interrupted;
183         } else if (errno == EINVAL) {
184             debugError("(%p) sem_[timed]wait error (result=%d errno=EINVAL)\n",
185                         this, result);
186             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
187                        this, timeout_nsec, ts.tv_sec, ts.tv_nsec);
188             return eAR_Error;
189         } else {
190             debugError("(%p) sem_[timed]wait error (result=%d errno=%d)\n",
191                         this, result, errno);
192             debugError("(%p) timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
193                        this, timeout_nsec, ts.tv_sec, ts.tv_nsec);
194             return eAR_Error;
195         }
196     }
197
198     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this);
199     return eAR_Activity;
200 }
201
202 /**
203  * Registers \ref processor with this manager.
204  *
205  * also registers it with the isohandlermanager
206  *
207  * be sure to call isohandlermanager->init() first!
208  * and be sure that the processors are also ->init()'ed
209  *
210  * @param processor
211  * @return true if successfull
212  */
213 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
214 {
215     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
216     assert(processor);
217     if (processor->getType() == StreamProcessor::ePT_Receive) {
218         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
219         m_ReceiveProcessors.push_back(processor);
220         return true;
221     }
222
223     if (processor->getType() == StreamProcessor::ePT_Transmit) {
224         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
225         m_TransmitProcessors.push_back(processor);
226         return true;
227     }
228
229     debugFatal("Unsupported processor type!\n");
230     return false;
231 }
232
233 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
234 {
235     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
236     assert(processor);
237
238     if (processor->getType()==StreamProcessor::ePT_Receive) {
239
240         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
241               it != m_ReceiveProcessors.end();
242               ++it )
243         {
244             if ( *it == processor ) {
245                 if (*it == m_SyncSource) {
246                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
247                     m_SyncSource = NULL;
248                 }
249                 m_ReceiveProcessors.erase(it);
250                 return true;
251             }
252         }
253     }
254
255     if (processor->getType()==StreamProcessor::ePT_Transmit) {
256         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
257               it != m_TransmitProcessors.end();
258               ++it )
259         {
260             if ( *it == processor ) {
261                 if (*it == m_SyncSource) {
262                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
263                     m_SyncSource = NULL;
264                 }
265                 m_TransmitProcessors.erase(it);
266                 return true;
267             }
268         }
269     }
270
271     debugFatal("Processor (%p) not found!\n",processor);
272     return false; //not found
273 }
274
275 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
276     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
277     m_SyncSource=s;
278     return true;
279 }
280
281 bool StreamProcessorManager::prepare() {
282
283     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
284     m_is_slave=false;
285     if(!getOption("slaveMode", m_is_slave)) {
286         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
287     }
288
289     m_shutdown_needed=false;
290
291     // if no sync source is set, select one here
292     if(m_SyncSource == NULL) {
293        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
294     }
295
296     // FIXME: put into separate method
297     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
298           it != m_ReceiveProcessors.end();
299           ++it )
300     {
301         if(m_SyncSource == NULL) {
302             debugWarning(" => Sync Source is %p.\n", *it);
303             m_SyncSource = *it;
304         }
305     }
306     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
307           it != m_TransmitProcessors.end();
308           ++it )
309     {
310         if(m_SyncSource == NULL) {
311             debugWarning(" => Sync Source is %p.\n", *it);
312             m_SyncSource = *it;
313         }
314     }
315
316     // now do the actual preparation of the SP's
317     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
318     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
319         it != m_ReceiveProcessors.end();
320         ++it ) {
321
322         if(!(*it)->setOption("slaveMode", m_is_slave)) {
323             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
324         }
325
326         if(!(*it)->prepare()) {
327             debugFatal(  " could not prepare (%p)...\n",(*it));
328             return false;
329         }
330     }
331     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
332     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
333         it != m_TransmitProcessors.end();
334         ++it ) {
335         if(!(*it)->setOption("slaveMode", m_is_slave)) {
336             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
337         }
338         if(!(*it)->prepare()) {
339             debugFatal( " could not prepare (%p)...\n",(*it));
340             return false;
341         }
342     }
343
344     // if there are no stream processors registered,
345     // fail
346     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
347         debugFatal("No stream processors registered, can't do anything usefull\n");
348         return false;
349     }
350
351     // set the activity timeout value to two periods worth of usecs.
352     // since we can expect activity once every period, but we might have some
353     // offset, the safe value is two periods.
354     int timeout_usec = 2*1000LL * 1000LL * m_period / m_nominal_framerate;
355     debugOutput(DEBUG_LEVEL_VERBOSE, "setting activity timeout to %d\n", timeout_usec);
356     setActivityWaitTimeoutUsec(timeout_usec);
357
358     return true;
359 }
360
361 bool
362 StreamProcessorManager::startDryRunning()
363 {
364     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
365     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
366     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
367             it != m_TransmitProcessors.end();
368             ++it ) {
369         if ((*it)->inError()) {
370             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
371             return false;
372         }
373         if (!(*it)->isDryRunning()) {
374             if(!(*it)->scheduleStartDryRunning(-1)) {
375                 debugError("Could not put SP %p into the dry-running state\n", *it);
376                 return false;
377             }
378         } else {
379             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
380         }
381     }
382     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
383             it != m_ReceiveProcessors.end();
384             ++it ) {
385         if ((*it)->inError()) {
386             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
387             return false;
388         }
389         if (!(*it)->isDryRunning()) {
390             if(!(*it)->scheduleStartDryRunning(-1)) {
391                 debugError("Could not put SP %p into the dry-running state\n", *it);
392                 return false;
393             }
394         } else {
395             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
396         }
397     }
398     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
399     // wait for the syncsource to start running.
400     // that will block the waitForPeriod call until everyone has started (theoretically)
401     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
402     bool all_dry_running = false;
403     while (!all_dry_running && cnt) {
404         all_dry_running = true;
405         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
406                 it != m_ReceiveProcessors.end();
407                 ++it ) {
408             all_dry_running &= (*it)->isDryRunning();
409         }
410         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
411                 it != m_TransmitProcessors.end();
412                 ++it ) {
413             all_dry_running &= (*it)->isDryRunning();
414         }
415
416         SleepRelativeUsec(125);
417         cnt--;
418     }
419     if(cnt==0) {
420         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
421         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
422                 it != m_ReceiveProcessors.end();
423                 ++it ) {
424             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
425                 (*it)->getTypeString(), *it, (*it)->getStateString());
426         }
427         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
428                 it != m_TransmitProcessors.end();
429                 ++it ) {
430             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
431                 (*it)->getTypeString(), *it, (*it)->getStateString());
432         }
433         return false;
434     }
435     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
436     return true;
437 }
438
439 bool StreamProcessorManager::syncStartAll() {
440     if(m_SyncSource == NULL) return false;
441     // figure out when to get the SP's running.
442     // the xmit SP's should also know the base timestamp
443     // streams should be aligned here
444
445     // now find out how long we have to delay the wait operation such that
446     // the received frames will all be presented to the SP
447     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
448     int max_of_min_delay = 0;
449     int min_delay = 0;
450     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
451             it != m_ReceiveProcessors.end();
452             ++it ) {
453         min_delay = (*it)->getMaxFrameLatency();
454         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
455     }
456
457     // add some processing margin. This only shifts the time
458     // at which the buffer is transfer()'ed. This makes things somewhat
459     // more robust. It should be noted though that shifting the transfer
460     // time to a later time instant also causes the xmit buffer fill to be
461     // lower on average.
462     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
463     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
464         max_of_min_delay,
465         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
466         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
467         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
468     m_SyncSource->setSyncDelay(max_of_min_delay);
469
470     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
471     //        have aquired lock
472     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
473     //sleep(2); // FIXME: be smarter here
474
475     // make sure that we are dry-running long enough for the
476     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
477     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
478
479     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
480     nb_sync_runs /= 1000;
481     nb_sync_runs /= getPeriodSize();
482
483     int64_t time_till_next_period;
484     while(nb_sync_runs--) { // or while not sync-ed?
485         // check if we were woken up too soon
486         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
487         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
488         if(time_till_next_period > 0) {
489             // wait for the period
490             SleepRelativeUsec(time_till_next_period);
491         }
492     }
493
494     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
495     // FIXME: in the SPM it would be nice to have system time instead of
496     //        1394 time
497
498     // we now should have decent sync info on the sync source
499     // determine a point in time where the system should start
500     // figure out where we are now
501     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
502     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
503         time_of_first_sample,
504         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
505         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
506         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
507
508     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
509     // this is the time window we have to setup all SP's such that they
510     // can start wet-running correctly.
511     time_of_first_sample = addTicks(time_of_first_sample,
512                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
513
514     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
515         time_of_first_sample,
516         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
517         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
518         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
519
520     // we should start wet-running the transmit SP's some cycles in advance
521     // such that we know it is wet-running when it should output its first sample
522     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
523                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
524
525     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
526                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
527     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
528         time_to_start_xmit,
529         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
530         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
531         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
532     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
533         time_to_start_recv,
534         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
535         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
536         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
537
538     // at this point the buffer head timestamp of the transmit buffers can be set
539     // this is the presentation time of the first sample in the buffer
540     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
541           it != m_TransmitProcessors.end();
542           ++it ) {
543         (*it)->setBufferHeadTimestamp(time_of_first_sample);
544     }
545
546     // switch syncsource to running state
547     uint64_t time_to_start_sync;
548     // FIXME: this is most likely not going to work for transmit sync sources
549     // but those are unsupported in this version
550     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
551         time_to_start_sync = time_to_start_recv;
552     } else {
553         time_to_start_sync = time_to_start_xmit;
554     }
555     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
556         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
557         return false;
558     }
559
560     // STEP X: switch all non-syncsource SP's over to the running state
561     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
562           it != m_ReceiveProcessors.end();
563           ++it ) {
564         if(*it != m_SyncSource) {
565             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
566                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
567                 return false;
568             }
569         }
570     }
571     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
572           it != m_TransmitProcessors.end();
573           ++it ) {
574         if(*it != m_SyncSource) {
575             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
576                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
577                 return false;
578             }
579         }
580     }
581     // wait for the syncsource to start running.
582     // that will block the waitForPeriod call until everyone has started (theoretically)
583     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
584     // so a 20 times this value should be a good timeout
585     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
586     while (!m_SyncSource->isRunning() && cnt) {
587         SleepRelativeUsec(125);
588         cnt--;
589     }
590     if(cnt==0) {
591         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
592         return false;
593     }
594
595     // the sync source is running, we can now read a decent received timestamp from it
596     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
597
598     // and a (still very rough) approximation of the rate
599     float rate = m_SyncSource->getTicksPerFrame();
600     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
601     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
602                 m_time_of_transfer, rate);
603
604     // then use this information to initialize the xmit handlers
605
606     //  we now set the buffer tail timestamp of the transmit buffer
607     //  to the period transfer time instant plus what's nb_buffers - 1
608     //  in ticks. This due to the fact that we (should) have received one period
609     //  worth of ticks at t=m_time_of_transfer
610     //  hence one period of frames should also have been transmitted, which means
611     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
612     //  that allows us to calculate the tail timestamp for the buffer.
613
614     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
615
616     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
617                 transmit_tail_timestamp, rate);
618
619     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
620         it != m_TransmitProcessors.end();
621         ++it ) {
622         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
623         (*it)->setTicksPerFrame(rate);
624     }
625
626     // align the received streams to be phase aligned
627     if(!alignReceivedStreams()) {
628         debugError("Could not align streams...\n");
629         return false;
630     }
631
632     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
633     return true;
634 }
635
636 bool
637 StreamProcessorManager::alignReceivedStreams()
638 {
639     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
640     unsigned int nb_sync_runs;
641     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
642     int64_t diff_between_streams[nb_rcv_sp];
643     int64_t diff;
644
645     unsigned int i;
646
647     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
648     periods_per_align_try /= 1000;
649     periods_per_align_try /= getPeriodSize();
650     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
651
652     bool aligned = false;
653     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
654     while (!aligned && cnt--) {
655         nb_sync_runs = periods_per_align_try;
656         while(nb_sync_runs) {
657             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
658             if(!waitForPeriod()) {
659                 debugWarning("xrun while aligning streams...\n");
660                 return false;
661             }
662
663             // before we do anything else, transfer
664             if(!transferSilence()) {
665                 debugError("Could not transfer silence\n");
666                 return false;
667             }
668
669             // now calculate the stream offset
670             i = 0;
671             for ( i = 0; i < nb_rcv_sp; i++) {
672                 StreamProcessor *s = m_ReceiveProcessors.at(i);
673                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
674                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
675                     m_SyncSource, s, diff);
676                 if ( nb_sync_runs == periods_per_align_try ) {
677                     diff_between_streams[i] = diff;
678                 } else {
679                     diff_between_streams[i] += diff;
680                 }
681             }
682
683             nb_sync_runs--;
684         }
685         // calculate the average offsets
686         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
687         int diff_between_streams_frames[nb_rcv_sp];
688         aligned = true;
689         for ( i = 0; i < nb_rcv_sp; i++) {
690             StreamProcessor *s = m_ReceiveProcessors.at(i);
691
692             diff_between_streams[i] /= periods_per_align_try;
693             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
694             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
695                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
696
697             aligned &= (diff_between_streams_frames[i] == 0);
698
699             // reposition the stream
700             if(!s->shiftStream(diff_between_streams_frames[i])) {
701                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
702                 return false;
703             }
704         }
705         if (!aligned) {
706             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
707         }
708     }
709     if (cnt == 0) {
710         debugError("Align failed\n");
711         return false;
712     }
713     return true;
714 }
715
716 bool StreamProcessorManager::start() {
717     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
718
719     // start all SP's synchonized
720     bool start_result = false;
721     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
722         // put all SP's into dry-running state
723         if (!startDryRunning()) {
724             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
725             start_result = false;
726             continue;
727         }
728
729         start_result = syncStartAll();
730         if(start_result) {
731             break;
732         } else {
733             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
734             if(m_shutdown_needed) {
735                 debugOutput(DEBUG_LEVEL_VERBOSE, "Some fatal error occurred, stop trying.\n");
736                 return false;
737             }
738         }
739     }
740     if (!start_result) {
741         debugFatal("Could not syncStartAll...\n");
742         return false;
743     }
744     debugOutput( DEBUG_LEVEL_VERBOSE, " Started...\n");
745     return true;
746 }
747
748 bool StreamProcessorManager::stop() {
749     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
750
751     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
752     // switch SP's over to the dry-running state
753     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
754           it != m_ReceiveProcessors.end();
755           ++it ) {
756         if((*it)->isRunning()) {
757             if(!(*it)->scheduleStopRunning(-1)) {
758                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
759                 return false;
760             }
761         }
762     }
763     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
764           it != m_TransmitProcessors.end();
765           ++it ) {
766         if((*it)->isRunning()) {
767             if(!(*it)->scheduleStopRunning(-1)) {
768                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
769                 return false;
770             }
771         }
772     }
773     // wait for the SP's to get into the dry-running/stopped state
774     int cnt = 8000;
775     bool ready = false;
776     while (!ready && cnt) {
777         ready = true;
778         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
779             it != m_ReceiveProcessors.end();
780             ++it ) {
781             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
782         }
783         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
784             it != m_TransmitProcessors.end();
785             ++it ) {
786             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream() || (*it)->inError());
787         }
788         SleepRelativeUsec(125);
789         cnt--;
790     }
791     if(cnt==0) {
792         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
793         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
794             it != m_ReceiveProcessors.end();
795             ++it ) {
796             (*it)->dumpInfo();
797         }
798         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
799             it != m_TransmitProcessors.end();
800             ++it ) {
801             (*it)->dumpInfo();
802         }
803         return false;
804     }
805
806     // switch SP's over to the stopped state
807     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
808           it != m_ReceiveProcessors.end();
809           ++it ) {
810         if ((*it)->inError()) {
811             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
812         } else if(!(*it)->scheduleStopDryRunning(-1)) {
813             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
814             return false;
815         }
816     }
817     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
818           it != m_TransmitProcessors.end();
819           ++it ) {
820         if ((*it)->inError()) {
821             debugOutput(DEBUG_LEVEL_VERBOSE, "SP %p in error state\n", *it);
822         } else if(!(*it)->scheduleStopDryRunning(-1)) {
823             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
824             return false;
825         }
826     }
827     // wait for the SP's to get into the stopped state
828     cnt = 8000;
829     ready = false;
830     while (!ready && cnt) {
831         ready = true;
832         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
833             it != m_ReceiveProcessors.end();
834             ++it ) {
835             ready &= ((*it)->isStopped() || (*it)->inError());
836         }
837         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
838             it != m_TransmitProcessors.end();
839             ++it ) {
840             ready &= ((*it)->isStopped() || (*it)->inError());
841         }
842         SleepRelativeUsec(125);
843         cnt--;
844     }
845     if(cnt==0) {
846         debugWarning(" Timeout waiting for the SP's to stop\n");
847         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
848             it != m_ReceiveProcessors.end();
849             ++it ) {
850             (*it)->dumpInfo();
851         }
852         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
853             it != m_TransmitProcessors.end();
854             ++it ) {
855             (*it)->dumpInfo();
856         }
857         return false;
858     }
859     debugOutput( DEBUG_LEVEL_VERBOSE, " Stopped...\n");
860     return true;
861 }
862
863 /**
864  * Called upon Xrun events. This brings all StreamProcessors back
865  * into their starting state, and then carries on streaming. This should
866  * have the same effect as restarting the whole thing.
867  *
868  * @return true if successful, false otherwise
869  */
870 bool StreamProcessorManager::handleXrun() {
871
872     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
873
874     dumpInfo();
875
876     /*
877      * Reset means:
878      * 1) Disabling the SP's, so that they don't process any packets
879      *    note: the isomanager does keep on delivering/requesting them
880      * 2) Bringing all buffers & streamprocessors into a know state
881      *    - Clear all capture buffers
882      *    - Put nb_periods*period_size of null frames into the playback buffers
883      * 3) Re-enable the SP's
884      */
885
886     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
887     // start all SP's synchonized
888     bool start_result = false;
889     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
890         if(m_shutdown_needed) {
891             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
892             return true;
893         }
894         // put all SP's into dry-running state
895         if (!startDryRunning()) {
896             debugShowBackLog();
897             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
898             start_result = false;
899             continue;
900         }
901
902         start_result = syncStartAll();
903         if(start_result) {
904             break;
905         } else {
906             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
907         }
908     }
909     if (!start_result) {
910         debugFatal("Could not syncStartAll...\n");
911         return false;
912     }
913     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
914
915     return true;
916 }
917
918 /**
919  * @brief Waits until the next period of samples is ready
920  *
921  * This function does not return until a full period of samples is (or should be)
922  * ready to be transferred.
923  *
924  * @return true if the period is ready, false if not
925  */
926 bool StreamProcessorManager::waitForPeriod() {
927     if(m_SyncSource == NULL) return false;
928     if(m_shutdown_needed) return false;
929     bool xrun_occurred = false;
930     bool in_error = false;
931
932     // grab the wait lock
933     // this ensures that bus reset handling doesn't interfere
934     Util::MutexLockHelper lock(*m_WaitLock);
935     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
936                         "waiting for period (%d frames in buffer)...\n",
937                         m_SyncSource->getBufferFill());
938     uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
939     uint64_t ticks_at_period_margin = ticks_at_period + m_SyncSource->getSyncDelay();
940     uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
941
942     #ifdef DEBUG
943     int64_t now = Util::SystemTimeSource::getCurrentTime();
944     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
945     #endif
946
947     // wait until it's time to transfer
948     Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
949
950     #ifdef DEBUG
951     now = Util::SystemTimeSource::getCurrentTime();
952     debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
953     #endif
954
955     // the period should be ready now
956
957     #if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
958     // HACK: we force wait until every SP is ready. this is needed
959     // since the raw1394 interface provides no control over interrupts
960     // resulting in very bad predictability on when the data is present.
961     bool period_not_ready = true;
962     while(period_not_ready) {
963         period_not_ready = false;
964         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
965             it != m_ReceiveProcessors.end();
966             ++it ) {
967             bool this_sp_period_ready = (*it)->canConsumePeriod();
968             if (!this_sp_period_ready) {
969                 period_not_ready = true;
970             }
971         }
972         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
973             it != m_TransmitProcessors.end();
974             ++it ) {
975             bool this_sp_period_ready = (*it)->canProducePeriod();
976             if (!this_sp_period_ready) {
977                 period_not_ready = true;
978             }
979         }
980
981         if (period_not_ready) {
982             debugOutput(DEBUG_LEVEL_VERBOSE, " wait extended since period not ready...\n");
983             Util::SystemTimeSource::SleepUsecRelative(125); // one cycle
984         }
985
986         // check for underruns/errors on the ISO side,
987         // those should make us bail out of the wait loop
988         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
989             it != m_ReceiveProcessors.end();
990             ++it ) {
991             // a xrun has occurred on the Iso side
992             xrun_occurred |= (*it)->xrunOccurred();
993             in_error |= (*it)->inError();
994         }
995         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
996             it != m_TransmitProcessors.end();
997             ++it ) {
998             // a xrun has occurred on the Iso side
999             xrun_occurred |= (*it)->xrunOccurred();
1000             in_error |= (*it)->inError();
1001         }
1002         if(xrun_occurred | in_error | m_shutdown_needed) break;
1003     }
1004     #else
1005     // check for underruns/errors on the ISO side,
1006     // those should make us bail out of the wait loop
1007     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1008         it != m_ReceiveProcessors.end();
1009         ++it ) {
1010         // xrun on data buffer side
1011         if (!(*it)->canConsumePeriod()) {
1012             xrun_occurred = true;
1013         }
1014         // a xrun has occurred on the Iso side
1015         xrun_occurred |= (*it)->xrunOccurred();
1016         in_error |= (*it)->inError();
1017     }
1018     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1019         it != m_TransmitProcessors.end();
1020         ++it ) {
1021         // xrun on data buffer side
1022         if (!(*it)->canProducePeriod()) {
1023             xrun_occurred = true;
1024         }
1025         // a xrun has occurred on the Iso side
1026         xrun_occurred |= (*it)->xrunOccurred();
1027         in_error |= (*it)->inError();
1028     }
1029     #endif
1030
1031     if(xrun_occurred) {
1032         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
1033     }
1034     if(in_error) {
1035         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to error...\n");
1036         m_shutdown_needed = true;
1037     }
1038
1039     // we save the 'ideal' time of the transfer at this point,
1040     // because we can have interleaved read - process - write
1041     // cycles making that we modify a receiving stream's buffer
1042     // before we get to writing.
1043     // NOTE: before waitForPeriod() is called again, both the transmit
1044     //       and the receive processors should have done their transfer.
1045     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1046    
1047     #ifdef DEBUG
1048     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
1049    
1050     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1051     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
1052     // display message if the difference between two successive tick
1053     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
1054     // so 50 ticks = 10%, which is a rather large jitter value.
1055     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
1056         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
1057                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
1058     }
1059     m_time_of_transfer2 = m_time_of_transfer;
1060     #endif
1061
1062     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1063                         "transfer period %d at %llu ticks...\n",
1064                         m_nbperiods, m_time_of_transfer);
1065
1066     // this is to notify the client of the delay that we introduced by waiting
1067     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
1068     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1069                         "delayed for %d usecs...\n",
1070                         m_delayed_usecs);
1071
1072 #ifdef DEBUG
1073     int rcv_bf=0, xmt_bf=0;
1074     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1075         it != m_ReceiveProcessors.end();
1076         ++it ) {
1077         rcv_bf = (*it)->getBufferFill();
1078     }
1079     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1080         it != m_TransmitProcessors.end();
1081         ++it ) {
1082         xmt_bf = (*it)->getBufferFill();
1083     }
1084     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1085                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1086                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1087
1088     // check if xruns occurred on the Iso side.
1089     // also check if xruns will occur should we transfer() now
1090     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1091           it != m_ReceiveProcessors.end();
1092           ++it ) {
1093
1094         if ((*it)->xrunOccurred()) {
1095             debugOutput(DEBUG_LEVEL_NORMAL,
1096                         "Xrun on RECV SP %p due to ISO side xrun\n", *it);
1097             (*it)->dumpInfo();
1098         }
1099         if (!((*it)->canClientTransferFrames(m_period))) {
1100             debugOutput(DEBUG_LEVEL_NORMAL,
1101                         "Xrun on RECV SP %p due to buffer side xrun\n", *it);
1102             (*it)->dumpInfo();
1103         }
1104     }
1105     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1106           it != m_TransmitProcessors.end();
1107           ++it ) {
1108         if ((*it)->xrunOccurred()) {
1109             debugOutput(DEBUG_LEVEL_NORMAL,
1110                         "Xrun on XMIT SP %p due to ISO side xrun\n", *it);
1111         }
1112         if (!((*it)->canClientTransferFrames(m_period))) {
1113             debugOutput(DEBUG_LEVEL_NORMAL,
1114                         "Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1115         }
1116     }
1117 #endif
1118     m_nbperiods++;
1119     // now we can signal the client that we are (should be) ready
1120     return !xrun_occurred;
1121 }
1122
1123 /**
1124  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1125  *
1126  * Transfers one period of frames from the client side to the Iso side and vice versa.
1127  *
1128  * @return true if successful, false otherwise (indicates xrun).
1129  */
1130 bool StreamProcessorManager::transfer() {
1131     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1132     bool retval=true;
1133     retval &= transfer(StreamProcessor::ePT_Receive);
1134     retval &= transfer(StreamProcessor::ePT_Transmit);
1135     return retval;
1136 }
1137
1138 /**
1139  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1140  *
1141  * Transfers one period of frames from the client side to the Iso side or vice versa.
1142  *
1143  * @param t The processor type to tranfer for (receive or transmit)
1144  * @return true if successful, false otherwise (indicates xrun).
1145  */
1146 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
1147     if(m_SyncSource == NULL) return false;
1148     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
1149         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1150         t, m_time_of_transfer,
1151         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1152         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1153         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1154
1155     bool retval = true;
1156     // a static cast could make sure that there is no performance
1157     // penalty for the virtual functions (to be checked)
1158     if (t==StreamProcessor::ePT_Receive) {
1159         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1160                 it != m_ReceiveProcessors.end();
1161                 ++it ) {
1162             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
1163                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
1164                             m_period, m_time_of_transfer,*it);
1165                 retval &= false; // buffer underrun
1166             }
1167         }
1168     } else {
1169         // FIXME: in the SPM it would be nice to have system time instead of
1170         //        1394 time
1171         float rate = m_SyncSource->getTicksPerFrame();
1172         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1173
1174         // the data we are putting into the buffer is intended to be transmitted
1175         // one ringbuffer size after it has been received
1176         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1177
1178         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1179                 it != m_TransmitProcessors.end();
1180                 ++it ) {
1181             // FIXME: in the SPM it would be nice to have system time instead of
1182             //        1394 time
1183             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1184                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1185                         m_period, transmit_timestamp, *it);
1186                 retval &= false; // buffer underrun
1187             }
1188         }
1189     }
1190     return retval;
1191 }
1192
1193 /**
1194  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1195  *
1196  * Transfers one period of silence to the Iso side for transmit SP's
1197  * or dump one period of frames for receive SP's
1198  *
1199  * @return true if successful, false otherwise (indicates xrun).
1200  */
1201 bool StreamProcessorManager::transferSilence() {
1202     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1203     bool retval=true;
1204     // NOTE: the order here is opposite from the order in
1205     // normal operation (transmit is before receive), because
1206     // we can do that here (data=silence=available) and
1207     // it increases reliability (esp. on startup)
1208     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1209     retval &= transferSilence(StreamProcessor::ePT_Receive);
1210     return retval;
1211 }
1212
1213 /**
1214  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1215  *
1216  * Transfers one period of silence to the Iso side for transmit SP's
1217  * or dump one period of frames for receive SP's
1218  *
1219  * @param t The processor type to tranfer for (receive or transmit)
1220  * @return true if successful, false otherwise (indicates xrun).
1221  */
1222 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1223     if(m_SyncSource == NULL) return false;
1224     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1225         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1226         t, m_time_of_transfer,
1227         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1228         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1229         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1230
1231     bool retval = true;
1232     // a static cast could make sure that there is no performance
1233     // penalty for the virtual functions (to be checked)
1234     if (t==StreamProcessor::ePT_Receive) {
1235         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1236                 it != m_ReceiveProcessors.end();
1237                 ++it ) {
1238             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1239                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1240                             m_period, m_time_of_transfer,*it);
1241                 retval &= false; // buffer underrun
1242             }
1243         }
1244     } else {
1245         // FIXME: in the SPM it would be nice to have system time instead of
1246         //        1394 time
1247         float rate = m_SyncSource->getTicksPerFrame();
1248         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1249
1250         // the data we are putting into the buffer is intended to be transmitted
1251         // one ringbuffer size after it has been received
1252         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1253
1254         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1255                 it != m_TransmitProcessors.end();
1256                 ++it ) {
1257             // FIXME: in the SPM it would be nice to have system time instead of
1258             //        1394 time
1259             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1260                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1261                         m_period, transmit_timestamp, *it);
1262                 retval &= false; // buffer underrun
1263             }
1264         }
1265     }
1266     return retval;
1267 }
1268
1269 void StreamProcessorManager::dumpInfo() {
1270     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1271     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1272     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1273     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1274
1275     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1276     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1277         it != m_ReceiveProcessors.end();
1278         ++it ) {
1279         (*it)->dumpInfo();
1280     }
1281
1282     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1283     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1284         it != m_TransmitProcessors.end();
1285         ++it ) {
1286         (*it)->dumpInfo();
1287     }
1288
1289     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1290
1291 }
1292
1293 void StreamProcessorManager::setVerboseLevel(int l) {
1294     if(m_WaitLock) m_WaitLock->setVerboseLevel(l);
1295
1296     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1297         it != m_ReceiveProcessors.end();
1298         ++it ) {
1299         (*it)->setVerboseLevel(l);
1300     }
1301     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1302         it != m_TransmitProcessors.end();
1303         ++it ) {
1304         (*it)->setVerboseLevel(l);
1305     }
1306     setDebugLevel(l);
1307     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", l );
1308 }
1309
1310 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1311     int count=0;
1312
1313     if (direction == Port::E_Capture) {
1314         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1315             it != m_ReceiveProcessors.end();
1316             ++it ) {
1317             count += (*it)->getPortCount(type);
1318         }
1319     } else {
1320         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1321             it != m_TransmitProcessors.end();
1322             ++it ) {
1323             count += (*it)->getPortCount(type);
1324         }
1325     }
1326     return count;
1327 }
1328
1329 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1330     int count=0;
1331
1332     if (direction == Port::E_Capture) {
1333         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1334             it != m_ReceiveProcessors.end();
1335             ++it ) {
1336             count += (*it)->getPortCount();
1337         }
1338     } else {
1339         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1340             it != m_TransmitProcessors.end();
1341             ++it ) {
1342             count += (*it)->getPortCount();
1343         }
1344     }
1345     return count;
1346 }
1347
1348 // TODO: implement a port map here, instead of the loop
1349 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1350     int count=0;
1351     int prevcount=0;
1352
1353     if (direction == Port::E_Capture) {
1354         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1355             it != m_ReceiveProcessors.end();
1356             ++it ) {
1357             count += (*it)->getPortCount();
1358             if (count > idx) {
1359                 return (*it)->getPortAtIdx(idx-prevcount);
1360             }
1361             prevcount=count;
1362         }
1363     } else {
1364         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1365             it != m_TransmitProcessors.end();
1366             ++it ) {
1367             count += (*it)->getPortCount();
1368             if (count > idx) {
1369                 return (*it)->getPortAtIdx(idx-prevcount);
1370             }
1371             prevcount=count;
1372         }
1373     }
1374     return NULL;
1375 }
1376
1377 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1378     m_thread_realtime=rt;
1379     m_thread_priority=priority;
1380     return true;
1381 }
1382
1383
1384 } // end of namespace
Note: See TracBrowser for help on using the browser.