root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 419, 33.0 kB (checked in by pieterpalmers, 17 years ago)

namespace simplification

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "libstreaming/cycletimer.h"
36
// Upper bounds for the polling loops in this file. Every loop iteration
// sleeps for 1000 usecs, so a value of 4000 amounts to roughly four seconds.
#define RUNNING_TIMEOUT_MSEC 4000
#define PREPARE_TIMEOUT_MSEC 4000
#define ENABLE_TIMEOUT_MSEC 4000

// Number of cycles slept (multiplied by USECS_PER_CYCLE) once the sync source
// reports that it is running.
#define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50

// Number of cycles added to the current cycle to obtain the cycle at which
// the StreamProcessors are enabled.
#define ENABLE_DELAY_CYCLES 2000
44
45 namespace Streaming {
46
47 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
48
49 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
50         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
51         m_isoManager(0), m_nbperiods(0) {
52
53 }
54
55 StreamProcessorManager::~StreamProcessorManager() {
56         if (m_isoManager) delete m_isoManager;
57        
58 }
59
60 /**
61  * Registers \ref processor with this manager.
62  *
63  * also registers it with the isohandlermanager
64  *
65  * be sure to call isohandlermanager->init() first!
66  * and be sure that the processors are also ->init()'ed
67  *
68  * @param processor
69  * @return true if successfull
70  */
71 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
72 {
73         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
74         assert(processor);
75         assert(m_isoManager);
76
77         if (processor->getType()==StreamProcessor::E_Receive) {
78                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
79                
80                 m_ReceiveProcessors.push_back(processor);
81                
82                 processor->setManager(this);
83                                
84                 return true;
85         }
86        
87         if (processor->getType()==StreamProcessor::E_Transmit) {
88                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
89                
90                 m_TransmitProcessors.push_back(processor);
91                
92                 processor->setManager(this);
93                
94                 return true;
95         }
96
97         debugFatal("Unsupported processor type!\n");
98        
99         return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105         assert(processor);
106
107         if (processor->getType()==StreamProcessor::E_Receive) {
108
109                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110                         it != m_ReceiveProcessors.end();
111                         ++it ) {
112
113                         if ( *it == processor ) {
114                                         m_ReceiveProcessors.erase(it);
115                                        
116                                         processor->clearManager();
117                                        
118                                         if(!m_isoManager->unregisterStream(processor)) {
119                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
120                                                
121                                                 return false;
122                                                
123                                         }
124                                        
125                                         return true;
126                                 }
127                 }
128         }
129
130         if (processor->getType()==StreamProcessor::E_Transmit) {
131                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
132                         it != m_TransmitProcessors.end();
133                         ++it ) {
134
135                         if ( *it == processor ) {
136                                         m_TransmitProcessors.erase(it);
137                                        
138                                         processor->clearManager();
139                                        
140                                         if(!m_isoManager->unregisterStream(processor)) {
141                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
142                                                
143                                                 return false;
144                                                
145                                         }
146                                        
147                                         return true;
148                                 }
149                 }
150         }
151        
152         debugFatal("Processor (%p) not found!\n",processor);
153
154         return false; //not found
155
156 }
157
158 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
159     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
160
161     m_SyncSource=s;
162     return true;
163 }
164
165 StreamProcessor *StreamProcessorManager::getSyncSource() {
166     return m_SyncSource;
167 }
168
169 bool StreamProcessorManager::init()
170 {
171         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
172
173         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
174        
175         if(!m_isoManager) {
176                 debugFatal("Could not create IsoHandlerManager\n");
177                 return false;
178         }
179        
180         // propagate the debug level
181         m_isoManager->setVerboseLevel(getDebugLevel());
182        
183         if(!m_isoManager->init()) {
184                 debugFatal("Could not initialize IsoHandlerManager\n");
185                 return false;
186         }
187        
188         m_xrun_happened=false;
189        
190         return true;
191 }
192
193 bool StreamProcessorManager::prepare() {
194
195         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
196        
197         // if no sync source is set, select one here
198         if(m_SyncSource == NULL) {
199            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
200         }
201        
202         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
203                 it != m_ReceiveProcessors.end();
204                 ++it ) {
205                         if(m_SyncSource == NULL) {
206                                 debugWarning(" => Sync Source is %p.\n", *it);
207                                 m_SyncSource = *it;
208                         }
209         }
210
211         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
212                 it != m_TransmitProcessors.end();
213                 ++it ) {
214                         if(m_SyncSource == NULL) {
215                                 debugWarning(" => Sync Source is %p.\n", *it);
216                                 m_SyncSource = *it;
217                         }
218         }
219
220         // now do the actual preparation
221         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
222         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
223                 it != m_ReceiveProcessors.end();
224                 ++it ) {
225                         if(!(*it)->setSyncSource(m_SyncSource)) {
226                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
227                                 return false;
228                         }
229                        
230                         if(!(*it)->prepare()) {
231                                 debugFatal(  " could not prepare (%p)...\n",(*it));
232                                 return false;
233                         }
234         }
235
236         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
237         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
238                 it != m_TransmitProcessors.end();
239                 ++it ) {
240                         if(!(*it)->setSyncSource(m_SyncSource)) {
241                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
242                                 return false;
243                         }               
244                         if(!(*it)->prepare()) {
245                                 debugFatal( " could not prepare (%p)...\n",(*it));
246                                 return false;
247                         }
248         }
249
250     // if there are no stream processors registered,
251     // fail
252     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
253         debugFatal("No stream processors registered, can't do anything usefull\n");
254         return false;
255     }
256
257         return true;
258 }
259
260
261 bool StreamProcessorManager::syncStartAll() {
262
263     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
264     // we have to wait until all streamprocessors indicate that they are running
265     // i.e. that there is actually some data stream flowing
266     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
267     bool notRunning=true;
268     while (notRunning && wait_cycles) {
269         wait_cycles--;
270         notRunning=false;
271        
272 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
273 //                 it != m_ReceiveProcessors.end();
274 //                 ++it ) {
275 //             if(!(*it)->isRunning()) notRunning=true;
276 //         }
277 //
278 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
279 //                 it != m_TransmitProcessors.end();
280 //                 ++it ) {
281 //             if(!(*it)->isRunning()) notRunning=true;
282 //         }
283        
284         // EXPERIMENT:
285         // the only stream that should be running is the sync
286         // source stream, as this is the one that defines
287         // when to signal buffers. Maybe we get an xrun at startup,
288         // but that should be handled.
289        
290         // the problem is that otherwise a setup with a device
291         // that waits for decent input before sending output
292         // will not start up (e.g. the bounce device), because
293         // all streams are required to be running.
294        
295         // other streams still have at least ENABLE_DELAY_CYCLES cycles
296         // to start up
297         if(!m_SyncSource->isRunning()) notRunning=true;
298        
299         usleep(1000);
300         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
301     }
302
303     if(!wait_cycles) { // timout has occurred
304         debugFatal("One or more streams are not starting up (timeout):\n");
305
306         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
307                 it != m_ReceiveProcessors.end();
308                 ++it ) {
309             if(!(*it)->isRunning()) {
310                 debugFatal(" receive stream %p not running\n",*it);
311             } else {   
312                 debugFatal(" receive stream %p running\n",*it);
313             }
314         }
315
316         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
317                 it != m_TransmitProcessors.end();
318                 ++it ) {
319             if(!(*it)->isRunning()) {
320                 debugFatal(" transmit stream %p not running\n",*it);
321             } else {   
322                 debugFatal(" transmit stream %p running\n",*it);
323             }
324         }
325         return false;
326     }
327
328     // we want to make sure that everything is running well,
329     // so wait for a while
330     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
331
332     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
333    
334     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
335
336     int max_of_min_delay=0;
337     int min_delay=0;
338     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
339             it != m_ReceiveProcessors.end();
340             ++it ) {
341         min_delay=(*it)->getMinimalSyncDelay();
342         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
343     }
344    
345     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
346             it != m_TransmitProcessors.end();
347             ++it ) {
348         min_delay=(*it)->getMinimalSyncDelay();
349         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
350     }
351    
352     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
353     m_SyncSource->setSyncDelay(max_of_min_delay);
354    
355    
356     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
357     // now we reset the frame counters
358     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
359             it != m_ReceiveProcessors.end();
360             ++it ) {
361         (*it)->reset();
362     }
363    
364     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
365             it != m_TransmitProcessors.end();
366             ++it ) {
367         (*it)->reset();
368     }
369        
370     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
371    
372     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
373    
374     // FIXME: this should not be in cycles, but in 'time'
375     unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES;
376     if (enable_at > 8000) enable_at -= 8000;
377
378     if (!enableStreamProcessors(enable_at)) {
379         debugFatal("Could not enable StreamProcessors...\n");
380         return false;
381     }
382
383     return true;
384 }
385
386 bool StreamProcessorManager::start() {
387         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
388         assert(m_isoManager);
389        
390         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
391         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
392         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
393                 it != m_ReceiveProcessors.end();
394                 ++it ) {
395                         if (!(*it)->prepareForStart()) {
396                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
397                                 return false;
398                         }
399                         if (!m_isoManager->registerStream(*it)) {
400                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
401                                 return false;
402                         }
403                 }
404
405         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
406         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
407                 it != m_TransmitProcessors.end();
408                 ++it ) {
409                         if (!(*it)->prepareForStart()) {
410                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
411                                 return false;
412                         }
413                         if (!m_isoManager->registerStream(*it)) {
414                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
415                                 return false;
416                         }
417                 }
418
419         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
420         if (!m_isoManager->prepare()) {
421                 debugFatal("Could not prepare isoManager\n");
422                 return false;
423         }
424
425         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
426         if (!disableStreamProcessors()) {
427                 debugFatal("Could not disable StreamProcessors...\n");
428                 return false;
429         }
430                
431         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
432         if (!m_isoManager->startHandlers(-1)) {
433                 debugFatal("Could not start handlers...\n");
434                 return false;
435         }
436
437         // start all SP's synchonized
438         if (!syncStartAll()) {
439                 debugFatal("Could not syncStartAll...\n");
440                 return false;
441         }
442        
443         // dump the iso stream information when in verbose mode
444         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
445                 m_isoManager->dumpInfo();
446         }
447        
448         return true;
449        
450 }
451
452 bool StreamProcessorManager::stop() {
453         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
454         assert(m_isoManager);
455
456         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
457         // Most stream processors can just stop without special treatment.  However, some
458         // (like the MOTU) need to do a few things before it's safe to turn off the iso
459         // handling.
460         int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
461         bool allReady = false;
462         while (!allReady && wait_cycles) {
463                 wait_cycles--;
464                 allReady = true;
465                
466                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
467                         it != m_ReceiveProcessors.end();
468                         ++it ) {
469                         if(!(*it)->prepareForStop()) allReady = false;
470                 }
471        
472                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
473                         it != m_TransmitProcessors.end();
474                         ++it ) {
475                         if(!(*it)->prepareForStop()) allReady = false;
476                 }
477                 usleep(1000);
478         }
479
480        
481         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
482         if(!m_isoManager->stopHandlers()) {
483            debugFatal("Could not stop ISO handlers\n");
484            return false;
485         }
486        
487         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
488     // now unregister all streams from iso manager
489         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
490         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
491                 it != m_ReceiveProcessors.end();
492                 ++it ) {
493                         if (!m_isoManager->unregisterStream(*it)) {
494                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
495                                 return false;
496                         }
497                        
498                 }
499
500         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
501         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
502                 it != m_TransmitProcessors.end();
503                 ++it ) {
504                         if (!m_isoManager->unregisterStream(*it)) {
505                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
506                                 return false;
507                         }
508                        
509                 }
510        
511         return true;
512        
513 }
514
515 /**
516  * Enables the registered StreamProcessors
517  * @return true if successful, false otherwise
518  */
519 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
520     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
521
522     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
523     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
524     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
525             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
526         return false;
527     }
528
529     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
530     m_SyncSource->enable(time_to_enable_at);
531    
532     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
533    
534     // we prepare the streamprocessors for enable
535     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
536             it != m_ReceiveProcessors.end();
537             ++it ) {
538         if(*it != m_SyncSource) {
539             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
540             (*it)->prepareForEnable(time_to_enable_at);
541         }
542     }
543
544     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
545             it != m_TransmitProcessors.end();
546             ++it ) {
547         if(*it != m_SyncSource) {
548             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
549             (*it)->prepareForEnable(time_to_enable_at);
550         }
551     }
552
553     // then we enable the streamprocessors
554     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
555             it != m_ReceiveProcessors.end();
556             ++it ) {           
557         if(*it != m_SyncSource) {
558             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
559             (*it)->enable(time_to_enable_at);
560         }
561     }
562
563     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
564             it != m_TransmitProcessors.end();
565             ++it ) {
566         if(*it != m_SyncSource) {
567             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
568             (*it)->enable(time_to_enable_at);
569         }
570     }
571
572     // now we wait for the SP's to get enabled
573     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
574     // we have to wait until all streamprocessors indicate that they are running
575     // i.e. that there is actually some data stream flowing
576     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
577     bool notEnabled=true;
578     while (notEnabled && wait_cycles) {
579         wait_cycles--;
580         notEnabled=false;
581        
582         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
583                 it != m_ReceiveProcessors.end();
584                 ++it ) {
585             if(!(*it)->isEnabled()) notEnabled=true;
586         }
587
588         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
589                 it != m_TransmitProcessors.end();
590                 ++it ) {
591             if(!(*it)->isEnabled()) notEnabled=true;
592         }
593         usleep(1000); // one cycle
594     }
595    
596     if(!wait_cycles) { // timout has occurred
597         debugFatal("One or more streams couldn't be enabled (timeout):\n");
598                    
599         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
600                 it != m_ReceiveProcessors.end();
601                 ++it ) {
602             if(!(*it)->isEnabled()) {
603                     debugFatal(" receive stream %p not enabled\n",*it);
604             } else {   
605                     debugFatal(" receive stream %p enabled\n",*it);
606             }
607         }
608    
609         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
610                 it != m_TransmitProcessors.end();
611                 ++it ) {
612             if(!(*it)->isEnabled()) {
613                     debugFatal(" transmit stream %p not enabled\n",*it);
614             } else {   
615                     debugFatal(" transmit stream %p enabled\n",*it);
616             }
617         }
618         return false;
619     }
620    
621     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
622
623     return true;
624 }
625
626 /**
627  * Disables the registered StreamProcessors
628  * @return true if successful, false otherwise
629  */
630 bool StreamProcessorManager::disableStreamProcessors() {
631     // we prepare the streamprocessors for disable
632     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
633             it != m_ReceiveProcessors.end();
634             ++it ) {
635         (*it)->prepareForDisable();
636     }
637
638     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
639             it != m_TransmitProcessors.end();
640             ++it ) {
641         (*it)->prepareForDisable();
642     }
643
644     // then we disable the streamprocessors
645     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
646             it != m_ReceiveProcessors.end();
647             ++it ) {
648         (*it)->disable();
649     }
650
651     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
652             it != m_TransmitProcessors.end();
653             ++it ) {
654         (*it)->disable();
655     }
656
657     // now we wait for the SP's to get disabled
658     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
659     // we have to wait until all streamprocessors indicate that they are running
660     // i.e. that there is actually some data stream flowing
661     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
662     bool enabled=true;
663     while (enabled && wait_cycles) {
664         wait_cycles--;
665         enabled=false;
666        
667         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
668                 it != m_ReceiveProcessors.end();
669                 ++it ) {
670             if((*it)->isEnabled()) enabled=true;
671         }
672
673         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
674                 it != m_TransmitProcessors.end();
675                 ++it ) {
676             if((*it)->isEnabled()) enabled=true;
677         }
678         usleep(1000); // one cycle
679     }
680    
681     if(!wait_cycles) { // timout has occurred
682         debugFatal("One or more streams couldn't be disabled (timeout):\n");
683                    
684         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
685                 it != m_ReceiveProcessors.end();
686                 ++it ) {
687             if(!(*it)->isEnabled()) {
688                     debugFatal(" receive stream %p not enabled\n",*it);
689             } else {   
690                     debugFatal(" receive stream %p enabled\n",*it);
691             }
692         }
693    
694         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
695                 it != m_TransmitProcessors.end();
696                 ++it ) {
697             if(!(*it)->isEnabled()) {
698                     debugFatal(" transmit stream %p not enabled\n",*it);
699             } else {   
700                     debugFatal(" transmit stream %p enabled\n",*it);
701             }
702         }
703         return false;
704     }
705    
706     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
707        
708     return true;
709 }
710
711 /**
712  * Called upon Xrun events. This brings all StreamProcessors back
713  * into their starting state, and then carries on streaming. This should
714  * have the same effect as restarting the whole thing.
715  *
716  * @return true if successful, false otherwise
717  */
718 bool StreamProcessorManager::handleXrun() {
719
720         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
721
722         /*
723          * Reset means:
724          * 1) Disabling the SP's, so that they don't process any packets
725          *    note: the isomanager does keep on delivering/requesting them
726          * 2) Bringing all buffers & streamprocessors into a know state
727          *    - Clear all capture buffers
728          *    - Put nb_periods*period_size of null frames into the playback buffers
729          * 3) Re-enable the SP's
730          */
731         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
732         if (!disableStreamProcessors()) {
733                 debugFatal("Could not disable StreamProcessors...\n");
734                 return false;
735         }
736
737         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
738         // start all SP's synchonized
739         if (!syncStartAll()) {
740                 debugFatal("Could not syncStartAll...\n");
741                 return false;
742         }
743
744         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
745        
746         return true;
747 }
748
749 /**
750  * @brief Waits until the next period of samples is ready
751  *
752  * This function does not return until a full period of samples is (or should be)
753  * ready to be transferred.
754  *
755  * @return true if the period is ready, false if an xrun occurred
756  */
757 bool StreamProcessorManager::waitForPeriod() {
758     int time_till_next_period;
759     bool xrun_occurred=false;
760        
761     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
762
763     assert(m_SyncSource);
764    
765     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
766    
767     while(time_till_next_period > 0) {
768         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
769    
770         // wait for the period
771         usleep(time_till_next_period);
772        
773         // check for underruns on the ISO side,
774         // those should make us bail out of the wait loop
775         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
776             it != m_ReceiveProcessors.end();
777             ++it ) {
778             // a xrun has occurred on the Iso side
779             xrun_occurred |= (*it)->xrunOccurred();
780         }
781         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
782             it != m_TransmitProcessors.end();
783             ++it ) {
784             // a xrun has occurred on the Iso side
785             xrun_occurred |= (*it)->xrunOccurred();
786         }
787
788         // check if we were waked up too soon
789         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
790     }
791    
792     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
793    
794     // this is to notify the client of the delay
795     // that we introduced
796     m_delayed_usecs=time_till_next_period;
797    
798     // we save the 'ideal' time of the transfer at this point,
799     // because we can have interleaved read - process - write
800     // cycles making that we modify a receiving stream's buffer
801     // before we get to writing.
802     // NOTE: before waitForPeriod() is called again, both the transmit
803     //       and the receive processors should have done their transfer.
804     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
805     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
806         m_time_of_transfer);
807    
808 #ifdef DEBUG
809     int rcv_bf=0, xmt_bf=0;
810     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
811         it != m_ReceiveProcessors.end();
812         ++it ) {
813         rcv_bf = (*it)->getBufferFill();
814     }
815     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
816         it != m_TransmitProcessors.end();
817         ++it ) {
818         xmt_bf = (*it)->getBufferFill();
819     }
820     debugOutput( DEBUG_LEVEL_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
821         m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf);
822    
823 #endif
824
825     xrun_occurred=false;
826    
827     // check if xruns occurred on the Iso side.
828     // also check if xruns will occur should we transfer() now
829    
830     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
831           it != m_ReceiveProcessors.end();
832           ++it ) {
833         // a xrun has occurred on the Iso side
834         xrun_occurred |= (*it)->xrunOccurred();
835        
836         // if this is true, a xrun will occur
837         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
838        
839 #ifdef DEBUG
840         if ((*it)->xrunOccurred()) {
841             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
842             (*it)->dumpInfo();
843         }
844         if (!((*it)->canClientTransferFrames(m_period))) {
845             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
846             (*it)->dumpInfo();
847         }
848 #endif
849        
850     }
851     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
852           it != m_TransmitProcessors.end();
853           ++it ) {
854         // a xrun has occurred on the Iso side
855         xrun_occurred |= (*it)->xrunOccurred();
856        
857         // if this is true, a xrun will occur
858         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
859        
860 #ifdef DEBUG
861         if ((*it)->xrunOccurred()) {
862             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
863         }
864         if (!((*it)->canClientTransferFrames(m_period))) {
865             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
866         }
867 #endif       
868     }
869    
870     m_nbperiods++;
871    
872     // now we can signal the client that we are (should be) ready
873     return !xrun_occurred;
874 }
875
876 /**
877  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
878  *
879  * Transfers one period of frames from the client side to the Iso side and vice versa.
880  *
881  * @return true if successful, false otherwise (indicates xrun).
882  */
883 bool StreamProcessorManager::transfer() {
884    
885     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
886
887     if (!transfer(StreamProcessor::E_Receive)) return false;
888     if (!transfer(StreamProcessor::E_Transmit)) return false;
889
890     return true;
891 }
892
893 /**
894  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
895  *
896  * Transfers one period of frames from the client side to the Iso side or vice versa.
897  *
898  * @param t The processor type to tranfer for (receive or transmit)
899  * @return true if successful, false otherwise (indicates xrun).
900  */
901
902 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
903     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
904    
905     // a static cast could make sure that there is no performance
906     // penalty for the virtual functions (to be checked)
907     if (t==StreamProcessor::E_Receive) {
908         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
909                 it != m_ReceiveProcessors.end();
910                 ++it ) {
911                
912             if(!(*it)->getFrames(m_period)) {
913                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
914                             m_period, m_time_of_transfer,*it);
915                     return false; // buffer underrun
916             }
917
918         }
919     } else {
920         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
921                 it != m_TransmitProcessors.end();
922                 ++it ) {
923                
924             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
925                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
926                         m_period, m_time_of_transfer, *it);
927                 return false; // buffer overrun
928             }
929
930         }
931     }
932
933     return true;
934 }
935
936 void StreamProcessorManager::dumpInfo() {
937         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
938         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
939         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
940
941         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
942         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
943                 it != m_ReceiveProcessors.end();
944                 ++it ) {
945                 (*it)->dumpInfo();
946         }
947
948         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
949         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
950                 it != m_TransmitProcessors.end();
951                 ++it ) {
952                 (*it)->dumpInfo();
953         }
954
955         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
956         m_isoManager->dumpInfo();
957         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
958
959 }
960
961 void StreamProcessorManager::setVerboseLevel(int l) {
962         setDebugLevel(l);
963
964         if (m_isoManager) m_isoManager->setVerboseLevel(l);
965
966         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
967         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
968                 it != m_ReceiveProcessors.end();
969                 ++it ) {
970                 (*it)->setVerboseLevel(l);
971         }
972
973         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
974         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
975                 it != m_TransmitProcessors.end();
976                 ++it ) {
977                 (*it)->setVerboseLevel(l);
978         }
979 }
980
981
982 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
983         int count=0;
984
985         if (direction == Port::E_Capture) {
986                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
987                         it != m_ReceiveProcessors.end();
988                         ++it ) {
989                         count += (*it)->getPortCount(type);
990                 }
991         } else {
992                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
993                         it != m_TransmitProcessors.end();
994                         ++it ) {
995                         count += (*it)->getPortCount(type);
996                 }
997         }
998         return count;
999 }
1000
1001 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1002         int count=0;
1003
1004         if (direction == Port::E_Capture) {
1005                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1006                         it != m_ReceiveProcessors.end();
1007                         ++it ) {
1008                         count += (*it)->getPortCount();
1009                 }
1010         } else {
1011                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1012                         it != m_TransmitProcessors.end();
1013                         ++it ) {
1014                         count += (*it)->getPortCount();
1015                 }
1016         }
1017         return count;
1018 }
1019
1020 // TODO: implement a port map here, instead of the loop
1021
1022 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1023         int count=0;
1024         int prevcount=0;
1025
1026         if (direction == Port::E_Capture) {
1027                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1028                         it != m_ReceiveProcessors.end();
1029                         ++it ) {
1030                         count += (*it)->getPortCount();
1031                         if (count > idx) {
1032                                 return (*it)->getPortAtIdx(idx-prevcount);
1033                         }
1034                         prevcount=count;
1035                 }
1036         } else {
1037                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1038                         it != m_TransmitProcessors.end();
1039                         ++it ) {
1040                         count += (*it)->getPortCount();
1041                         if (count > idx) {
1042                                 return (*it)->getPortAtIdx(idx-prevcount);
1043                         }
1044                         prevcount=count;
1045                 }
1046         }
1047         return NULL;
1048 }
1049
1050 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1051     m_thread_realtime=rt;
1052     m_thread_priority=priority;
1053     return true;
1054 }
1055
1056
1057 } // end of namespace
Note: See TracBrowser for help on using the browser.