root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 721, 33.0 kB (checked in by ppalmers, 16 years ago)

another update

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define RUNNING_TIMEOUT_MSEC 4000
33 #define PREPARE_TIMEOUT_MSEC 4000
34 #define ENABLE_TIMEOUT_MSEC 4000
35
36 // allows to add some processing margin. This shifts the time
37 // at which the buffer is transfer()'ed, making things somewhat
38 // more robust. It should be noted though that shifting the transfer
39 // time to a later time instant also causes the xmit buffer fill to be
40 // lower on average.
41 #define FFADO_SIGNAL_DELAY_TICKS 3072
42
43 namespace Streaming {
44
45 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
46
47 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
48     : m_is_slave( false )
49     , m_SyncSource(NULL)
50     , m_nb_buffers(nb_buffers)
51     , m_period(period)
52     , m_nominal_framerate ( framerate )
53     , m_xruns(0)
54     , m_isoManager(0)
55     , m_nbperiods(0)
56 {
57     addOption(Util::OptionContainer::Option("slaveMode",false));
58 }
59
60 StreamProcessorManager::~StreamProcessorManager() {
61     if (m_isoManager) delete m_isoManager;
62 }
63
64 /**
65  * Registers \ref processor with this manager.
66  *
67  * also registers it with the isohandlermanager
68  *
69  * be sure to call isohandlermanager->init() first!
70  * and be sure that the processors are also ->init()'ed
71  *
72  * @param processor
73  * @return true if successfull
74  */
75 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
76 {
77     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
78     assert(processor);
79     assert(m_isoManager);
80
81     if (processor->getType() == StreamProcessor::ePT_Receive) {
82         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
83
84         m_ReceiveProcessors.push_back(processor);
85         processor->setManager(this);
86         return true;
87     }
88
89     if (processor->getType() == StreamProcessor::ePT_Transmit) {
90         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
91
92         m_TransmitProcessors.push_back(processor);
93         processor->setManager(this);
94         return true;
95     }
96
97     debugFatal("Unsupported processor type!\n");
98     return false;
99 }
100
101 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
102 {
103     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
104     assert(processor);
105
106     if (processor->getType()==StreamProcessor::ePT_Receive) {
107
108         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
109               it != m_ReceiveProcessors.end();
110               ++it )
111         {
112             if ( *it == processor ) {
113                 m_ReceiveProcessors.erase(it);
114                 processor->clearManager();
115                 if(!m_isoManager->unregisterStream(processor)) {
116                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
117                     return false;
118                 }
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 m_TransmitProcessors.erase(it);
131                 processor->clearManager();
132                 if(!m_isoManager->unregisterStream(processor)) {
133                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
134                     return false;
135                 }
136                 return true;
137             }
138         }
139     }
140
141     debugFatal("Processor (%p) not found!\n",processor);
142     return false; //not found
143 }
144
145 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
146     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
147     m_SyncSource=s;
148     return true;
149 }
150
151 bool StreamProcessorManager::init()
152 {
153     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
154     m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
155     if(!m_isoManager) {
156         debugFatal("Could not create IsoHandlerManager\n");
157         return false;
158     }
159     m_isoManager->setVerboseLevel(getDebugLevel());
160     if(!m_isoManager->init()) {
161         debugFatal("Could not initialize IsoHandlerManager\n");
162         return false;
163     }
164
165     m_xrun_happened=false;
166     return true;
167 }
168
169 bool StreamProcessorManager::prepare() {
170
171     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
172
173     m_is_slave=false;
174     if(!getOption("slaveMode", m_is_slave)) {
175         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
176     }
177
178     // if no sync source is set, select one here
179     if(m_SyncSource == NULL) {
180        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
181     }
182
183     // FIXME: put into separate method
184     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
185           it != m_ReceiveProcessors.end();
186           ++it )
187     {
188         if(m_SyncSource == NULL) {
189             debugWarning(" => Sync Source is %p.\n", *it);
190             m_SyncSource = *it;
191         }
192     }
193     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
194           it != m_TransmitProcessors.end();
195           ++it )
196     {
197         if(m_SyncSource == NULL) {
198             debugWarning(" => Sync Source is %p.\n", *it);
199             m_SyncSource = *it;
200         }
201     }
202
203     // now do the actual preparation of the SP's
204     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
205     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
206         it != m_ReceiveProcessors.end();
207         ++it ) {
208
209         if(!(*it)->setOption("slaveMode", m_is_slave)) {
210             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
211         }
212
213         if(!(*it)->prepare()) {
214             debugFatal(  " could not prepare (%p)...\n",(*it));
215             return false;
216         }
217     }
218     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
219     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
220         it != m_TransmitProcessors.end();
221         ++it ) {
222         if(!(*it)->setOption("slaveMode", m_is_slave)) {
223             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
224         }
225         if(!(*it)->prepare()) {
226             debugFatal( " could not prepare (%p)...\n",(*it));
227             return false;
228         }
229     }
230
231     // if there are no stream processors registered,
232     // fail
233     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
234         debugFatal("No stream processors registered, can't do anything usefull\n");
235         return false;
236     }
237     return true;
238 }
239
240 bool StreamProcessorManager::startDryRunning() {
241     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start dry-running...\n");
242     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
243             it != m_ReceiveProcessors.end();
244             ++it ) {
245         if(!(*it)->startDryRunning(-1)) {
246             debugError("Could not put SP %p into the dry-running state\n", *it);
247             return false;
248         }
249     }
250     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
251             it != m_TransmitProcessors.end();
252             ++it ) {
253         if(!(*it)->startDryRunning(-1)) {
254             debugError("Could not put SP %p into the dry-running state\n", *it);
255             return false;
256         }
257     }
258     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
259     return true;
260 }
261
262 bool StreamProcessorManager::syncStartAll() {
263     // figure out when to get the SP's running.
264     // the xmit SP's should also know the base timestamp
265     // streams should be aligned here
266
267     // now find out how long we have to delay the wait operation such that
268     // the received frames will all be presented to the SP
269     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
270     int max_of_min_delay = 0;
271     int min_delay = 0;
272     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
273             it != m_ReceiveProcessors.end();
274             ++it ) {
275         min_delay = (*it)->getMaxFrameLatency();
276         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
277     }
278
279     // add some processing margin. This only shifts the time
280     // at which the buffer is transfer()'ed. This makes things somewhat
281     // more robust. It should be noted though that shifting the transfer
282     // time to a later time instant also causes the xmit buffer fill to be
283     // lower on average.
284     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
285     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
286         max_of_min_delay,
287         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
288         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
289         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
290     m_SyncSource->setSyncDelay(max_of_min_delay);
291
292     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
293     //        have aquired lock
294     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
295     //sleep(2); // FIXME: be smarter here
296
297     // make sure that we are dry-running long enough for the
298     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
299     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
300     int nb_sync_runs=20;
301     int64_t time_till_next_period;
302     while(nb_sync_runs--) { // or while not sync-ed?
303         // check if we were waked up too soon
304         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
305         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
306         if(time_till_next_period > 0) {
307             // wait for the period
308             usleep(time_till_next_period);
309         }
310     }
311
312     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
313     // FIXME: in the SPM it would be nice to have system time instead of
314     //        1394 time
315
316     // we now should have decent sync info on the sync source
317     // determine a point in time where the system should start
318     // figure out where we are now
319     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
320     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
321         time_of_first_sample,
322         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
323         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
324         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
325
326     #define CYCLES_FOR_STARTUP 200
327     // start wet-running in CYCLES_FOR_STARTUP cycles
328     // this is the time window we have to setup all SP's such that they
329     // can start wet-running correctly.
330     time_of_first_sample = addTicks(time_of_first_sample,
331                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
332
333     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
334         time_of_first_sample,
335         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
336         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
337         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
338
339     // we should start wet-running the transmit SP's some cycles in advance
340     // such that we know it is wet-running when it should output its first sample
341     #define PRESTART_CYCLES_FOR_XMIT 20
342     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
343                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
344
345     #define PRESTART_CYCLES_FOR_RECV 0
346     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
347                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
348     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
349         time_to_start_xmit,
350         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
351         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
352         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
353     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
354         time_to_start_recv,
355         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
356         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
357         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
358
359     // at this point the buffer head timestamp of the transmit buffers can be set
360     // this is the presentation time of the first sample in the buffer
361     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
362           it != m_TransmitProcessors.end();
363           ++it ) {
364         (*it)->setBufferHeadTimestamp(time_of_first_sample);
365     }
366
367     // STEP X: switch SP's over to the running state
368     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
369           it != m_ReceiveProcessors.end();
370           ++it ) {
371         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
372             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
373             return false;
374         }
375     }
376     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
377           it != m_TransmitProcessors.end();
378           ++it ) {
379         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
380             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
381             return false;
382         }
383     }
384     // wait for the syncsource to start running.
385     // that will block the waitForPeriod call until everyone has started (theoretically)
386     int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started
387     while (!m_SyncSource->isRunning() && cnt) {
388         usleep(125);
389         cnt--;
390     }
391     if(cnt==0) {
392         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
393         return false;
394     }
395     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
396     return true;
397 }
398
399 bool StreamProcessorManager::start() {
400     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
401     assert(m_isoManager);
402
403     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
404     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
405     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
406           it != m_ReceiveProcessors.end();
407           ++it )
408     {
409         if (!m_isoManager->registerStream(*it)) {
410             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
411             return false;
412         }
413     }
414     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
415     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
416           it != m_TransmitProcessors.end();
417           ++it )
418     {
419         if (!m_isoManager->registerStream(*it)) {
420             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
421             return false;
422         }
423     }
424
425     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
426     if (!m_isoManager->prepare()) {
427         debugFatal("Could not prepare isoManager\n");
428         return false;
429     }
430
431     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
432     if (!m_isoManager->startHandlers(-1)) {
433         debugFatal("Could not start handlers...\n");
434         return false;
435     }
436
437     // put all SP's into dry-running state
438     if (!startDryRunning()) {
439         debugFatal("Could not put SP's in dry-running state\n");
440         return false;
441     }
442
443     // start all SP's synchonized
444     if (!syncStartAll()) {
445         debugFatal("Could not syncStartAll...\n");
446         return false;
447     }
448
449     // dump the iso stream information when in verbose mode
450     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
451         m_isoManager->dumpInfo();
452     }
453
454     return true;
455 }
456
457 bool StreamProcessorManager::stop() {
458     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
459     assert(m_isoManager);
460
461     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
462
463     // switch SP's over to the dry-running state
464     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
465           it != m_ReceiveProcessors.end();
466           ++it ) {
467         if(!(*it)->scheduleStopRunning(-1)) {
468             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
469             return false;
470         }
471     }
472     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
473           it != m_TransmitProcessors.end();
474           ++it ) {
475         if(!(*it)->scheduleStopRunning(-1)) {
476             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
477             return false;
478         }
479     }
480     // wait for the SP's to get into the dry-running state
481     int cnt = 200;
482     bool ready = false;
483     while (!ready && cnt) {
484         ready = true;
485         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
486             it != m_ReceiveProcessors.end();
487             ++it ) {
488             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
489         }
490         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
491             it != m_TransmitProcessors.end();
492             ++it ) {
493             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
494         }
495         usleep(125);
496         cnt--;
497     }
498     if(cnt==0) {
499         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
500         return false;
501     }
502
503     // switch SP's over to the stopped state
504     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
505           it != m_ReceiveProcessors.end();
506           ++it ) {
507         if(!(*it)->scheduleStopDryRunning(-1)) {
508             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
509             return false;
510         }
511     }
512     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
513           it != m_TransmitProcessors.end();
514           ++it ) {
515         if(!(*it)->scheduleStopDryRunning(-1)) {
516             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
517             return false;
518         }
519     }
520     // wait for the SP's to get into the running state
521     cnt = 200;
522     ready = false;
523     while (!ready && cnt) {
524         ready = true;
525         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
526             it != m_ReceiveProcessors.end();
527             ++it ) {
528             ready &= (*it)->isStopped();
529         }
530         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
531             it != m_TransmitProcessors.end();
532             ++it ) {
533             ready &= (*it)->isStopped();
534         }
535         usleep(125);
536         cnt--;
537     }
538     if(cnt==0) {
539         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
540         return false;
541     }
542
543     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
544     if(!m_isoManager->stopHandlers()) {
545        debugFatal("Could not stop ISO handlers\n");
546        return false;
547     }
548
549     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
550     // now unregister all streams from iso manager
551     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
552     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
553           it != m_ReceiveProcessors.end();
554           ++it ) {
555         if (!m_isoManager->unregisterStream(*it)) {
556             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
557             return false;
558         }
559     }
560     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
561     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
562           it != m_TransmitProcessors.end();
563           ++it ) {
564         if (!m_isoManager->unregisterStream(*it)) {
565             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
566             return false;
567         }
568     }
569     return true;
570 }
571
572 /**
573  * Called upon Xrun events. This brings all StreamProcessors back
574  * into their starting state, and then carries on streaming. This should
575  * have the same effect as restarting the whole thing.
576  *
577  * @return true if successful, false otherwise
578  */
579 bool StreamProcessorManager::handleXrun() {
580
581     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
582
583     dumpInfo();
584
585     /*
586      * Reset means:
587      * 1) Disabling the SP's, so that they don't process any packets
588      *    note: the isomanager does keep on delivering/requesting them
589      * 2) Bringing all buffers & streamprocessors into a know state
590      *    - Clear all capture buffers
591      *    - Put nb_periods*period_size of null frames into the playback buffers
592      * 3) Re-enable the SP's
593      */
594
595     // put all SP's back into dry-running state
596     if (!startDryRunning()) {
597         debugFatal("Could not put SP's in dry-running state\n");
598         return false;
599     }
600
601     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
602     // start all SP's synchonized
603     if (!syncStartAll()) {
604         debugFatal("Could not syncStartAll...\n");
605         return false;
606     }
607
608     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
609
610     return true;
611 }
612
613 /**
614  * @brief Waits until the next period of samples is ready
615  *
616  * This function does not return until a full period of samples is (or should be)
617  * ready to be transferred.
618  *
619  * @return true if the period is ready, false if an xrun occurred
620  */
621 bool StreamProcessorManager::waitForPeriod() {
622     int time_till_next_period;
623     bool xrun_occurred=false;
624
625     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
626
627     assert(m_SyncSource);
628
629     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
630
631     while(time_till_next_period > 0) {
632         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
633
634         // wait for the period
635         usleep(time_till_next_period);
636
637         // check for underruns on the ISO side,
638         // those should make us bail out of the wait loop
639         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
640             it != m_ReceiveProcessors.end();
641             ++it ) {
642             // a xrun has occurred on the Iso side
643             xrun_occurred |= (*it)->xrunOccurred();
644         }
645         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
646             it != m_TransmitProcessors.end();
647             ++it ) {
648             // a xrun has occurred on the Iso side
649             xrun_occurred |= (*it)->xrunOccurred();
650         }
651
652         if(xrun_occurred) break;
653
654         // check if we were waked up too soon
655         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
656     }
657
658     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
659
660     // this is to notify the client of the delay
661     // that we introduced
662     m_delayed_usecs = -time_till_next_period;
663
664     // we save the 'ideal' time of the transfer at this point,
665     // because we can have interleaved read - process - write
666     // cycles making that we modify a receiving stream's buffer
667     // before we get to writing.
668     // NOTE: before waitForPeriod() is called again, both the transmit
669     //       and the receive processors should have done their transfer.
670     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
671     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
672         m_time_of_transfer);
673
674 #ifdef DEBUG
675     int rcv_bf=0, xmt_bf=0;
676     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
677         it != m_ReceiveProcessors.end();
678         ++it ) {
679         rcv_bf = (*it)->getBufferFill();
680     }
681     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
682         it != m_TransmitProcessors.end();
683         ++it ) {
684         xmt_bf = (*it)->getBufferFill();
685     }
686     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
687         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
688
689 #endif
690
691     xrun_occurred=false;
692
693     // check if xruns occurred on the Iso side.
694     // also check if xruns will occur should we transfer() now
695
696     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
697           it != m_ReceiveProcessors.end();
698           ++it ) {
699         // a xrun has occurred on the Iso side
700         xrun_occurred |= (*it)->xrunOccurred();
701
702         // if this is true, a xrun will occur
703         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
704
705 #ifdef DEBUG
706         if ((*it)->xrunOccurred()) {
707             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
708             (*it)->dumpInfo();
709         }
710         if (!((*it)->canClientTransferFrames(m_period))) {
711             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
712             (*it)->dumpInfo();
713         }
714 #endif
715
716     }
717     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
718           it != m_TransmitProcessors.end();
719           ++it ) {
720         // a xrun has occurred on the Iso side
721         xrun_occurred |= (*it)->xrunOccurred();
722
723         // if this is true, a xrun will occur
724         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
725
726 #ifdef DEBUG
727         if ((*it)->xrunOccurred()) {
728             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
729         }
730         if (!((*it)->canClientTransferFrames(m_period))) {
731             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
732         }
733 #endif
734     }
735
736     m_nbperiods++;
737
738     // now we can signal the client that we are (should be) ready
739     return !xrun_occurred;
740 }
741
742 /**
743  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
744  *
745  * Transfers one period of frames from the client side to the Iso side and vice versa.
746  *
747  * @return true if successful, false otherwise (indicates xrun).
748  */
749 bool StreamProcessorManager::transfer() {
750
751     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
752     bool retval=true;
753     retval &= transfer(StreamProcessor::ePT_Receive);
754     retval &= transfer(StreamProcessor::ePT_Transmit);
755     return retval;
756 }
757
758 /**
759  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
760  *
761  * Transfers one period of frames from the client side to the Iso side or vice versa.
762  *
763  * @param t The processor type to tranfer for (receive or transmit)
764  * @return true if successful, false otherwise (indicates xrun).
765  */
766
767 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
768     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
769         t, m_time_of_transfer,
770         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
771         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
772         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
773
774     bool retval = true;
775     // a static cast could make sure that there is no performance
776     // penalty for the virtual functions (to be checked)
777     if (t==StreamProcessor::ePT_Receive) {
778         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
779                 it != m_ReceiveProcessors.end();
780                 ++it ) {
781             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
782                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
783                             m_period, m_time_of_transfer,*it);
784                 retval &= false; // buffer underrun
785             }
786         }
787     } else {
788         // FIXME: in the SPM it would be nice to have system time instead of
789         //        1394 time
790         float rate = m_SyncSource->getTicksPerFrame();
791         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
792
793         // the data we are putting into the buffer is intended to be transmitted
794         // one ringbuffer size after it has been received
795         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
796
797         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
798                 it != m_TransmitProcessors.end();
799                 ++it ) {
800             // FIXME: in the SPM it would be nice to have system time instead of
801             //        1394 time
802             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
803                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
804                         m_period, transmit_timestamp, *it);
805                 retval &= false; // buffer underrun
806             }
807         }
808     }
809     return retval;
810 }
811
812 void StreamProcessorManager::dumpInfo() {
813     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
814     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
815     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
816
817     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
818     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
819         it != m_ReceiveProcessors.end();
820         ++it ) {
821         (*it)->dumpInfo();
822     }
823
824     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
825     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
826         it != m_TransmitProcessors.end();
827         ++it ) {
828         (*it)->dumpInfo();
829     }
830
831     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
832     m_isoManager->dumpInfo();
833     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
834
835 }
836
837 void StreamProcessorManager::setVerboseLevel(int l) {
838     setDebugLevel(l);
839
840     if (m_isoManager) m_isoManager->setVerboseLevel(l);
841
842     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
843     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
844         it != m_ReceiveProcessors.end();
845         ++it ) {
846         (*it)->setVerboseLevel(l);
847     }
848
849     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
850     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
851         it != m_TransmitProcessors.end();
852         ++it ) {
853         (*it)->setVerboseLevel(l);
854     }
855 }
856
857
858 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
859     int count=0;
860
861     if (direction == Port::E_Capture) {
862         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
863             it != m_ReceiveProcessors.end();
864             ++it ) {
865             count += (*it)->getPortCount(type);
866         }
867     } else {
868         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
869             it != m_TransmitProcessors.end();
870             ++it ) {
871             count += (*it)->getPortCount(type);
872         }
873     }
874     return count;
875 }
876
877 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
878     int count=0;
879
880     if (direction == Port::E_Capture) {
881         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
882             it != m_ReceiveProcessors.end();
883             ++it ) {
884             count += (*it)->getPortCount();
885         }
886     } else {
887         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
888             it != m_TransmitProcessors.end();
889             ++it ) {
890             count += (*it)->getPortCount();
891         }
892     }
893     return count;
894 }
895
896 // TODO: implement a port map here, instead of the loop
897
898 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
899     int count=0;
900     int prevcount=0;
901
902     if (direction == Port::E_Capture) {
903         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
904             it != m_ReceiveProcessors.end();
905             ++it ) {
906             count += (*it)->getPortCount();
907             if (count > idx) {
908                 return (*it)->getPortAtIdx(idx-prevcount);
909             }
910             prevcount=count;
911         }
912     } else {
913         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
914             it != m_TransmitProcessors.end();
915             ++it ) {
916             count += (*it)->getPortCount();
917             if (count > idx) {
918                 return (*it)->getPortAtIdx(idx-prevcount);
919             }
920             prevcount=count;
921         }
922     }
923     return NULL;
924 }
925
926 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
927     m_thread_realtime=rt;
928     m_thread_priority=priority;
929     return true;
930 }
931
932
933 } // end of namespace
Note: See TracBrowser for help on using the browser.