root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 512, 35.8 kB (checked in by jwoithe, 17 years ago)

MOTU: more tweaks to improve reliability. Things are looking pretty good now.
MOTU: Commenced cleanup of MOTU code, removing temporary debug output etc.

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "StreamProcessor.h"
26 #include "Port.h"
27 #include <errno.h>
28 #include <assert.h>
29
30 #include "libstreaming/cycletimer.h"
31
32 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 5
33
34 #define RUNNING_TIMEOUT_MSEC 4000
35 #define PREPARE_TIMEOUT_MSEC 4000
36 #define ENABLE_TIMEOUT_MSEC 4000
37
38 //#define ENABLE_DELAY_CYCLES 100
39 #define ENABLE_DELAY_CYCLES 1000
40
41 namespace Streaming {
42
43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
44
45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
46     : m_is_slave( false )
47     , m_SyncSource(NULL)
48     , m_nb_buffers(nb_buffers)
49     , m_period(period)
50     , m_xruns(0)
51     , m_isoManager(0)
52     , m_nbperiods(0)
53 {
54     addOption(Util::OptionContainer::Option("slaveMode",false));
55 }
56
57 StreamProcessorManager::~StreamProcessorManager() {
58     if (m_isoManager) delete m_isoManager;
59
60 }
61
62 /**
63  * Registers \ref processor with this manager.
64  *
65  * also registers it with the isohandlermanager
66  *
67  * be sure to call isohandlermanager->init() first!
68  * and be sure that the processors are also ->init()'ed
69  *
70  * @param processor
71  * @return true if successfull
72  */
73 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
74 {
75     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
76     assert(processor);
77     assert(m_isoManager);
78
79     if (processor->getType()==StreamProcessor::E_Receive) {
80         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
81
82         m_ReceiveProcessors.push_back(processor);
83
84         processor->setManager(this);
85
86         return true;
87     }
88
89     if (processor->getType()==StreamProcessor::E_Transmit) {
90         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
91
92         m_TransmitProcessors.push_back(processor);
93
94         processor->setManager(this);
95
96         return true;
97     }
98
99     debugFatal("Unsupported processor type!\n");
100
101     return false;
102 }
103
104 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
105 {
106     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
107     assert(processor);
108
109     if (processor->getType()==StreamProcessor::E_Receive) {
110
111         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
112             it != m_ReceiveProcessors.end();
113             ++it ) {
114
115             if ( *it == processor ) {
116                     m_ReceiveProcessors.erase(it);
117
118                     processor->clearManager();
119
120                     if(!m_isoManager->unregisterStream(processor)) {
121                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
122
123                         return false;
124
125                     }
126
127                     return true;
128                 }
129         }
130     }
131
132     if (processor->getType()==StreamProcessor::E_Transmit) {
133         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
134             it != m_TransmitProcessors.end();
135             ++it ) {
136
137             if ( *it == processor ) {
138                     m_TransmitProcessors.erase(it);
139
140                     processor->clearManager();
141
142                     if(!m_isoManager->unregisterStream(processor)) {
143                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
144
145                         return false;
146
147                     }
148
149                     return true;
150                 }
151         }
152     }
153
154     debugFatal("Processor (%p) not found!\n",processor);
155
156     return false; //not found
157
158 }
159
160 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
161     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
162
163     m_SyncSource=s;
164     return true;
165 }
166
167 StreamProcessor *StreamProcessorManager::getSyncSource() {
168     return m_SyncSource;
169 }
170
171 bool StreamProcessorManager::init()
172 {
173     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
174
175     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
176
177     if(!m_isoManager) {
178         debugFatal("Could not create IsoHandlerManager\n");
179         return false;
180     }
181
182     // propagate the debug level
183     m_isoManager->setVerboseLevel(getDebugLevel());
184
185     if(!m_isoManager->init()) {
186         debugFatal("Could not initialize IsoHandlerManager\n");
187         return false;
188     }
189
190     m_xrun_happened=false;
191
192     return true;
193 }
194
195 bool StreamProcessorManager::prepare() {
196
197     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
198
199     m_is_slave=false;
200     if(!getOption("slaveMode", m_is_slave)) {
201         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
202     }
203
204     // if no sync source is set, select one here
205     if(m_SyncSource == NULL) {
206        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
207     }
208
209     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
210         it != m_ReceiveProcessors.end();
211         ++it ) {
212             if(m_SyncSource == NULL) {
213                 debugWarning(" => Sync Source is %p.\n", *it);
214                 m_SyncSource = *it;
215             }
216     }
217
218     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
219         it != m_TransmitProcessors.end();
220         ++it ) {
221             if(m_SyncSource == NULL) {
222                 debugWarning(" => Sync Source is %p.\n", *it);
223                 m_SyncSource = *it;
224             }
225     }
226
227     // now do the actual preparation
228     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
229     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
230         it != m_ReceiveProcessors.end();
231         ++it ) {
232
233         if(!(*it)->setSyncSource(m_SyncSource)) {
234             debugFatal(  " could not set sync source (%p)...\n",(*it));
235             return false;
236         }
237
238         if(!(*it)->setOption("slaveMode", m_is_slave)) {
239             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
240         }
241
242         if(!(*it)->prepare()) {
243             debugFatal(  " could not prepare (%p)...\n",(*it));
244             return false;
245         }
246     }
247
248     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
249     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
250         it != m_TransmitProcessors.end();
251         ++it ) {
252         if(!(*it)->setSyncSource(m_SyncSource)) {
253             debugFatal(  " could not set sync source (%p)...\n",(*it));
254             return false;
255         }
256         if(!(*it)->setOption("slaveMode", m_is_slave)) {
257             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
258         }
259         if(!(*it)->prepare()) {
260             debugFatal( " could not prepare (%p)...\n",(*it));
261             return false;
262         }
263     }
264
265     // if there are no stream processors registered,
266     // fail
267     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
268         debugFatal("No stream processors registered, can't do anything usefull\n");
269         return false;
270     }
271
272     return true;
273 }
274
275 bool StreamProcessorManager::syncStartAll() {
276
277     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
278     // we have to wait until all streamprocessors indicate that they are running
279     // i.e. that there is actually some data stream flowing
280     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
281     bool notRunning=true;
282     while (notRunning && wait_cycles) {
283         wait_cycles--;
284         notRunning=false;
285
286         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
287                 it != m_ReceiveProcessors.end();
288                 ++it ) {
289             if(!(*it)->isRunning()) notRunning=true;
290         }
291
292         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
293                 it != m_TransmitProcessors.end();
294                 ++it ) {
295             if(!(*it)->isRunning()) notRunning=true;
296         }
297
298         // EXPERIMENT:
299         // the only stream that should be running is the sync
300         // source stream, as this is the one that defines
301         // when to signal buffers. Maybe we get an xrun at startup,
302         // but that should be handled.
303
304         // the problem is that otherwise a setup with a device
305         // that waits for decent input before sending output
306         // will not start up (e.g. the bounce device), because
307         // all streams are required to be running.
308
309         // other streams still have at least ENABLE_DELAY_CYCLES cycles
310         // to start up
311 //         if(!m_SyncSource->isRunning()) notRunning=true;
312
313         usleep(1000);
314         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
315     }
316
317     if(!wait_cycles) { // timout has occurred
318         debugFatal("One or more streams are not starting up (timeout):\n");
319
320         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
321                 it != m_ReceiveProcessors.end();
322                 ++it ) {
323             if(!(*it)->isRunning()) {
324                 debugFatal(" receive stream %p not running\n",*it);
325             } else {
326                 debugFatal(" receive stream %p running\n",*it);
327             }
328         }
329
330         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
331                 it != m_TransmitProcessors.end();
332                 ++it ) {
333             if(!(*it)->isRunning()) {
334                 debugFatal(" transmit stream %p not running\n",*it);
335             } else {
336                 debugFatal(" transmit stream %p running\n",*it);
337             }
338         }
339         return false;
340     }
341
342     // we want to make sure that everything is running well,
343     // so wait for a while
344     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
345
346     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
347
348     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
349
350     int max_of_min_delay=0;
351     int min_delay=0;
352     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
353             it != m_ReceiveProcessors.end();
354             ++it ) {
355         min_delay=(*it)->getMinimalSyncDelay();
356         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
357     }
358
359     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
360             it != m_TransmitProcessors.end();
361             ++it ) {
362         min_delay=(*it)->getMinimalSyncDelay();
363         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
364     }
365
366     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
367     m_SyncSource->setSyncDelay(max_of_min_delay);
368
369
370     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
371     // now we reset the frame counters
372     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
373             it != m_ReceiveProcessors.end();
374             ++it ) {
375         (*it)->reset();
376     }
377
378     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
379             it != m_TransmitProcessors.end();
380             ++it ) {
381         (*it)->reset();
382     }
383
384     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
385
386     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
387
388     // FIXME: this should not be in cycles, but in 'time'
389     unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES;
390     if (enable_at > 8000) enable_at -= 8000;
391
392     if (!enableStreamProcessors(enable_at)) {
393         debugFatal("Could not enable StreamProcessors...\n");
394         return false;
395     }
396
397     return true;
398 }
399
400 bool StreamProcessorManager::start() {
401     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
402     assert(m_isoManager);
403
404     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
405     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
406     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
407         it != m_ReceiveProcessors.end();
408         ++it ) {
409             if (!(*it)->prepareForStart()) {
410                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
411                 return false;
412             }
413             if (!m_isoManager->registerStream(*it)) {
414                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
415                 return false;
416             }
417         }
418
419     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
420     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
421         it != m_TransmitProcessors.end();
422         ++it ) {
423             if (!(*it)->prepareForStart()) {
424                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
425                 return false;
426             }
427             if (!m_isoManager->registerStream(*it)) {
428                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
429                 return false;
430             }
431         }
432
433     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
434     if (!m_isoManager->prepare()) {
435         debugFatal("Could not prepare isoManager\n");
436         return false;
437     }
438
439     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
440         if (!disableStreamProcessors()) {
441         debugFatal("Could not disable StreamProcessors...\n");
442         return false;
443     }
444
445     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
446     if (!m_isoManager->startHandlers(-1)) {
447         debugFatal("Could not start handlers...\n");
448         return false;
449     }
450
451     // start all SP's synchonized
452     if (!syncStartAll()) {
453         debugFatal("Could not syncStartAll...\n");
454         return false;
455     }
456
457     // dump the iso stream information when in verbose mode
458     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
459         m_isoManager->dumpInfo();
460     }
461
462     return true;
463
464 }
465
466 bool StreamProcessorManager::stop() {
467     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
468     assert(m_isoManager);
469
470     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
471     // Most stream processors can just stop without special treatment.  However, some
472     // (like the MOTU) need to do a few things before it's safe to turn off the iso
473     // handling.
474     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
475     bool allReady = false;
476     while (!allReady && wait_cycles) {
477         wait_cycles--;
478         allReady = true;
479
480         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
481             it != m_ReceiveProcessors.end();
482             ++it ) {
483             if(!(*it)->prepareForStop()) allReady = false;
484         }
485
486         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
487             it != m_TransmitProcessors.end();
488             ++it ) {
489             if(!(*it)->prepareForStop()) allReady = false;
490         }
491         usleep(1000);
492     }
493
494
495     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
496     if(!m_isoManager->stopHandlers()) {
497        debugFatal("Could not stop ISO handlers\n");
498        return false;
499     }
500
501     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
502     // now unregister all streams from iso manager
503     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
504     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
505         it != m_ReceiveProcessors.end();
506         ++it ) {
507             if (!m_isoManager->unregisterStream(*it)) {
508                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
509                 return false;
510             }
511
512         }
513
514     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
515     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
516         it != m_TransmitProcessors.end();
517         ++it ) {
518             if (!m_isoManager->unregisterStream(*it)) {
519                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
520                 return false;
521             }
522
523         }
524
525     return true;
526
527 }
528
529 /**
530  * Enables the registered StreamProcessors
531  * @return true if successful, false otherwise
532  */
533 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
534     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
535
536     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
537     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
538     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
539             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
540         return false;
541     }
542
543     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
544     m_SyncSource->enable(time_to_enable_at);
545
546     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
547
548     // we prepare the streamprocessors for enable
549     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
550             it != m_ReceiveProcessors.end();
551             ++it ) {
552         if(*it != m_SyncSource) {
553             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
554             (*it)->prepareForEnable(time_to_enable_at);
555         }
556     }
557
558     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
559             it != m_TransmitProcessors.end();
560             ++it ) {
561         if(*it != m_SyncSource) {
562             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
563             (*it)->prepareForEnable(time_to_enable_at);
564         }
565     }
566
567     // then we enable the streamprocessors
568     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
569             it != m_ReceiveProcessors.end();
570             ++it ) {
571         if(*it != m_SyncSource) {
572             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
573             (*it)->enable(time_to_enable_at);
574         }
575     }
576
577     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
578             it != m_TransmitProcessors.end();
579             ++it ) {
580         if(*it != m_SyncSource) {
581             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
582             (*it)->enable(time_to_enable_at);
583         }
584     }
585
586     // now we wait for the SP's to get enabled
587     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
588     // we have to wait until all streamprocessors indicate that they are running
589     // i.e. that there is actually some data stream flowing
590     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
591     bool notEnabled=true;
592     while (notEnabled && wait_cycles) {
593         wait_cycles--;
594         notEnabled=false;
595
596         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
597                 it != m_ReceiveProcessors.end();
598                 ++it ) {
599             if(!(*it)->isEnabled()) notEnabled=true;
600         }
601
602         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
603                 it != m_TransmitProcessors.end();
604                 ++it ) {
605             if(!(*it)->isEnabled()) notEnabled=true;
606         }
607         usleep(1000); // one cycle
608     }
609
610     if(!wait_cycles) { // timout has occurred
611         debugFatal("One or more streams couldn't be enabled (timeout):\n");
612
613         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
614                 it != m_ReceiveProcessors.end();
615                 ++it ) {
616             if(!(*it)->isEnabled()) {
617                     debugFatal(" receive stream %p not enabled\n",*it);
618             } else {
619                     debugFatal(" receive stream %p enabled\n",*it);
620             }
621         }
622
623         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
624                 it != m_TransmitProcessors.end();
625                 ++it ) {
626             if(!(*it)->isEnabled()) {
627                     debugFatal(" transmit stream %p not enabled\n",*it);
628             } else {
629                     debugFatal(" transmit stream %p enabled\n",*it);
630             }
631         }
632         return false;
633     }
634
635     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
636
637     return true;
638 }
639
640 /**
641  * Disables the registered StreamProcessors
642  * @return true if successful, false otherwise
643  */
644 bool StreamProcessorManager::disableStreamProcessors() {
645     // we prepare the streamprocessors for disable
646     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
647             it != m_ReceiveProcessors.end();
648             ++it ) {
649         (*it)->prepareForDisable();
650     }
651
652     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
653             it != m_TransmitProcessors.end();
654             ++it ) {
655         (*it)->prepareForDisable();
656     }
657
658     // then we disable the streamprocessors
659     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
660             it != m_ReceiveProcessors.end();
661             ++it ) {
662         (*it)->disable();
663     }
664
665     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
666             it != m_TransmitProcessors.end();
667             ++it ) {
668         (*it)->disable();
669     }
670
671     // now we wait for the SP's to get disabled
672     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
673     // we have to wait until all streamprocessors indicate that they are running
674     // i.e. that there is actually some data stream flowing
675     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
676     bool enabled=true;
677     while (enabled && wait_cycles) {
678         wait_cycles--;
679         enabled=false;
680
681         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
682                 it != m_ReceiveProcessors.end();
683                 ++it ) {
684             if((*it)->isEnabled()) enabled=true;
685         }
686
687         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
688                 it != m_TransmitProcessors.end();
689                 ++it ) {
690             if((*it)->isEnabled()) enabled=true;
691         }
692         usleep(1000); // one cycle
693     }
694
695     if(!wait_cycles) { // timout has occurred
696         debugFatal("One or more streams couldn't be disabled (timeout):\n");
697
698         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
699                 it != m_ReceiveProcessors.end();
700                 ++it ) {
701             if(!(*it)->isEnabled()) {
702                     debugFatal(" receive stream %p not enabled\n",*it);
703             } else {
704                     debugFatal(" receive stream %p enabled\n",*it);
705             }
706         }
707
708         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
709                 it != m_TransmitProcessors.end();
710                 ++it ) {
711             if(!(*it)->isEnabled()) {
712                     debugFatal(" transmit stream %p not enabled\n",*it);
713             } else {
714                     debugFatal(" transmit stream %p enabled\n",*it);
715             }
716         }
717         return false;
718     }
719
720     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
721
722     return true;
723 }
724
725 /**
726  * Called upon Xrun events. This brings all StreamProcessors back
727  * into their starting state, and then carries on streaming. This should
728  * have the same effect as restarting the whole thing.
729  *
730  * @return true if successful, false otherwise
731  */
732 bool StreamProcessorManager::handleXrun() {
733
734     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
735
736     /*
737      * Reset means:
738      * 1) Disabling the SP's, so that they don't process any packets
739      *    note: the isomanager does keep on delivering/requesting them
740      * 2) Bringing all buffers & streamprocessors into a know state
741      *    - Clear all capture buffers
742      *    - Put nb_periods*period_size of null frames into the playback buffers
743      * 3) Re-enable the SP's
744      */
745     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
746         if (!disableStreamProcessors()) {
747         debugFatal("Could not disable StreamProcessors...\n");
748         return false;
749     }
750
751     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
752     // start all SP's synchonized
753     if (!syncStartAll()) {
754         debugFatal("Could not syncStartAll...\n");
755         return false;
756     }
757
758     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
759
760     return true;
761 }
762
763 /**
764  * @brief Waits until the next period of samples is ready
765  *
766  * This function does not return until a full period of samples is (or should be)
767  * ready to be transferred.
768  *
769  * @return true if the period is ready, false if an xrun occurred
770  */
771 bool StreamProcessorManager::waitForPeriod() {
772     int time_till_next_period;
773     bool xrun_occurred=false;
774
775     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
776
777     assert(m_SyncSource);
778
779     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
780
781     while(time_till_next_period > 0) {
782         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
783
784         // wait for the period
785         usleep(time_till_next_period);
786
787         // check for underruns on the ISO side,
788         // those should make us bail out of the wait loop
789         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
790             it != m_ReceiveProcessors.end();
791             ++it ) {
792             // a xrun has occurred on the Iso side
793             xrun_occurred |= (*it)->xrunOccurred();
794         }
795         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
796             it != m_TransmitProcessors.end();
797             ++it ) {
798             // a xrun has occurred on the Iso side
799             xrun_occurred |= (*it)->xrunOccurred();
800         }
801
802         // check if we were waked up too soon
803         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
804     }
805
806     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
807
808     // this is to notify the client of the delay
809     // that we introduced
810     m_delayed_usecs=time_till_next_period;
811
812     // we save the 'ideal' time of the transfer at this point,
813     // because we can have interleaved read - process - write
814     // cycles making that we modify a receiving stream's buffer
815     // before we get to writing.
816     // NOTE: before waitForPeriod() is called again, both the transmit
817     //       and the receive processors should have done their transfer.
818     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
819     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
820         m_time_of_transfer);
821
822 #ifdef DEBUG
823     int rcv_bf=0, xmt_bf=0;
824     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
825         it != m_ReceiveProcessors.end();
826         ++it ) {
827         rcv_bf = (*it)->getBufferFill();
828     }
829     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
830         it != m_TransmitProcessors.end();
831         ++it ) {
832         xmt_bf = (*it)->getBufferFill();
833     }
834     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
835         m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf);
836
837 #endif
838
839     xrun_occurred=false;
840
841     // check if xruns occurred on the Iso side.
842     // also check if xruns will occur should we transfer() now
843
844     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
845           it != m_ReceiveProcessors.end();
846           ++it ) {
847         // a xrun has occurred on the Iso side
848         xrun_occurred |= (*it)->xrunOccurred();
849
850         // if this is true, a xrun will occur
851         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
852
853 #ifdef DEBUG
854         if ((*it)->xrunOccurred()) {
855             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
856             (*it)->dumpInfo();
857         }
858         if (!((*it)->canClientTransferFrames(m_period))) {
859             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
860             (*it)->dumpInfo();
861         }
862 #endif
863
864     }
865     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
866           it != m_TransmitProcessors.end();
867           ++it ) {
868         // a xrun has occurred on the Iso side
869         xrun_occurred |= (*it)->xrunOccurred();
870
871         // if this is true, a xrun will occur
872         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
873
874 #ifdef DEBUG
875         if ((*it)->xrunOccurred()) {
876             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
877         }
878         if (!((*it)->canClientTransferFrames(m_period))) {
879             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
880         }
881 #endif
882     }
883
884     m_nbperiods++;
885
886     // now we can signal the client that we are (should be) ready
887     return !xrun_occurred;
888 }
889
890 /**
891  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
892  *
893  * Transfers one period of frames from the client side to the Iso side and vice versa.
894  *
895  * @return true if successful, false otherwise (indicates xrun).
896  */
897 bool StreamProcessorManager::transfer() {
898
899     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
900
901     if (!transfer(StreamProcessor::E_Receive)) return false;
902     if (!transfer(StreamProcessor::E_Transmit)) return false;
903
904     return true;
905 }
906
907 /**
908  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
909  *
910  * Transfers one period of frames from the client side to the Iso side or vice versa.
911  *
912  * @param t The processor type to tranfer for (receive or transmit)
913  * @return true if successful, false otherwise (indicates xrun).
914  */
915
916 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
917     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
918
919     // a static cast could make sure that there is no performance
920     // penalty for the virtual functions (to be checked)
921     if (t==StreamProcessor::E_Receive) {
922        
923         // determine the time at which we want reception to start
924         float rate=m_SyncSource->getTicksPerFrame();
925         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
926        
927         int64_t receive_timestamp = substractTicks(m_time_of_transfer,one_frame_in_ticks);
928        
929         if(receive_timestamp<0) {
930             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
931              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
932         }
933         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
934             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
935              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
936         }
937        
938         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
939                 it != m_ReceiveProcessors.end();
940                 ++it ) {
941
942             if(!(*it)->getFrames(m_period, receive_timestamp)) {
943                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
944                             m_period, m_time_of_transfer,*it);
945                     return false; // buffer underrun
946             }
947
948         }
949     } else {
950         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
951                 it != m_TransmitProcessors.end();
952                 ++it ) {
953
954             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
955                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
956                         m_period, m_time_of_transfer, *it);
957                 return false; // buffer overrun
958             }
959
960         }
961     }
962
963     return true;
964 }
965
966 void StreamProcessorManager::dumpInfo() {
967     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
968     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
969     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
970
971     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
972     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
973         it != m_ReceiveProcessors.end();
974         ++it ) {
975         (*it)->dumpInfo();
976     }
977
978     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
979     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
980         it != m_TransmitProcessors.end();
981         ++it ) {
982         (*it)->dumpInfo();
983     }
984
985     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
986     m_isoManager->dumpInfo();
987     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
988
989 }
990
991 void StreamProcessorManager::setVerboseLevel(int l) {
992     setDebugLevel(l);
993
994     if (m_isoManager) m_isoManager->setVerboseLevel(l);
995
996     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
997     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
998         it != m_ReceiveProcessors.end();
999         ++it ) {
1000         (*it)->setVerboseLevel(l);
1001     }
1002
1003     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1004     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1005         it != m_TransmitProcessors.end();
1006         ++it ) {
1007         (*it)->setVerboseLevel(l);
1008     }
1009 }
1010
1011
1012 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1013     int count=0;
1014
1015     if (direction == Port::E_Capture) {
1016         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1017             it != m_ReceiveProcessors.end();
1018             ++it ) {
1019             count += (*it)->getPortCount(type);
1020         }
1021     } else {
1022         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1023             it != m_TransmitProcessors.end();
1024             ++it ) {
1025             count += (*it)->getPortCount(type);
1026         }
1027     }
1028     return count;
1029 }
1030
1031 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1032     int count=0;
1033
1034     if (direction == Port::E_Capture) {
1035         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1036             it != m_ReceiveProcessors.end();
1037             ++it ) {
1038             count += (*it)->getPortCount();
1039         }
1040     } else {
1041         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1042             it != m_TransmitProcessors.end();
1043             ++it ) {
1044             count += (*it)->getPortCount();
1045         }
1046     }
1047     return count;
1048 }
1049
1050 // TODO: implement a port map here, instead of the loop
1051
1052 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1053     int count=0;
1054     int prevcount=0;
1055
1056     if (direction == Port::E_Capture) {
1057         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1058             it != m_ReceiveProcessors.end();
1059             ++it ) {
1060             count += (*it)->getPortCount();
1061             if (count > idx) {
1062                 return (*it)->getPortAtIdx(idx-prevcount);
1063             }
1064             prevcount=count;
1065         }
1066     } else {
1067         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1068             it != m_TransmitProcessors.end();
1069             ++it ) {
1070             count += (*it)->getPortCount();
1071             if (count > idx) {
1072                 return (*it)->getPortAtIdx(idx-prevcount);
1073             }
1074             prevcount=count;
1075         }
1076     }
1077     return NULL;
1078 }
1079
1080 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1081     m_thread_realtime=rt;
1082     m_thread_priority=priority;
1083     return true;
1084 }
1085
1086
1087 } // end of namespace
Note: See TracBrowser for help on using the browser.