root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 723, 36.4 kB (checked in by ppalmers, 14 years ago)

fix problem where receiving multiple streams doesn't work

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define RUNNING_TIMEOUT_MSEC 4000
33 #define PREPARE_TIMEOUT_MSEC 4000
34 #define ENABLE_TIMEOUT_MSEC 4000
35
// Processing margin, in ticks, that is added to the sync delay. It shifts
// the time at which the buffer is transfer()'ed to a later instant, which
// makes things somewhat more robust. It should be noted, though, that
// shifting the transfer to a later time instant also causes the xmit
// buffer fill to be lower on average.
41 #define FFADO_SIGNAL_DELAY_TICKS 3072
42
43 namespace Streaming {
44
45 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
46
47 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
48     : m_is_slave( false )
49     , m_SyncSource(NULL)
50     , m_nb_buffers(nb_buffers)
51     , m_period(period)
52     , m_nominal_framerate ( framerate )
53     , m_xruns(0)
54     , m_isoManager(0)
55     , m_nbperiods(0)
56 {
57     addOption(Util::OptionContainer::Option("slaveMode",false));
58 }
59
60 StreamProcessorManager::~StreamProcessorManager() {
61     if (m_isoManager) delete m_isoManager;
62 }
63
64 /**
65  * Registers \ref processor with this manager.
66  *
67  * also registers it with the isohandlermanager
68  *
69  * be sure to call isohandlermanager->init() first!
70  * and be sure that the processors are also ->init()'ed
71  *
72  * @param processor
73  * @return true if successfull
74  */
75 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
76 {
77     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
78     assert(processor);
79     assert(m_isoManager);
80
81     if (processor->getType() == StreamProcessor::ePT_Receive) {
82         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
83
84         m_ReceiveProcessors.push_back(processor);
85         processor->setManager(this);
86         return true;
87     }
88
89     if (processor->getType() == StreamProcessor::ePT_Transmit) {
90         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
91
92         m_TransmitProcessors.push_back(processor);
93         processor->setManager(this);
94         return true;
95     }
96
97     debugFatal("Unsupported processor type!\n");
98     return false;
99 }
100
101 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
102 {
103     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
104     assert(processor);
105
106     if (processor->getType()==StreamProcessor::ePT_Receive) {
107
108         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
109               it != m_ReceiveProcessors.end();
110               ++it )
111         {
112             if ( *it == processor ) {
113                 m_ReceiveProcessors.erase(it);
114                 processor->clearManager();
115                 if(!m_isoManager->unregisterStream(processor)) {
116                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
117                     return false;
118                 }
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 m_TransmitProcessors.erase(it);
131                 processor->clearManager();
132                 if(!m_isoManager->unregisterStream(processor)) {
133                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
134                     return false;
135                 }
136                 return true;
137             }
138         }
139     }
140
141     debugFatal("Processor (%p) not found!\n",processor);
142     return false; //not found
143 }
144
145 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
146     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
147     m_SyncSource=s;
148     return true;
149 }
150
151 bool StreamProcessorManager::init()
152 {
153     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
154     m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
155     if(!m_isoManager) {
156         debugFatal("Could not create IsoHandlerManager\n");
157         return false;
158     }
159     m_isoManager->setVerboseLevel(getDebugLevel());
160     m_isoManager->setTransmitBufferNbPeriods(getNbBuffers() - 1);
161
162     if(!m_isoManager->init()) {
163         debugFatal("Could not initialize IsoHandlerManager\n");
164         return false;
165     }
166
167     m_xrun_happened=false;
168     return true;
169 }
170
171 bool StreamProcessorManager::prepare() {
172
173     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
174
175     m_is_slave=false;
176     if(!getOption("slaveMode", m_is_slave)) {
177         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
178     }
179
180     // if no sync source is set, select one here
181     if(m_SyncSource == NULL) {
182        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
183     }
184
185     // FIXME: put into separate method
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187           it != m_ReceiveProcessors.end();
188           ++it )
189     {
190         if(m_SyncSource == NULL) {
191             debugWarning(" => Sync Source is %p.\n", *it);
192             m_SyncSource = *it;
193         }
194     }
195     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
196           it != m_TransmitProcessors.end();
197           ++it )
198     {
199         if(m_SyncSource == NULL) {
200             debugWarning(" => Sync Source is %p.\n", *it);
201             m_SyncSource = *it;
202         }
203     }
204
205     // now do the actual preparation of the SP's
206     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
207     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
208         it != m_ReceiveProcessors.end();
209         ++it ) {
210
211         if(!(*it)->setOption("slaveMode", m_is_slave)) {
212             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
213         }
214
215         if(!(*it)->prepare()) {
216             debugFatal(  " could not prepare (%p)...\n",(*it));
217             return false;
218         }
219     }
220     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
221     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
222         it != m_TransmitProcessors.end();
223         ++it ) {
224         if(!(*it)->setOption("slaveMode", m_is_slave)) {
225             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
226         }
227         if(!(*it)->prepare()) {
228             debugFatal( " could not prepare (%p)...\n",(*it));
229             return false;
230         }
231     }
232
233     // if there are no stream processors registered,
234     // fail
235     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
236         debugFatal("No stream processors registered, can't do anything usefull\n");
237         return false;
238     }
239     return true;
240 }
241
242 bool StreamProcessorManager::startDryRunning() {
243     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
244     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
245     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
246             it != m_ReceiveProcessors.end();
247             ++it ) {
248         if (!(*it)->isDryRunning()) {
249             if(!(*it)->scheduleStartDryRunning(-1)) {
250                 debugError("Could not put SP %p into the dry-running state\n", *it);
251                 return false;
252             }
253         } else {
254             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
255         }
256     }
257     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
258             it != m_TransmitProcessors.end();
259             ++it ) {
260         if (!(*it)->isDryRunning()) {
261             if(!(*it)->scheduleStartDryRunning(-1)) {
262                 debugError("Could not put SP %p into the dry-running state\n", *it);
263                 return false;
264             }
265         } else {
266             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
267         }
268     }
269     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
270     // wait for the syncsource to start running.
271     // that will block the waitForPeriod call until everyone has started (theoretically)
272     #define CYCLES_FOR_DRYRUN 40000
273     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
274     bool all_dry_running = false;
275     while (!all_dry_running && cnt) {
276         all_dry_running = true;
277         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
278                 it != m_ReceiveProcessors.end();
279                 ++it ) {
280             all_dry_running &= (*it)->isDryRunning();
281         }
282         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
283                 it != m_TransmitProcessors.end();
284                 ++it ) {
285             all_dry_running &= (*it)->isDryRunning();
286         }
287
288         usleep(125);
289         cnt--;
290     }
291     if(cnt==0) {
292         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
293         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
294                 it != m_ReceiveProcessors.end();
295                 ++it ) {
296             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
297                 (*it)->getTypeString(), *it, (*it)->getStateString());
298         }
299         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
300                 it != m_TransmitProcessors.end();
301                 ++it ) {
302             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
303                 (*it)->getTypeString(), *it, (*it)->getStateString());
304         }
305         return false;
306     }
307     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
308     return true;
309 }
310
311 bool StreamProcessorManager::syncStartAll() {
312     // figure out when to get the SP's running.
313     // the xmit SP's should also know the base timestamp
314     // streams should be aligned here
315
316     // now find out how long we have to delay the wait operation such that
317     // the received frames will all be presented to the SP
318     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
319     int max_of_min_delay = 0;
320     int min_delay = 0;
321     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
322             it != m_ReceiveProcessors.end();
323             ++it ) {
324         min_delay = (*it)->getMaxFrameLatency();
325         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
326     }
327
328     // add some processing margin. This only shifts the time
329     // at which the buffer is transfer()'ed. This makes things somewhat
330     // more robust. It should be noted though that shifting the transfer
331     // time to a later time instant also causes the xmit buffer fill to be
332     // lower on average.
333     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
334     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
335         max_of_min_delay,
336         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
337         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
338         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
339     m_SyncSource->setSyncDelay(max_of_min_delay);
340
341     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
342     //        have aquired lock
343     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
344     //sleep(2); // FIXME: be smarter here
345
346     // make sure that we are dry-running long enough for the
347     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
348     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
349     int nb_sync_runs=20;
350     int64_t time_till_next_period;
351     while(nb_sync_runs--) { // or while not sync-ed?
352         // check if we were waked up too soon
353         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
354         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
355         if(time_till_next_period > 0) {
356             // wait for the period
357             usleep(time_till_next_period);
358         }
359     }
360
361     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
362     // FIXME: in the SPM it would be nice to have system time instead of
363     //        1394 time
364
365     // we now should have decent sync info on the sync source
366     // determine a point in time where the system should start
367     // figure out where we are now
368     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
369     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
370         time_of_first_sample,
371         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
372         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
373         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
374
375     #define CYCLES_FOR_STARTUP 2000
376     // start wet-running in CYCLES_FOR_STARTUP cycles
377     // this is the time window we have to setup all SP's such that they
378     // can start wet-running correctly.
379     time_of_first_sample = addTicks(time_of_first_sample,
380                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
381
382     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
383         time_of_first_sample,
384         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
385         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
386         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
387
388     // we should start wet-running the transmit SP's some cycles in advance
389     // such that we know it is wet-running when it should output its first sample
390     #define PRESTART_CYCLES_FOR_XMIT 20
391     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
392                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
393
394     #define PRESTART_CYCLES_FOR_RECV 0
395     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
396                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
397     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
398         time_to_start_xmit,
399         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
400         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
401         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
402     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
403         time_to_start_recv,
404         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
405         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
406         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
407
408     // at this point the buffer head timestamp of the transmit buffers can be set
409     // this is the presentation time of the first sample in the buffer
410     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
411           it != m_TransmitProcessors.end();
412           ++it ) {
413         (*it)->setBufferHeadTimestamp(time_of_first_sample);
414     }
415
416     // STEP X: switch SP's over to the running state
417     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
418           it != m_ReceiveProcessors.end();
419           ++it ) {
420         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
421             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
422             return false;
423         }
424     }
425     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
426           it != m_TransmitProcessors.end();
427           ++it ) {
428         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
429             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
430             return false;
431         }
432     }
433     // wait for the syncsource to start running.
434     // that will block the waitForPeriod call until everyone has started (theoretically)
435     int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started
436     while (!m_SyncSource->isRunning() && cnt) {
437         usleep(125);
438         cnt--;
439     }
440     if(cnt==0) {
441         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
442         return false;
443     }
444
445     // now align the received streams
446     debugOutput( DEBUG_LEVEL_VERBOSE, " Aligning incoming streams...\n");
447    
448    
449     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
450     return true;
451 }
452
453 bool StreamProcessorManager::start() {
454     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
455     assert(m_isoManager);
456
457     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
458     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
459     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
460           it != m_ReceiveProcessors.end();
461           ++it )
462     {
463         if (!m_isoManager->registerStream(*it)) {
464             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
465             return false;
466         }
467     }
468     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
469     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
470           it != m_TransmitProcessors.end();
471           ++it )
472     {
473         if (!m_isoManager->registerStream(*it)) {
474             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
475             return false;
476         }
477     }
478
479     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
480     if (!m_isoManager->prepare()) {
481         debugFatal("Could not prepare isoManager\n");
482         return false;
483     }
484
485     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
486     if (!m_isoManager->startHandlers(-1)) {
487         debugFatal("Could not start handlers...\n");
488         return false;
489     }
490
491     // put all SP's into dry-running state
492     if (!startDryRunning()) {
493         debugFatal("Could not put SP's in dry-running state\n");
494         return false;
495     }
496
497     // start all SP's synchonized
498     if (!syncStartAll()) {
499         debugFatal("Could not syncStartAll...\n");
500         return false;
501     }
502
503     // dump the iso stream information when in verbose mode
504     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
505         m_isoManager->dumpInfo();
506     }
507
508     return true;
509 }
510
511 bool StreamProcessorManager::stop() {
512     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
513     assert(m_isoManager);
514
515     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
516
517     // switch SP's over to the dry-running state
518     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
519           it != m_ReceiveProcessors.end();
520           ++it ) {
521         if(!(*it)->scheduleStopRunning(-1)) {
522             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
523             return false;
524         }
525     }
526     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
527           it != m_TransmitProcessors.end();
528           ++it ) {
529         if(!(*it)->scheduleStopRunning(-1)) {
530             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
531             return false;
532         }
533     }
534     // wait for the SP's to get into the dry-running state
535     int cnt = 200;
536     bool ready = false;
537     while (!ready && cnt) {
538         ready = true;
539         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
540             it != m_ReceiveProcessors.end();
541             ++it ) {
542             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
543         }
544         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
545             it != m_TransmitProcessors.end();
546             ++it ) {
547             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
548         }
549         usleep(125);
550         cnt--;
551     }
552     if(cnt==0) {
553         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
554         return false;
555     }
556
557     // switch SP's over to the stopped state
558     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
559           it != m_ReceiveProcessors.end();
560           ++it ) {
561         if(!(*it)->scheduleStopDryRunning(-1)) {
562             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
563             return false;
564         }
565     }
566     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
567           it != m_TransmitProcessors.end();
568           ++it ) {
569         if(!(*it)->scheduleStopDryRunning(-1)) {
570             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
571             return false;
572         }
573     }
574     // wait for the SP's to get into the running state
575     cnt = 200;
576     ready = false;
577     while (!ready && cnt) {
578         ready = true;
579         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
580             it != m_ReceiveProcessors.end();
581             ++it ) {
582             ready &= (*it)->isStopped();
583         }
584         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
585             it != m_TransmitProcessors.end();
586             ++it ) {
587             ready &= (*it)->isStopped();
588         }
589         usleep(125);
590         cnt--;
591     }
592     if(cnt==0) {
593         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
594         return false;
595     }
596
597     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
598     if(!m_isoManager->stopHandlers()) {
599        debugFatal("Could not stop ISO handlers\n");
600        return false;
601     }
602
603     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
604     // now unregister all streams from iso manager
605     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
606     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
607           it != m_ReceiveProcessors.end();
608           ++it ) {
609         if (!m_isoManager->unregisterStream(*it)) {
610             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
611             return false;
612         }
613     }
614     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
615     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
616           it != m_TransmitProcessors.end();
617           ++it ) {
618         if (!m_isoManager->unregisterStream(*it)) {
619             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
620             return false;
621         }
622     }
623     return true;
624 }
625
626 /**
627  * Called upon Xrun events. This brings all StreamProcessors back
628  * into their starting state, and then carries on streaming. This should
629  * have the same effect as restarting the whole thing.
630  *
631  * @return true if successful, false otherwise
632  */
633 bool StreamProcessorManager::handleXrun() {
634
635     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
636
637     dumpInfo();
638
639     /*
640      * Reset means:
641      * 1) Disabling the SP's, so that they don't process any packets
642      *    note: the isomanager does keep on delivering/requesting them
643      * 2) Bringing all buffers & streamprocessors into a know state
644      *    - Clear all capture buffers
645      *    - Put nb_periods*period_size of null frames into the playback buffers
646      * 3) Re-enable the SP's
647      */
648
649     // put all SP's back into dry-running state
650     if (!startDryRunning()) {
651         debugFatal("Could not put SP's in dry-running state\n");
652         return false;
653     }
654
655     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
656     // start all SP's synchonized
657     if (!syncStartAll()) {
658         debugFatal("Could not syncStartAll...\n");
659         return false;
660     }
661
662     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
663
664     return true;
665 }
666
667 /**
668  * @brief Waits until the next period of samples is ready
669  *
670  * This function does not return until a full period of samples is (or should be)
671  * ready to be transferred.
672  *
673  * @return true if the period is ready, false if an xrun occurred
674  */
675 bool StreamProcessorManager::waitForPeriod() {
676     int time_till_next_period;
677     bool xrun_occurred = false;
678
679     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
680
681     assert(m_SyncSource);
682
683     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
684
685     while(time_till_next_period > 0) {
686         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
687
688         // wait for the period
689         usleep(time_till_next_period);
690
691         // check for underruns on the ISO side,
692         // those should make us bail out of the wait loop
693         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
694             it != m_ReceiveProcessors.end();
695             ++it ) {
696             // a xrun has occurred on the Iso side
697             xrun_occurred |= (*it)->xrunOccurred();
698         }
699         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
700             it != m_TransmitProcessors.end();
701             ++it ) {
702             // a xrun has occurred on the Iso side
703             xrun_occurred |= (*it)->xrunOccurred();
704         }
705         if(xrun_occurred) break;
706
707         // check if we were waked up too soon
708         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
709     }
710
711     // we save the 'ideal' time of the transfer at this point,
712     // because we can have interleaved read - process - write
713     // cycles making that we modify a receiving stream's buffer
714     // before we get to writing.
715     // NOTE: before waitForPeriod() is called again, both the transmit
716     //       and the receive processors should have done their transfer.
717     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
718     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
719         m_time_of_transfer);
720
721     // normally we can transfer frames at this time, but in some cases this is not true
722     // e.g. when there are not enough frames in the receive buffer.
723     // however this doesn't have to be a problem, since we can wait some more until we
724     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
725     // to transmit, or if the receive buffer overflows. These conditions are signaled by
726     // the iso threads
727     // check if xruns occurred on the Iso side.
728     // also check if xruns will occur should we transfer() now
729     #ifdef DEBUG
730     int waited = -1;
731     #endif
732     bool ready_for_transfer = false;
733     xrun_occurred = false;
734     while (!ready_for_transfer && !xrun_occurred) {
735         ready_for_transfer = true;
736         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
737             it != m_ReceiveProcessors.end();
738             ++it ) {
739             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
740             xrun_occurred |= (*it)->xrunOccurred();
741         }
742         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
743             it != m_TransmitProcessors.end();
744             ++it ) {
745             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
746             xrun_occurred |= (*it)->xrunOccurred();
747         }
748         usleep(125); // MAGIC: one cycle sleep...
749         #ifdef DEBUG
750         waited++;
751         #endif
752     } // we are either ready or an xrun occurred
753
754     #ifdef DEBUG
755     if(waited > 0) {
756         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
757     }
758     #endif
759
760     // this is to notify the client of the delay that we introduced by waiting
761     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
762     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
763
764 #ifdef DEBUG
765     int rcv_bf=0, xmt_bf=0;
766     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
767         it != m_ReceiveProcessors.end();
768         ++it ) {
769         rcv_bf = (*it)->getBufferFill();
770     }
771     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
772         it != m_TransmitProcessors.end();
773         ++it ) {
774         xmt_bf = (*it)->getBufferFill();
775     }
776     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
777         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
778
779     // check if xruns occurred on the Iso side.
780     // also check if xruns will occur should we transfer() now
781     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
782           it != m_ReceiveProcessors.end();
783           ++it ) {
784
785         if ((*it)->xrunOccurred()) {
786             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
787             (*it)->dumpInfo();
788         }
789         if (!((*it)->canClientTransferFrames(m_period))) {
790             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
791             (*it)->dumpInfo();
792         }
793     }
794     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
795           it != m_TransmitProcessors.end();
796           ++it ) {
797         if ((*it)->xrunOccurred()) {
798             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
799         }
800         if (!((*it)->canClientTransferFrames(m_period))) {
801             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
802         }
803     }
804 #endif
805
806     m_nbperiods++;
807     // now we can signal the client that we are (should be) ready
808     return !xrun_occurred;
809 }
810
811 /**
812  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
813  *
814  * Transfers one period of frames from the client side to the Iso side and vice versa.
815  *
816  * @return true if successful, false otherwise (indicates xrun).
817  */
818 bool StreamProcessorManager::transfer() {
819
820     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
821     bool retval=true;
822     retval &= transfer(StreamProcessor::ePT_Receive);
823     retval &= transfer(StreamProcessor::ePT_Transmit);
824     return retval;
825 }
826
827 /**
828  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
829  *
830  * Transfers one period of frames from the client side to the Iso side or vice versa.
831  *
832  * @param t The processor type to tranfer for (receive or transmit)
833  * @return true if successful, false otherwise (indicates xrun).
834  */
835
836 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
837     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
838         t, m_time_of_transfer,
839         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
840         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
841         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
842
843     bool retval = true;
844     // a static cast could make sure that there is no performance
845     // penalty for the virtual functions (to be checked)
846     if (t==StreamProcessor::ePT_Receive) {
847         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
848                 it != m_ReceiveProcessors.end();
849                 ++it ) {
850             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
851                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
852                             m_period, m_time_of_transfer,*it);
853                 retval &= false; // buffer underrun
854             }
855         }
856     } else {
857         // FIXME: in the SPM it would be nice to have system time instead of
858         //        1394 time
859         float rate = m_SyncSource->getTicksPerFrame();
860         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
861
862         // the data we are putting into the buffer is intended to be transmitted
863         // one ringbuffer size after it has been received
864         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
865
866         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
867                 it != m_TransmitProcessors.end();
868                 ++it ) {
869             // FIXME: in the SPM it would be nice to have system time instead of
870             //        1394 time
871             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
872                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
873                         m_period, transmit_timestamp, *it);
874                 retval &= false; // buffer underrun
875             }
876         }
877     }
878     return retval;
879 }
880
881 void StreamProcessorManager::dumpInfo() {
882     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
883     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
884     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
885
886     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
887     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
888         it != m_ReceiveProcessors.end();
889         ++it ) {
890         (*it)->dumpInfo();
891     }
892
893     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
894     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
895         it != m_TransmitProcessors.end();
896         ++it ) {
897         (*it)->dumpInfo();
898     }
899
900     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
901     m_isoManager->dumpInfo();
902     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
903
904 }
905
906 void StreamProcessorManager::setVerboseLevel(int l) {
907     setDebugLevel(l);
908
909     if (m_isoManager) m_isoManager->setVerboseLevel(l);
910
911     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
912     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
913         it != m_ReceiveProcessors.end();
914         ++it ) {
915         (*it)->setVerboseLevel(l);
916     }
917
918     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
919     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
920         it != m_TransmitProcessors.end();
921         ++it ) {
922         (*it)->setVerboseLevel(l);
923     }
924 }
925
926
927 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
928     int count=0;
929
930     if (direction == Port::E_Capture) {
931         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
932             it != m_ReceiveProcessors.end();
933             ++it ) {
934             count += (*it)->getPortCount(type);
935         }
936     } else {
937         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
938             it != m_TransmitProcessors.end();
939             ++it ) {
940             count += (*it)->getPortCount(type);
941         }
942     }
943     return count;
944 }
945
946 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
947     int count=0;
948
949     if (direction == Port::E_Capture) {
950         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
951             it != m_ReceiveProcessors.end();
952             ++it ) {
953             count += (*it)->getPortCount();
954         }
955     } else {
956         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
957             it != m_TransmitProcessors.end();
958             ++it ) {
959             count += (*it)->getPortCount();
960         }
961     }
962     return count;
963 }
964
965 // TODO: implement a port map here, instead of the loop
966
967 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
968     int count=0;
969     int prevcount=0;
970
971     if (direction == Port::E_Capture) {
972         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
973             it != m_ReceiveProcessors.end();
974             ++it ) {
975             count += (*it)->getPortCount();
976             if (count > idx) {
977                 return (*it)->getPortAtIdx(idx-prevcount);
978             }
979             prevcount=count;
980         }
981     } else {
982         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
983             it != m_TransmitProcessors.end();
984             ++it ) {
985             count += (*it)->getPortCount();
986             if (count > idx) {
987                 return (*it)->getPortAtIdx(idx-prevcount);
988             }
989             prevcount=count;
990         }
991     }
992     return NULL;
993 }
994
995 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
996     m_thread_realtime=rt;
997     m_thread_priority=priority;
998     return true;
999 }
1000
1001
1002 } // end of namespace
Note: See TracBrowser for help on using the browser.