root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 445, 35.0 kB (checked in by pieterpalmers, 15 years ago)

* name change from FreeBoB to FFADO
* replaced tabs by 4 spaces
* got rid of end-of-line spaces
* made all license and copyrights conform

library becomes LGPL, apps become GPL
explicitly state LGPL v2.1 and GPL v2 (don't like v3 draft)

copyrights are 2005-2007 Daniel & Pieter
except for the MotU stuff (C) Jonathan, Pieter

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "StreamProcessor.h"
26 #include "Port.h"
27 #include <errno.h>
28 #include <assert.h>
29
30 #include "libstreaming/cycletimer.h"
31
32 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 5
33
34 #define RUNNING_TIMEOUT_MSEC 4000
35 #define PREPARE_TIMEOUT_MSEC 4000
36 #define ENABLE_TIMEOUT_MSEC 4000
37
38 #define ENABLE_DELAY_CYCLES 100
39
40 namespace Streaming {
41
42 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
43
44 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
45     : m_is_slave( false )
46     , m_SyncSource(NULL)
47     , m_nb_buffers(nb_buffers)
48     , m_period(period)
49     , m_xruns(0)
50     , m_isoManager(0)
51     , m_nbperiods(0)
52 {
53     addOption(Util::OptionContainer::Option("slaveMode",false));
54 }
55
56 StreamProcessorManager::~StreamProcessorManager() {
57     if (m_isoManager) delete m_isoManager;
58
59 }
60
61 /**
62  * Registers \ref processor with this manager.
63  *
64  * also registers it with the isohandlermanager
65  *
66  * be sure to call isohandlermanager->init() first!
67  * and be sure that the processors are also ->init()'ed
68  *
69  * @param processor
70  * @return true if successfull
71  */
72 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
73 {
74     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
75     assert(processor);
76     assert(m_isoManager);
77
78     if (processor->getType()==StreamProcessor::E_Receive) {
79         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
80
81         m_ReceiveProcessors.push_back(processor);
82
83         processor->setManager(this);
84
85         return true;
86     }
87
88     if (processor->getType()==StreamProcessor::E_Transmit) {
89         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
90
91         m_TransmitProcessors.push_back(processor);
92
93         processor->setManager(this);
94
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99
100     return false;
101 }
102
103 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
104 {
105     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
106     assert(processor);
107
108     if (processor->getType()==StreamProcessor::E_Receive) {
109
110         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
111             it != m_ReceiveProcessors.end();
112             ++it ) {
113
114             if ( *it == processor ) {
115                     m_ReceiveProcessors.erase(it);
116
117                     processor->clearManager();
118
119                     if(!m_isoManager->unregisterStream(processor)) {
120                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
121
122                         return false;
123
124                     }
125
126                     return true;
127                 }
128         }
129     }
130
131     if (processor->getType()==StreamProcessor::E_Transmit) {
132         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
133             it != m_TransmitProcessors.end();
134             ++it ) {
135
136             if ( *it == processor ) {
137                     m_TransmitProcessors.erase(it);
138
139                     processor->clearManager();
140
141                     if(!m_isoManager->unregisterStream(processor)) {
142                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
143
144                         return false;
145
146                     }
147
148                     return true;
149                 }
150         }
151     }
152
153     debugFatal("Processor (%p) not found!\n",processor);
154
155     return false; //not found
156
157 }
158
159 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
160     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
161
162     m_SyncSource=s;
163     return true;
164 }
165
166 StreamProcessor *StreamProcessorManager::getSyncSource() {
167     return m_SyncSource;
168 }
169
170 bool StreamProcessorManager::init()
171 {
172     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
173
174     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
175
176     if(!m_isoManager) {
177         debugFatal("Could not create IsoHandlerManager\n");
178         return false;
179     }
180
181     // propagate the debug level
182     m_isoManager->setVerboseLevel(getDebugLevel());
183
184     if(!m_isoManager->init()) {
185         debugFatal("Could not initialize IsoHandlerManager\n");
186         return false;
187     }
188
189     m_xrun_happened=false;
190
191     return true;
192 }
193
194 bool StreamProcessorManager::prepare() {
195
196     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
197
198     m_is_slave=false;
199     if(!getOption("snoopMode", m_is_slave)) {
200         debugWarning("Could not retrieve slaveMode parameter, defauling to false\n");
201     }
202
203     // if no sync source is set, select one here
204     if(m_SyncSource == NULL) {
205        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
206     }
207
208     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
209         it != m_ReceiveProcessors.end();
210         ++it ) {
211             if(m_SyncSource == NULL) {
212                 debugWarning(" => Sync Source is %p.\n", *it);
213                 m_SyncSource = *it;
214             }
215     }
216
217     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
218         it != m_TransmitProcessors.end();
219         ++it ) {
220             if(m_SyncSource == NULL) {
221                 debugWarning(" => Sync Source is %p.\n", *it);
222                 m_SyncSource = *it;
223             }
224     }
225
226     // now do the actual preparation
227     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
228     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
229         it != m_ReceiveProcessors.end();
230         ++it ) {
231
232         if(!(*it)->setSyncSource(m_SyncSource)) {
233             debugFatal(  " could not set sync source (%p)...\n",(*it));
234             return false;
235         }
236
237         if(!(*it)->setOption("slaveMode", m_is_slave)) {
238             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
239         }
240
241         if(!(*it)->prepare()) {
242             debugFatal(  " could not prepare (%p)...\n",(*it));
243             return false;
244         }
245     }
246
247     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
248     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
249         it != m_TransmitProcessors.end();
250         ++it ) {
251         if(!(*it)->setSyncSource(m_SyncSource)) {
252             debugFatal(  " could not set sync source (%p)...\n",(*it));
253             return false;
254         }
255         if(!(*it)->setOption("slaveMode", m_is_slave)) {
256             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
257         }
258         if(!(*it)->prepare()) {
259             debugFatal( " could not prepare (%p)...\n",(*it));
260             return false;
261         }
262     }
263
264     // if there are no stream processors registered,
265     // fail
266     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
267         debugFatal("No stream processors registered, can't do anything usefull\n");
268         return false;
269     }
270
271     return true;
272 }
273
274 bool StreamProcessorManager::syncStartAll() {
275
276     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
277     // we have to wait until all streamprocessors indicate that they are running
278     // i.e. that there is actually some data stream flowing
279     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
280     bool notRunning=true;
281     while (notRunning && wait_cycles) {
282         wait_cycles--;
283         notRunning=false;
284
285         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
286                 it != m_ReceiveProcessors.end();
287                 ++it ) {
288             if(!(*it)->isRunning()) notRunning=true;
289         }
290
291         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
292                 it != m_TransmitProcessors.end();
293                 ++it ) {
294             if(!(*it)->isRunning()) notRunning=true;
295         }
296
297         // EXPERIMENT:
298         // the only stream that should be running is the sync
299         // source stream, as this is the one that defines
300         // when to signal buffers. Maybe we get an xrun at startup,
301         // but that should be handled.
302
303         // the problem is that otherwise a setup with a device
304         // that waits for decent input before sending output
305         // will not start up (e.g. the bounce device), because
306         // all streams are required to be running.
307
308         // other streams still have at least ENABLE_DELAY_CYCLES cycles
309         // to start up
310 //         if(!m_SyncSource->isRunning()) notRunning=true;
311
312         usleep(1000);
313         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
314     }
315
316     if(!wait_cycles) { // timout has occurred
317         debugFatal("One or more streams are not starting up (timeout):\n");
318
319         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
320                 it != m_ReceiveProcessors.end();
321                 ++it ) {
322             if(!(*it)->isRunning()) {
323                 debugFatal(" receive stream %p not running\n",*it);
324             } else {
325                 debugFatal(" receive stream %p running\n",*it);
326             }
327         }
328
329         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
330                 it != m_TransmitProcessors.end();
331                 ++it ) {
332             if(!(*it)->isRunning()) {
333                 debugFatal(" transmit stream %p not running\n",*it);
334             } else {
335                 debugFatal(" transmit stream %p running\n",*it);
336             }
337         }
338         return false;
339     }
340
341     // we want to make sure that everything is running well,
342     // so wait for a while
343     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
344
345     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
346
347     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
348
349     int max_of_min_delay=0;
350     int min_delay=0;
351     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
352             it != m_ReceiveProcessors.end();
353             ++it ) {
354         min_delay=(*it)->getMinimalSyncDelay();
355         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
356     }
357
358     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
359             it != m_TransmitProcessors.end();
360             ++it ) {
361         min_delay=(*it)->getMinimalSyncDelay();
362         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
363     }
364
365     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
366     m_SyncSource->setSyncDelay(max_of_min_delay);
367
368
369     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
370     // now we reset the frame counters
371     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
372             it != m_ReceiveProcessors.end();
373             ++it ) {
374         (*it)->reset();
375     }
376
377     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
378             it != m_TransmitProcessors.end();
379             ++it ) {
380         (*it)->reset();
381     }
382
383     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
384
385     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
386
387     // FIXME: this should not be in cycles, but in 'time'
388     unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES;
389     if (enable_at > 8000) enable_at -= 8000;
390
391     if (!enableStreamProcessors(enable_at)) {
392         debugFatal("Could not enable StreamProcessors...\n");
393         return false;
394     }
395
396     return true;
397 }
398
399 bool StreamProcessorManager::start() {
400     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
401     assert(m_isoManager);
402
403     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
404     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
405     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
406         it != m_ReceiveProcessors.end();
407         ++it ) {
408             if (!(*it)->prepareForStart()) {
409                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
410                 return false;
411             }
412             if (!m_isoManager->registerStream(*it)) {
413                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
414                 return false;
415             }
416         }
417
418     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
419     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
420         it != m_TransmitProcessors.end();
421         ++it ) {
422             if (!(*it)->prepareForStart()) {
423                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
424                 return false;
425             }
426             if (!m_isoManager->registerStream(*it)) {
427                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
428                 return false;
429             }
430         }
431
432     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
433     if (!m_isoManager->prepare()) {
434         debugFatal("Could not prepare isoManager\n");
435         return false;
436     }
437
438     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
439         if (!disableStreamProcessors()) {
440         debugFatal("Could not disable StreamProcessors...\n");
441         return false;
442     }
443
444     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
445     if (!m_isoManager->startHandlers(-1)) {
446         debugFatal("Could not start handlers...\n");
447         return false;
448     }
449
450     // start all SP's synchonized
451     if (!syncStartAll()) {
452         debugFatal("Could not syncStartAll...\n");
453         return false;
454     }
455
456     // dump the iso stream information when in verbose mode
457     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
458         m_isoManager->dumpInfo();
459     }
460
461     return true;
462
463 }
464
465 bool StreamProcessorManager::stop() {
466     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
467     assert(m_isoManager);
468
469     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
470     // Most stream processors can just stop without special treatment.  However, some
471     // (like the MOTU) need to do a few things before it's safe to turn off the iso
472     // handling.
473     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
474     bool allReady = false;
475     while (!allReady && wait_cycles) {
476         wait_cycles--;
477         allReady = true;
478
479         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
480             it != m_ReceiveProcessors.end();
481             ++it ) {
482             if(!(*it)->prepareForStop()) allReady = false;
483         }
484
485         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
486             it != m_TransmitProcessors.end();
487             ++it ) {
488             if(!(*it)->prepareForStop()) allReady = false;
489         }
490         usleep(1000);
491     }
492
493
494     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
495     if(!m_isoManager->stopHandlers()) {
496        debugFatal("Could not stop ISO handlers\n");
497        return false;
498     }
499
500     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
501     // now unregister all streams from iso manager
502     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
503     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
504         it != m_ReceiveProcessors.end();
505         ++it ) {
506             if (!m_isoManager->unregisterStream(*it)) {
507                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
508                 return false;
509             }
510
511         }
512
513     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
514     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
515         it != m_TransmitProcessors.end();
516         ++it ) {
517             if (!m_isoManager->unregisterStream(*it)) {
518                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
519                 return false;
520             }
521
522         }
523
524     return true;
525
526 }
527
528 /**
529  * Enables the registered StreamProcessors
530  * @return true if successful, false otherwise
531  */
532 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
533     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
534
535     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
536     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
537     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
538             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
539         return false;
540     }
541
542     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
543     m_SyncSource->enable(time_to_enable_at);
544
545     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
546
547     // we prepare the streamprocessors for enable
548     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
549             it != m_ReceiveProcessors.end();
550             ++it ) {
551         if(*it != m_SyncSource) {
552             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
553             (*it)->prepareForEnable(time_to_enable_at);
554         }
555     }
556
557     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
558             it != m_TransmitProcessors.end();
559             ++it ) {
560         if(*it != m_SyncSource) {
561             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
562             (*it)->prepareForEnable(time_to_enable_at);
563         }
564     }
565
566     // then we enable the streamprocessors
567     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
568             it != m_ReceiveProcessors.end();
569             ++it ) {
570         if(*it != m_SyncSource) {
571             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
572             (*it)->enable(time_to_enable_at);
573         }
574     }
575
576     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
577             it != m_TransmitProcessors.end();
578             ++it ) {
579         if(*it != m_SyncSource) {
580             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
581             (*it)->enable(time_to_enable_at);
582         }
583     }
584
585     // now we wait for the SP's to get enabled
586     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
587     // we have to wait until all streamprocessors indicate that they are running
588     // i.e. that there is actually some data stream flowing
589     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
590     bool notEnabled=true;
591     while (notEnabled && wait_cycles) {
592         wait_cycles--;
593         notEnabled=false;
594
595         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
596                 it != m_ReceiveProcessors.end();
597                 ++it ) {
598             if(!(*it)->isEnabled()) notEnabled=true;
599         }
600
601         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
602                 it != m_TransmitProcessors.end();
603                 ++it ) {
604             if(!(*it)->isEnabled()) notEnabled=true;
605         }
606         usleep(1000); // one cycle
607     }
608
609     if(!wait_cycles) { // timout has occurred
610         debugFatal("One or more streams couldn't be enabled (timeout):\n");
611
612         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
613                 it != m_ReceiveProcessors.end();
614                 ++it ) {
615             if(!(*it)->isEnabled()) {
616                     debugFatal(" receive stream %p not enabled\n",*it);
617             } else {
618                     debugFatal(" receive stream %p enabled\n",*it);
619             }
620         }
621
622         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
623                 it != m_TransmitProcessors.end();
624                 ++it ) {
625             if(!(*it)->isEnabled()) {
626                     debugFatal(" transmit stream %p not enabled\n",*it);
627             } else {
628                     debugFatal(" transmit stream %p enabled\n",*it);
629             }
630         }
631         return false;
632     }
633
634     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
635
636     return true;
637 }
638
639 /**
640  * Disables the registered StreamProcessors
641  * @return true if successful, false otherwise
642  */
643 bool StreamProcessorManager::disableStreamProcessors() {
644     // we prepare the streamprocessors for disable
645     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
646             it != m_ReceiveProcessors.end();
647             ++it ) {
648         (*it)->prepareForDisable();
649     }
650
651     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
652             it != m_TransmitProcessors.end();
653             ++it ) {
654         (*it)->prepareForDisable();
655     }
656
657     // then we disable the streamprocessors
658     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
659             it != m_ReceiveProcessors.end();
660             ++it ) {
661         (*it)->disable();
662     }
663
664     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
665             it != m_TransmitProcessors.end();
666             ++it ) {
667         (*it)->disable();
668     }
669
670     // now we wait for the SP's to get disabled
671     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
672     // we have to wait until all streamprocessors indicate that they are running
673     // i.e. that there is actually some data stream flowing
674     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
675     bool enabled=true;
676     while (enabled && wait_cycles) {
677         wait_cycles--;
678         enabled=false;
679
680         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
681                 it != m_ReceiveProcessors.end();
682                 ++it ) {
683             if((*it)->isEnabled()) enabled=true;
684         }
685
686         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
687                 it != m_TransmitProcessors.end();
688                 ++it ) {
689             if((*it)->isEnabled()) enabled=true;
690         }
691         usleep(1000); // one cycle
692     }
693
694     if(!wait_cycles) { // timout has occurred
695         debugFatal("One or more streams couldn't be disabled (timeout):\n");
696
697         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
698                 it != m_ReceiveProcessors.end();
699                 ++it ) {
700             if(!(*it)->isEnabled()) {
701                     debugFatal(" receive stream %p not enabled\n",*it);
702             } else {
703                     debugFatal(" receive stream %p enabled\n",*it);
704             }
705         }
706
707         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
708                 it != m_TransmitProcessors.end();
709                 ++it ) {
710             if(!(*it)->isEnabled()) {
711                     debugFatal(" transmit stream %p not enabled\n",*it);
712             } else {
713                     debugFatal(" transmit stream %p enabled\n",*it);
714             }
715         }
716         return false;
717     }
718
719     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
720
721     return true;
722 }
723
724 /**
725  * Called upon Xrun events. This brings all StreamProcessors back
726  * into their starting state, and then carries on streaming. This should
727  * have the same effect as restarting the whole thing.
728  *
729  * @return true if successful, false otherwise
730  */
731 bool StreamProcessorManager::handleXrun() {
732
733     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
734
735     /*
736      * Reset means:
737      * 1) Disabling the SP's, so that they don't process any packets
738      *    note: the isomanager does keep on delivering/requesting them
739      * 2) Bringing all buffers & streamprocessors into a know state
740      *    - Clear all capture buffers
741      *    - Put nb_periods*period_size of null frames into the playback buffers
742      * 3) Re-enable the SP's
743      */
744     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
745         if (!disableStreamProcessors()) {
746         debugFatal("Could not disable StreamProcessors...\n");
747         return false;
748     }
749
750     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
751     // start all SP's synchonized
752     if (!syncStartAll()) {
753         debugFatal("Could not syncStartAll...\n");
754         return false;
755     }
756
757     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
758
759     return true;
760 }
761
762 /**
763  * @brief Waits until the next period of samples is ready
764  *
765  * This function does not return until a full period of samples is (or should be)
766  * ready to be transferred.
767  *
768  * @return true if the period is ready, false if an xrun occurred
769  */
770 bool StreamProcessorManager::waitForPeriod() {
771     int time_till_next_period;
772     bool xrun_occurred=false;
773
774     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
775
776     assert(m_SyncSource);
777
778     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
779
780     while(time_till_next_period > 0) {
781         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
782
783         // wait for the period
784         usleep(time_till_next_period);
785
786         // check for underruns on the ISO side,
787         // those should make us bail out of the wait loop
788         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
789             it != m_ReceiveProcessors.end();
790             ++it ) {
791             // a xrun has occurred on the Iso side
792             xrun_occurred |= (*it)->xrunOccurred();
793         }
794         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
795             it != m_TransmitProcessors.end();
796             ++it ) {
797             // a xrun has occurred on the Iso side
798             xrun_occurred |= (*it)->xrunOccurred();
799         }
800
801         // check if we were waked up too soon
802         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
803     }
804
805     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
806
807     // this is to notify the client of the delay
808     // that we introduced
809     m_delayed_usecs=time_till_next_period;
810
811     // we save the 'ideal' time of the transfer at this point,
812     // because we can have interleaved read - process - write
813     // cycles making that we modify a receiving stream's buffer
814     // before we get to writing.
815     // NOTE: before waitForPeriod() is called again, both the transmit
816     //       and the receive processors should have done their transfer.
817     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
818     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
819         m_time_of_transfer);
820
821 #ifdef DEBUG
822     int rcv_bf=0, xmt_bf=0;
823     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
824         it != m_ReceiveProcessors.end();
825         ++it ) {
826         rcv_bf = (*it)->getBufferFill();
827     }
828     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
829         it != m_TransmitProcessors.end();
830         ++it ) {
831         xmt_bf = (*it)->getBufferFill();
832     }
833     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
834         m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf);
835
836 #endif
837
838     xrun_occurred=false;
839
840     // check if xruns occurred on the Iso side.
841     // also check if xruns will occur should we transfer() now
842
843     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
844           it != m_ReceiveProcessors.end();
845           ++it ) {
846         // a xrun has occurred on the Iso side
847         xrun_occurred |= (*it)->xrunOccurred();
848
849         // if this is true, a xrun will occur
850         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
851
852 #ifdef DEBUG
853         if ((*it)->xrunOccurred()) {
854             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
855             (*it)->dumpInfo();
856         }
857         if (!((*it)->canClientTransferFrames(m_period))) {
858             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
859             (*it)->dumpInfo();
860         }
861 #endif
862
863     }
864     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
865           it != m_TransmitProcessors.end();
866           ++it ) {
867         // a xrun has occurred on the Iso side
868         xrun_occurred |= (*it)->xrunOccurred();
869
870         // if this is true, a xrun will occur
871         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
872
873 #ifdef DEBUG
874         if ((*it)->xrunOccurred()) {
875             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
876         }
877         if (!((*it)->canClientTransferFrames(m_period))) {
878             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
879         }
880 #endif
881     }
882
883     m_nbperiods++;
884
885     // now we can signal the client that we are (should be) ready
886     return !xrun_occurred;
887 }
888
889 /**
890  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
891  *
892  * Transfers one period of frames from the client side to the Iso side and vice versa.
893  *
894  * @return true if successful, false otherwise (indicates xrun).
895  */
896 bool StreamProcessorManager::transfer() {
897
898     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
899
900     if (!transfer(StreamProcessor::E_Receive)) return false;
901     if (!transfer(StreamProcessor::E_Transmit)) return false;
902
903     return true;
904 }
905
906 /**
907  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
908  *
909  * Transfers one period of frames from the client side to the Iso side or vice versa.
910  *
911  * @param t The processor type to tranfer for (receive or transmit)
912  * @return true if successful, false otherwise (indicates xrun).
913  */
914
915 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
916     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
917
918     // a static cast could make sure that there is no performance
919     // penalty for the virtual functions (to be checked)
920     if (t==StreamProcessor::E_Receive) {
921         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
922                 it != m_ReceiveProcessors.end();
923                 ++it ) {
924
925             if(!(*it)->getFrames(m_period)) {
926                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
927                             m_period, m_time_of_transfer,*it);
928                     return false; // buffer underrun
929             }
930
931         }
932     } else {
933         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
934                 it != m_TransmitProcessors.end();
935                 ++it ) {
936
937             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
938                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
939                         m_period, m_time_of_transfer, *it);
940                 return false; // buffer overrun
941             }
942
943         }
944     }
945
946     return true;
947 }
948
949 void StreamProcessorManager::dumpInfo() {
950     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
951     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
952     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
953
954     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
955     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
956         it != m_ReceiveProcessors.end();
957         ++it ) {
958         (*it)->dumpInfo();
959     }
960
961     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
962     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
963         it != m_TransmitProcessors.end();
964         ++it ) {
965         (*it)->dumpInfo();
966     }
967
968     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
969     m_isoManager->dumpInfo();
970     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
971
972 }
973
974 void StreamProcessorManager::setVerboseLevel(int l) {
975     setDebugLevel(l);
976
977     if (m_isoManager) m_isoManager->setVerboseLevel(l);
978
979     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
980     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
981         it != m_ReceiveProcessors.end();
982         ++it ) {
983         (*it)->setVerboseLevel(l);
984     }
985
986     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
987     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
988         it != m_TransmitProcessors.end();
989         ++it ) {
990         (*it)->setVerboseLevel(l);
991     }
992 }
993
994
995 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
996     int count=0;
997
998     if (direction == Port::E_Capture) {
999         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1000             it != m_ReceiveProcessors.end();
1001             ++it ) {
1002             count += (*it)->getPortCount(type);
1003         }
1004     } else {
1005         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1006             it != m_TransmitProcessors.end();
1007             ++it ) {
1008             count += (*it)->getPortCount(type);
1009         }
1010     }
1011     return count;
1012 }
1013
1014 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1015     int count=0;
1016
1017     if (direction == Port::E_Capture) {
1018         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1019             it != m_ReceiveProcessors.end();
1020             ++it ) {
1021             count += (*it)->getPortCount();
1022         }
1023     } else {
1024         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1025             it != m_TransmitProcessors.end();
1026             ++it ) {
1027             count += (*it)->getPortCount();
1028         }
1029     }
1030     return count;
1031 }
1032
1033 // TODO: implement a port map here, instead of the loop
1034
1035 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1036     int count=0;
1037     int prevcount=0;
1038
1039     if (direction == Port::E_Capture) {
1040         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1041             it != m_ReceiveProcessors.end();
1042             ++it ) {
1043             count += (*it)->getPortCount();
1044             if (count > idx) {
1045                 return (*it)->getPortAtIdx(idx-prevcount);
1046             }
1047             prevcount=count;
1048         }
1049     } else {
1050         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1051             it != m_TransmitProcessors.end();
1052             ++it ) {
1053             count += (*it)->getPortCount();
1054             if (count > idx) {
1055                 return (*it)->getPortAtIdx(idx-prevcount);
1056             }
1057             prevcount=count;
1058         }
1059     }
1060     return NULL;
1061 }
1062
1063 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1064     m_thread_realtime=rt;
1065     m_thread_priority=priority;
1066     return true;
1067 }
1068
1069
1070 } // end of namespace
Note: See TracBrowser for help on using the browser.