root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 399, 32.2 kB (checked in by pieterpalmers, 17 years ago)

- code cleanup
- introduce sync delay concept to fix latency issues due to intermediate ISO buffering
- made SytMonitor? use cycletimer.h functions

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "libstreaming/cycletimer.h"
36
37 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
38
39 #define RUNNING_TIMEOUT_MSEC 4000
40 #define PREPARE_TIMEOUT_MSEC 4000
41 #define ENABLE_TIMEOUT_MSEC 4000
42
43 namespace FreebobStreaming {
44
45 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
46
47 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
48         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
49         m_isoManager(0), m_nbperiods(0) {
50
51 }
52
53 StreamProcessorManager::~StreamProcessorManager() {
54         if (m_isoManager) delete m_isoManager;
55        
56 }
57
58 /**
59  * Registers \ref processor with this manager.
60  *
61  * also registers it with the isohandlermanager
62  *
63  * be sure to call isohandlermanager->init() first!
64  * and be sure that the processors are also ->init()'ed
65  *
66  * @param processor
67  * @return true if successfull
68  */
69 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
70 {
71         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
72         assert(processor);
73         assert(m_isoManager);
74
75         if (processor->getType()==StreamProcessor::E_Receive) {
76                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
77                
78                 m_ReceiveProcessors.push_back(processor);
79                
80                 processor->setManager(this);
81                                
82                 return true;
83         }
84        
85         if (processor->getType()==StreamProcessor::E_Transmit) {
86                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
87                
88                 m_TransmitProcessors.push_back(processor);
89                
90                 processor->setManager(this);
91                
92                 return true;
93         }
94
95         debugFatal("Unsupported processor type!\n");
96        
97         return false;
98 }
99
100 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
101 {
102         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
103         assert(processor);
104
105         if (processor->getType()==StreamProcessor::E_Receive) {
106
107                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
108                         it != m_ReceiveProcessors.end();
109                         ++it ) {
110
111                         if ( *it == processor ) {
112                                         m_ReceiveProcessors.erase(it);
113                                        
114                                         processor->clearManager();
115                                        
116                                         if(!m_isoManager->unregisterStream(processor)) {
117                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
118                                                
119                                                 return false;
120                                                
121                                         }
122                                        
123                                         return true;
124                                 }
125                 }
126         }
127
128         if (processor->getType()==StreamProcessor::E_Transmit) {
129                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
130                         it != m_TransmitProcessors.end();
131                         ++it ) {
132
133                         if ( *it == processor ) {
134                                         m_TransmitProcessors.erase(it);
135                                        
136                                         processor->clearManager();
137                                        
138                                         if(!m_isoManager->unregisterStream(processor)) {
139                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
140                                                
141                                                 return false;
142                                                
143                                         }
144                                        
145                                         return true;
146                                 }
147                 }
148         }
149        
150         debugFatal("Processor (%p) not found!\n",processor);
151
152         return false; //not found
153
154 }
155
156 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
157     m_SyncSource=s;
158     return true;
159 }
160
161 StreamProcessor *StreamProcessorManager::getSyncSource() {
162     return m_SyncSource;
163 }
164
165 bool StreamProcessorManager::init()
166 {
167         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
168
169         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
170        
171         if(!m_isoManager) {
172                 debugFatal("Could not create IsoHandlerManager\n");
173                 return false;
174         }
175        
176         // propagate the debug level
177         m_isoManager->setVerboseLevel(getDebugLevel());
178        
179         if(!m_isoManager->init()) {
180                 debugFatal("Could not initialize IsoHandlerManager\n");
181                 return false;
182         }
183        
184         m_xrun_happened=false;
185        
186         return true;
187 }
188
189 bool StreamProcessorManager::prepare() {
190
191         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
192        
193         // if no sync source is set, select one here
194         if(m_SyncSource == NULL) {
195            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
196         }
197        
198         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
199                 it != m_ReceiveProcessors.end();
200                 ++it ) {
201                         if(m_SyncSource == NULL) {
202                                 debugWarning(" => Sync Source is %p.\n", *it);
203                                 m_SyncSource = *it;
204                         }
205         }
206
207         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
208                 it != m_TransmitProcessors.end();
209                 ++it ) {
210                         if(m_SyncSource == NULL) {
211                                 debugWarning(" => Sync Source is %p.\n", *it);
212                                 m_SyncSource = *it;
213                         }
214         }
215
216         // now do the actual preparation
217         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
218         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
219                 it != m_ReceiveProcessors.end();
220                 ++it ) {
221                         if(!(*it)->setSyncSource(m_SyncSource)) {
222                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
223                                 return false;
224                         }
225                        
226                         if(!(*it)->prepare()) {
227                                 debugFatal(  " could not prepare (%p)...\n",(*it));
228                                 return false;
229                         }
230         }
231
232         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
233         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
234                 it != m_TransmitProcessors.end();
235                 ++it ) {
236                         if(!(*it)->setSyncSource(m_SyncSource)) {
237                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
238                                 return false;
239                         }               
240                         if(!(*it)->prepare()) {
241                                 debugFatal( " could not prepare (%p)...\n",(*it));
242                                 return false;
243                         }
244         }
245
246     // if there are no stream processors registered,
247     // fail
248     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
249         debugFatal("No stream processors registered, can't do anything usefull\n");
250         return false;
251     }
252
253         return true;
254 }
255
256
257 bool StreamProcessorManager::syncStartAll() {
258
259     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n");
260     // we have to wait until all streamprocessors indicate that they are running
261     // i.e. that there is actually some data stream flowing
262     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
263     bool notRunning=true;
264     while (notRunning && wait_cycles) {
265         wait_cycles--;
266         notRunning=false;
267        
268         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
269                 it != m_ReceiveProcessors.end();
270                 ++it ) {
271             if(!(*it)->isRunning()) notRunning=true;
272         }
273
274         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
275                 it != m_TransmitProcessors.end();
276                 ++it ) {
277             if(!(*it)->isRunning()) notRunning=true;
278         }
279         usleep(1000);
280         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
281     }
282
283     if(!wait_cycles) { // timout has occurred
284         debugFatal("One or more streams are not starting up (timeout):\n");
285                    
286         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
287                 it != m_ReceiveProcessors.end();
288                 ++it ) {
289             if(!(*it)->isRunning()) {
290                 debugFatal(" receive stream %p not running\n",*it);
291             } else {   
292                 debugFatal(" receive stream %p running\n",*it);
293             }
294         }
295
296         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
297                 it != m_TransmitProcessors.end();
298                 ++it ) {
299             if(!(*it)->isRunning()) {
300                 debugFatal(" transmit stream %p not running\n",*it);
301             } else {   
302                 debugFatal(" transmit stream %p running\n",*it);
303             }
304         }
305        
306         return false;
307     }
308
309     // we want to make sure that everything is running well,
310     // so wait for a while
311     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
312
313     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
314    
315     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
316
317     int max_of_min_delay=0;
318     int min_delay=0;
319     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
320             it != m_ReceiveProcessors.end();
321             ++it ) {
322         min_delay=(*it)->getMinimalSyncDelay();
323         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
324     }
325    
326     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
327             it != m_TransmitProcessors.end();
328             ++it ) {
329         min_delay=(*it)->getMinimalSyncDelay();
330         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
331     }
332    
333     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
334     m_SyncSource->setSyncDelay(max_of_min_delay);
335    
336    
337     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
338     // now we reset the frame counters
339     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
340             it != m_ReceiveProcessors.end();
341             ++it ) {
342         (*it)->reset();
343     }
344    
345     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
346             it != m_TransmitProcessors.end();
347             ++it ) {
348         (*it)->reset();
349     }
350        
351     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
352    
353     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
354    
355     // FIXME: this should not be in cycles, but in 'time'
356     unsigned int enable_at=TICKS_TO_CYCLES(now)+2000;
357     if (enable_at > 8000) enable_at -= 8000;
358
359     if (!enableStreamProcessors(enable_at)) {
360         debugFatal("Could not enable StreamProcessors...\n");
361         return false;
362     }
363
364     return true;
365 }
366
367 bool StreamProcessorManager::start() {
368         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
369         assert(m_isoManager);
370        
371         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
372         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
373         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
374                 it != m_ReceiveProcessors.end();
375                 ++it ) {
376                         if (!(*it)->prepareForStart()) {
377                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
378                                 return false;
379                         }
380                         if (!m_isoManager->registerStream(*it)) {
381                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
382                                 return false;
383                         }
384                 }
385
386         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
387         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
388                 it != m_TransmitProcessors.end();
389                 ++it ) {
390                         if (!(*it)->prepareForStart()) {
391                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
392                                 return false;
393                         }
394                         if (!m_isoManager->registerStream(*it)) {
395                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
396                                 return false;
397                         }
398                 }
399
400         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
401         if (!m_isoManager->prepare()) {
402                 debugFatal("Could not prepare isoManager\n");
403                 return false;
404         }
405
406         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
407         if (!disableStreamProcessors()) {
408                 debugFatal("Could not disable StreamProcessors...\n");
409                 return false;
410         }
411                
412         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
413         if (!m_isoManager->startHandlers(0)) {
414                 debugFatal("Could not start handlers...\n");
415                 return false;
416         }
417
418         // start all SP's synchonized
419         if (!syncStartAll()) {
420                 debugFatal("Could not syncStartAll...\n");
421                 return false;
422         }
423        
424         // dump the iso stream information when in verbose mode
425         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
426                 m_isoManager->dumpInfo();
427         }
428        
429         return true;
430        
431 }
432
433 bool StreamProcessorManager::stop() {
434         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
435         assert(m_isoManager);
436
437         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
438         // Most stream processors can just stop without special treatment.  However, some
439         // (like the MOTU) need to do a few things before it's safe to turn off the iso
440         // handling.
441         int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
442         bool allReady = false;
443         while (!allReady && wait_cycles) {
444                 wait_cycles--;
445                 allReady = true;
446                
447                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
448                         it != m_ReceiveProcessors.end();
449                         ++it ) {
450                         if(!(*it)->prepareForStop()) allReady = false;
451                 }
452        
453                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
454                         it != m_TransmitProcessors.end();
455                         ++it ) {
456                         if(!(*it)->prepareForStop()) allReady = false;
457                 }
458                 usleep(1000);
459         }
460
461        
462         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
463         if(!m_isoManager->stopHandlers()) {
464            debugFatal("Could not stop ISO handlers\n");
465            return false;
466         }
467        
468         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
469     // now unregister all streams from iso manager
470         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
471         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
472                 it != m_ReceiveProcessors.end();
473                 ++it ) {
474                         if (!m_isoManager->unregisterStream(*it)) {
475                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
476                                 return false;
477                         }
478                        
479                 }
480
481         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
482         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
483                 it != m_TransmitProcessors.end();
484                 ++it ) {
485                         if (!m_isoManager->unregisterStream(*it)) {
486                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
487                                 return false;
488                         }
489                        
490                 }
491        
492         return true;
493        
494 }
495
496 /**
497  * Enables the registered StreamProcessors
498  * @return true if successful, false otherwise
499  */
500 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
501     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
502
503     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
504     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
505     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
506             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
507         return false;
508     }
509
510     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
511     m_SyncSource->enable(time_to_enable_at);
512    
513     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
514    
515     // we prepare the streamprocessors for enable
516     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
517             it != m_ReceiveProcessors.end();
518             ++it ) {
519         if(*it != m_SyncSource) {
520             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
521             (*it)->prepareForEnable(time_to_enable_at);
522         }
523     }
524
525     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
526             it != m_TransmitProcessors.end();
527             ++it ) {
528         if(*it != m_SyncSource) {
529             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
530             (*it)->prepareForEnable(time_to_enable_at);
531         }
532     }
533
534     // then we enable the streamprocessors
535     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
536             it != m_ReceiveProcessors.end();
537             ++it ) {           
538         if(*it != m_SyncSource) {
539             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
540             (*it)->enable(time_to_enable_at);
541         }
542     }
543
544     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
545             it != m_TransmitProcessors.end();
546             ++it ) {
547         if(*it != m_SyncSource) {
548             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
549             (*it)->enable(time_to_enable_at);
550         }
551     }
552
553     // now we wait for the SP's to get enabled
554     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
555     // we have to wait until all streamprocessors indicate that they are running
556     // i.e. that there is actually some data stream flowing
557     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
558     bool notEnabled=true;
559     while (notEnabled && wait_cycles) {
560         wait_cycles--;
561         notEnabled=false;
562        
563         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
564                 it != m_ReceiveProcessors.end();
565                 ++it ) {
566             if(!(*it)->isEnabled()) notEnabled=true;
567         }
568
569         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
570                 it != m_TransmitProcessors.end();
571                 ++it ) {
572             if(!(*it)->isEnabled()) notEnabled=true;
573         }
574         usleep(1000); // one cycle
575     }
576    
577     if(!wait_cycles) { // timout has occurred
578         debugFatal("One or more streams couldn't be enabled (timeout):\n");
579                    
580         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
581                 it != m_ReceiveProcessors.end();
582                 ++it ) {
583             if(!(*it)->isEnabled()) {
584                     debugFatal(" receive stream %p not enabled\n",*it);
585             } else {   
586                     debugFatal(" receive stream %p enabled\n",*it);
587             }
588         }
589    
590         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
591                 it != m_TransmitProcessors.end();
592                 ++it ) {
593             if(!(*it)->isEnabled()) {
594                     debugFatal(" transmit stream %p not enabled\n",*it);
595             } else {   
596                     debugFatal(" transmit stream %p enabled\n",*it);
597             }
598         }
599         return false;
600     }
601    
602     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
603
604     return true;
605 }
606
607 /**
608  * Disables the registered StreamProcessors
609  * @return true if successful, false otherwise
610  */
611 bool StreamProcessorManager::disableStreamProcessors() {
612     // we prepare the streamprocessors for disable
613     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
614             it != m_ReceiveProcessors.end();
615             ++it ) {
616         (*it)->prepareForDisable();
617     }
618
619     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
620             it != m_TransmitProcessors.end();
621             ++it ) {
622         (*it)->prepareForDisable();
623     }
624
625     // then we disable the streamprocessors
626     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
627             it != m_ReceiveProcessors.end();
628             ++it ) {
629         (*it)->disable();
630     }
631
632     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
633             it != m_TransmitProcessors.end();
634             ++it ) {
635         (*it)->disable();
636     }
637
638     // now we wait for the SP's to get disabled
639     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
640     // we have to wait until all streamprocessors indicate that they are running
641     // i.e. that there is actually some data stream flowing
642     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
643     bool enabled=true;
644     while (enabled && wait_cycles) {
645         wait_cycles--;
646         enabled=false;
647        
648         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
649                 it != m_ReceiveProcessors.end();
650                 ++it ) {
651             if((*it)->isEnabled()) enabled=true;
652         }
653
654         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
655                 it != m_TransmitProcessors.end();
656                 ++it ) {
657             if((*it)->isEnabled()) enabled=true;
658         }
659         usleep(1000); // one cycle
660     }
661    
662     if(!wait_cycles) { // timout has occurred
663         debugFatal("One or more streams couldn't be disabled (timeout):\n");
664                    
665         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
666                 it != m_ReceiveProcessors.end();
667                 ++it ) {
668             if(!(*it)->isEnabled()) {
669                     debugFatal(" receive stream %p not enabled\n",*it);
670             } else {   
671                     debugFatal(" receive stream %p enabled\n",*it);
672             }
673         }
674    
675         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
676                 it != m_TransmitProcessors.end();
677                 ++it ) {
678             if(!(*it)->isEnabled()) {
679                     debugFatal(" transmit stream %p not enabled\n",*it);
680             } else {   
681                     debugFatal(" transmit stream %p enabled\n",*it);
682             }
683         }
684         return false;
685     }
686    
687     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
688        
689     return true;
690 }
691
692 /**
693  * Called upon Xrun events. This brings all StreamProcessors back
694  * into their starting state, and then carries on streaming. This should
695  * have the same effect as restarting the whole thing.
696  *
697  * @return true if successful, false otherwise
698  */
699 bool StreamProcessorManager::handleXrun() {
700
701         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
702
703         /*
704          * Reset means:
705          * 1) Disabling the SP's, so that they don't process any packets
706          *    note: the isomanager does keep on delivering/requesting them
707          * 2) Bringing all buffers & streamprocessors into a know state
708          *    - Clear all capture buffers
709          *    - Put nb_periods*period_size of null frames into the playback buffers
710          * 3) Re-enable the SP's
711          */
712         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
713         if (!disableStreamProcessors()) {
714                 debugFatal("Could not disable StreamProcessors...\n");
715                 return false;
716         }
717
718         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
719         // start all SP's synchonized
720         if (!syncStartAll()) {
721                 debugFatal("Could not syncStartAll...\n");
722                 return false;
723         }
724
725         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
726        
727         return true;
728 }
729
730 /**
731  * @brief Waits until the next period of samples is ready
732  *
733  * This function does not return until a full period of samples is (or should be)
734  * ready to be transferred.
735  *
736  * @return true if the period is ready, false if an xrun occurred
737  */
738 bool StreamProcessorManager::waitForPeriod() {
739     int time_till_next_period;
740     bool xrun_occurred=false;
741        
742     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
743
744     assert(m_SyncSource);
745    
746     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
747    
748     while(time_till_next_period > 0) {
749         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
750    
751         // wait for the period
752         usleep(time_till_next_period);
753        
754         // check for underruns on the ISO side,
755         // those should make us bail out of the wait loop
756         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
757             it != m_ReceiveProcessors.end();
758             ++it ) {
759             // a xrun has occurred on the Iso side
760             xrun_occurred |= (*it)->xrunOccurred();
761         }
762         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
763             it != m_TransmitProcessors.end();
764             ++it ) {
765             // a xrun has occurred on the Iso side
766             xrun_occurred |= (*it)->xrunOccurred();
767         }
768
769         // check if we were waked up too soon
770         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
771     }
772    
773     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
774    
775     // this is to notify the client of the delay
776     // that we introduced
777     m_delayed_usecs=time_till_next_period;
778    
779     // we save the 'ideal' time of the transfer at this point,
780     // because we can have interleaved read - process - write
781     // cycles making that we modify a receiving stream's buffer
782     // before we get to writing.
783     // NOTE: before waitForPeriod() is called again, both the transmit
784     //       and the receive processors should have done their transfer.
785     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
786     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
787         m_time_of_transfer);
788    
789 #ifdef DEBUG
790     int rcv_bf=0, xmt_bf=0;
791     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
792         it != m_ReceiveProcessors.end();
793         ++it ) {
794         rcv_bf = (*it)->getBufferFill();
795     }
796     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
797         it != m_TransmitProcessors.end();
798         ++it ) {
799         xmt_bf = (*it)->getBufferFill();
800     }
801     debugOutput( DEBUG_LEVEL_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
802         m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf);
803    
804 #endif
805
806     xrun_occurred=false;
807    
808     // check if xruns occurred on the Iso side.
809     // also check if xruns will occur should we transfer() now
810    
811     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
812           it != m_ReceiveProcessors.end();
813           ++it ) {
814         // a xrun has occurred on the Iso side
815         xrun_occurred |= (*it)->xrunOccurred();
816        
817         // if this is true, a xrun will occur
818         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
819        
820 #ifdef DEBUG
821         if ((*it)->xrunOccurred()) {
822             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
823             (*it)->dumpInfo();
824         }
825         if (!((*it)->canClientTransferFrames(m_period))) {
826             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
827             (*it)->dumpInfo();
828         }
829 #endif
830        
831     }
832     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
833           it != m_TransmitProcessors.end();
834           ++it ) {
835         // a xrun has occurred on the Iso side
836         xrun_occurred |= (*it)->xrunOccurred();
837        
838         // if this is true, a xrun will occur
839         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
840        
841 #ifdef DEBUG
842         if ((*it)->xrunOccurred()) {
843             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
844         }
845         if (!((*it)->canClientTransferFrames(m_period))) {
846             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
847         }
848 #endif       
849     }
850    
851     m_nbperiods++;
852    
853     // now we can signal the client that we are (should be) ready
854     return !xrun_occurred;
855 }
856
857 /**
858  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
859  *
860  * Transfers one period of frames from the client side to the Iso side and vice versa.
861  *
862  * @return true if successful, false otherwise (indicates xrun).
863  */
864 bool StreamProcessorManager::transfer() {
865    
866     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
867
868     if (!transfer(StreamProcessor::E_Receive)) return false;
869     if (!transfer(StreamProcessor::E_Transmit)) return false;
870
871     return true;
872 }
873
874 /**
875  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
876  *
877  * Transfers one period of frames from the client side to the Iso side or vice versa.
878  *
879  * @param t The processor type to tranfer for (receive or transmit)
880  * @return true if successful, false otherwise (indicates xrun).
881  */
882
883 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
884     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
885    
886     // a static cast could make sure that there is no performance
887     // penalty for the virtual functions (to be checked)
888     if (t==StreamProcessor::E_Receive) {
889         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
890                 it != m_ReceiveProcessors.end();
891                 ++it ) {
892                
893             if(!(*it)->getFrames(m_period)) {
894                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
895                             m_period, m_time_of_transfer,*it);
896                     return false; // buffer underrun
897             }
898
899         }
900     } else {
901         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
902                 it != m_TransmitProcessors.end();
903                 ++it ) {
904                
905             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
906                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
907                         m_period, m_time_of_transfer, *it);
908                 return false; // buffer overrun
909             }
910
911         }
912     }
913
914     return true;
915 }
916
917 void StreamProcessorManager::dumpInfo() {
918         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
919         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
920         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
921
922         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
923         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
924                 it != m_ReceiveProcessors.end();
925                 ++it ) {
926                 (*it)->dumpInfo();
927         }
928
929         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
930         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
931                 it != m_TransmitProcessors.end();
932                 ++it ) {
933                 (*it)->dumpInfo();
934         }
935
936         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
937         m_isoManager->dumpInfo();
938         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
939
940 }
941
942 void StreamProcessorManager::setVerboseLevel(int l) {
943         setDebugLevel(l);
944
945         if (m_isoManager) m_isoManager->setVerboseLevel(l);
946
947         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
948         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
949                 it != m_ReceiveProcessors.end();
950                 ++it ) {
951                 (*it)->setVerboseLevel(l);
952         }
953
954         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
955         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
956                 it != m_TransmitProcessors.end();
957                 ++it ) {
958                 (*it)->setVerboseLevel(l);
959         }
960 }
961
962
963 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
964         int count=0;
965
966         if (direction == Port::E_Capture) {
967                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
968                         it != m_ReceiveProcessors.end();
969                         ++it ) {
970                         count += (*it)->getPortCount(type);
971                 }
972         } else {
973                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
974                         it != m_TransmitProcessors.end();
975                         ++it ) {
976                         count += (*it)->getPortCount(type);
977                 }
978         }
979         return count;
980 }
981
982 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
983         int count=0;
984
985         if (direction == Port::E_Capture) {
986                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
987                         it != m_ReceiveProcessors.end();
988                         ++it ) {
989                         count += (*it)->getPortCount();
990                 }
991         } else {
992                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
993                         it != m_TransmitProcessors.end();
994                         ++it ) {
995                         count += (*it)->getPortCount();
996                 }
997         }
998         return count;
999 }
1000
1001 // TODO: implement a port map here, instead of the loop
1002
1003 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1004         int count=0;
1005         int prevcount=0;
1006
1007         if (direction == Port::E_Capture) {
1008                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1009                         it != m_ReceiveProcessors.end();
1010                         ++it ) {
1011                         count += (*it)->getPortCount();
1012                         if (count > idx) {
1013                                 return (*it)->getPortAtIdx(idx-prevcount);
1014                         }
1015                         prevcount=count;
1016                 }
1017         } else {
1018                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1019                         it != m_TransmitProcessors.end();
1020                         ++it ) {
1021                         count += (*it)->getPortCount();
1022                         if (count > idx) {
1023                                 return (*it)->getPortAtIdx(idx-prevcount);
1024                         }
1025                         prevcount=count;
1026                 }
1027         }
1028         return NULL;
1029 }
1030
1031 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1032     m_thread_realtime=rt;
1033     m_thread_priority=priority;
1034     return true;
1035 }
1036
1037
1038 } // end of namespace
Note: See TracBrowser for help on using the browser.