root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 407, 32.3 kB (checked in by pieterpalmers, 16 years ago)

- Changed the way the device class configure options are handled. Now they are handled in the makefiles instead of the source files. The only source file that still contains the #ifdef's is devicemanager.cpp, to conditionally include the device class include files and to conditionally probe the classes that might be supported.
- added a configure option to disable the compilation of the test programs in tests/
- cleaned up the ADMTP transmit streamprocessor. Now it sends silenced packets when in the disabled state, instead of no-data packets
- added a getNodeID() to ieee1394service
- made comments in ieee1394service.h doxygen compliant

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "libstreaming/cycletimer.h"
36
37 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
38
39 #define RUNNING_TIMEOUT_MSEC 4000
40 #define PREPARE_TIMEOUT_MSEC 4000
41 #define ENABLE_TIMEOUT_MSEC 4000
42
43 namespace FreebobStreaming {
44
45 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
46
47 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
48         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
49         m_isoManager(0), m_nbperiods(0) {
50
51 }
52
53 StreamProcessorManager::~StreamProcessorManager() {
54         if (m_isoManager) delete m_isoManager;
55        
56 }
57
58 /**
59  * Registers \ref processor with this manager.
60  *
61  * also registers it with the isohandlermanager
62  *
63  * be sure to call isohandlermanager->init() first!
64  * and be sure that the processors are also ->init()'ed
65  *
66  * @param processor
67  * @return true if successfull
68  */
69 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
70 {
71         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
72         assert(processor);
73         assert(m_isoManager);
74
75         if (processor->getType()==StreamProcessor::E_Receive) {
76                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
77                
78                 m_ReceiveProcessors.push_back(processor);
79                
80                 processor->setManager(this);
81                                
82                 return true;
83         }
84        
85         if (processor->getType()==StreamProcessor::E_Transmit) {
86                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
87                
88                 m_TransmitProcessors.push_back(processor);
89                
90                 processor->setManager(this);
91                
92                 return true;
93         }
94
95         debugFatal("Unsupported processor type!\n");
96        
97         return false;
98 }
99
100 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
101 {
102         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
103         assert(processor);
104
105         if (processor->getType()==StreamProcessor::E_Receive) {
106
107                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
108                         it != m_ReceiveProcessors.end();
109                         ++it ) {
110
111                         if ( *it == processor ) {
112                                         m_ReceiveProcessors.erase(it);
113                                        
114                                         processor->clearManager();
115                                        
116                                         if(!m_isoManager->unregisterStream(processor)) {
117                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
118                                                
119                                                 return false;
120                                                
121                                         }
122                                        
123                                         return true;
124                                 }
125                 }
126         }
127
128         if (processor->getType()==StreamProcessor::E_Transmit) {
129                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
130                         it != m_TransmitProcessors.end();
131                         ++it ) {
132
133                         if ( *it == processor ) {
134                                         m_TransmitProcessors.erase(it);
135                                        
136                                         processor->clearManager();
137                                        
138                                         if(!m_isoManager->unregisterStream(processor)) {
139                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
140                                                
141                                                 return false;
142                                                
143                                         }
144                                        
145                                         return true;
146                                 }
147                 }
148         }
149        
150         debugFatal("Processor (%p) not found!\n",processor);
151
152         return false; //not found
153
154 }
155
156 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
157     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
158
159     m_SyncSource=s;
160     return true;
161 }
162
163 StreamProcessor *StreamProcessorManager::getSyncSource() {
164     return m_SyncSource;
165 }
166
167 bool StreamProcessorManager::init()
168 {
169         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
170
171         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
172        
173         if(!m_isoManager) {
174                 debugFatal("Could not create IsoHandlerManager\n");
175                 return false;
176         }
177        
178         // propagate the debug level
179         m_isoManager->setVerboseLevel(getDebugLevel());
180        
181         if(!m_isoManager->init()) {
182                 debugFatal("Could not initialize IsoHandlerManager\n");
183                 return false;
184         }
185        
186         m_xrun_happened=false;
187        
188         return true;
189 }
190
191 bool StreamProcessorManager::prepare() {
192
193         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
194        
195         // if no sync source is set, select one here
196         if(m_SyncSource == NULL) {
197            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
198         }
199        
200         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
201                 it != m_ReceiveProcessors.end();
202                 ++it ) {
203                         if(m_SyncSource == NULL) {
204                                 debugWarning(" => Sync Source is %p.\n", *it);
205                                 m_SyncSource = *it;
206                         }
207         }
208
209         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
210                 it != m_TransmitProcessors.end();
211                 ++it ) {
212                         if(m_SyncSource == NULL) {
213                                 debugWarning(" => Sync Source is %p.\n", *it);
214                                 m_SyncSource = *it;
215                         }
216         }
217
218         // now do the actual preparation
219         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
220         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
221                 it != m_ReceiveProcessors.end();
222                 ++it ) {
223                         if(!(*it)->setSyncSource(m_SyncSource)) {
224                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
225                                 return false;
226                         }
227                        
228                         if(!(*it)->prepare()) {
229                                 debugFatal(  " could not prepare (%p)...\n",(*it));
230                                 return false;
231                         }
232         }
233
234         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
235         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
236                 it != m_TransmitProcessors.end();
237                 ++it ) {
238                         if(!(*it)->setSyncSource(m_SyncSource)) {
239                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
240                                 return false;
241                         }               
242                         if(!(*it)->prepare()) {
243                                 debugFatal( " could not prepare (%p)...\n",(*it));
244                                 return false;
245                         }
246         }
247
248     // if there are no stream processors registered,
249     // fail
250     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
251         debugFatal("No stream processors registered, can't do anything usefull\n");
252         return false;
253     }
254
255         return true;
256 }
257
258
259 bool StreamProcessorManager::syncStartAll() {
260
261     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n");
262     // we have to wait until all streamprocessors indicate that they are running
263     // i.e. that there is actually some data stream flowing
264     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
265     bool notRunning=true;
266     while (notRunning && wait_cycles) {
267         wait_cycles--;
268         notRunning=false;
269        
270         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
271                 it != m_ReceiveProcessors.end();
272                 ++it ) {
273             if(!(*it)->isRunning()) notRunning=true;
274         }
275
276         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
277                 it != m_TransmitProcessors.end();
278                 ++it ) {
279             if(!(*it)->isRunning()) notRunning=true;
280         }
281         usleep(1000);
282         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
283     }
284
285     if(!wait_cycles) { // timout has occurred
286         debugFatal("One or more streams are not starting up (timeout):\n");
287                    
288         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
289                 it != m_ReceiveProcessors.end();
290                 ++it ) {
291             if(!(*it)->isRunning()) {
292                 debugFatal(" receive stream %p not running\n",*it);
293             } else {   
294                 debugFatal(" receive stream %p running\n",*it);
295             }
296         }
297
298         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
299                 it != m_TransmitProcessors.end();
300                 ++it ) {
301             if(!(*it)->isRunning()) {
302                 debugFatal(" transmit stream %p not running\n",*it);
303             } else {   
304                 debugFatal(" transmit stream %p running\n",*it);
305             }
306         }
307        
308         return false;
309     }
310
311     // we want to make sure that everything is running well,
312     // so wait for a while
313     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
314
315     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
316    
317     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
318
319     int max_of_min_delay=0;
320     int min_delay=0;
321     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
322             it != m_ReceiveProcessors.end();
323             ++it ) {
324         min_delay=(*it)->getMinimalSyncDelay();
325         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
326     }
327    
328     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
329             it != m_TransmitProcessors.end();
330             ++it ) {
331         min_delay=(*it)->getMinimalSyncDelay();
332         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
333     }
334    
335     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
336     m_SyncSource->setSyncDelay(max_of_min_delay);
337    
338    
339     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
340     // now we reset the frame counters
341     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
342             it != m_ReceiveProcessors.end();
343             ++it ) {
344         (*it)->reset();
345     }
346    
347     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
348             it != m_TransmitProcessors.end();
349             ++it ) {
350         (*it)->reset();
351     }
352        
353     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
354    
355     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
356    
357     // FIXME: this should not be in cycles, but in 'time'
358     unsigned int enable_at=TICKS_TO_CYCLES(now)+2000;
359     if (enable_at > 8000) enable_at -= 8000;
360
361     if (!enableStreamProcessors(enable_at)) {
362         debugFatal("Could not enable StreamProcessors...\n");
363         return false;
364     }
365
366     return true;
367 }
368
369 bool StreamProcessorManager::start() {
370         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
371         assert(m_isoManager);
372        
373         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
374         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
375         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
376                 it != m_ReceiveProcessors.end();
377                 ++it ) {
378                         if (!(*it)->prepareForStart()) {
379                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
380                                 return false;
381                         }
382                         if (!m_isoManager->registerStream(*it)) {
383                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
384                                 return false;
385                         }
386                 }
387
388         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
389         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
390                 it != m_TransmitProcessors.end();
391                 ++it ) {
392                         if (!(*it)->prepareForStart()) {
393                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
394                                 return false;
395                         }
396                         if (!m_isoManager->registerStream(*it)) {
397                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
398                                 return false;
399                         }
400                 }
401
402         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
403         if (!m_isoManager->prepare()) {
404                 debugFatal("Could not prepare isoManager\n");
405                 return false;
406         }
407
408         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
409         if (!disableStreamProcessors()) {
410                 debugFatal("Could not disable StreamProcessors...\n");
411                 return false;
412         }
413                
414         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
415         if (!m_isoManager->startHandlers(-1)) {
416                 debugFatal("Could not start handlers...\n");
417                 return false;
418         }
419
420         // start all SP's synchonized
421         if (!syncStartAll()) {
422                 debugFatal("Could not syncStartAll...\n");
423                 return false;
424         }
425        
426         // dump the iso stream information when in verbose mode
427         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
428                 m_isoManager->dumpInfo();
429         }
430        
431         return true;
432        
433 }
434
435 bool StreamProcessorManager::stop() {
436         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
437         assert(m_isoManager);
438
439         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
440         // Most stream processors can just stop without special treatment.  However, some
441         // (like the MOTU) need to do a few things before it's safe to turn off the iso
442         // handling.
443         int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
444         bool allReady = false;
445         while (!allReady && wait_cycles) {
446                 wait_cycles--;
447                 allReady = true;
448                
449                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
450                         it != m_ReceiveProcessors.end();
451                         ++it ) {
452                         if(!(*it)->prepareForStop()) allReady = false;
453                 }
454        
455                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
456                         it != m_TransmitProcessors.end();
457                         ++it ) {
458                         if(!(*it)->prepareForStop()) allReady = false;
459                 }
460                 usleep(1000);
461         }
462
463        
464         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
465         if(!m_isoManager->stopHandlers()) {
466            debugFatal("Could not stop ISO handlers\n");
467            return false;
468         }
469        
470         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
471     // now unregister all streams from iso manager
472         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
473         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
474                 it != m_ReceiveProcessors.end();
475                 ++it ) {
476                         if (!m_isoManager->unregisterStream(*it)) {
477                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
478                                 return false;
479                         }
480                        
481                 }
482
483         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
484         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
485                 it != m_TransmitProcessors.end();
486                 ++it ) {
487                         if (!m_isoManager->unregisterStream(*it)) {
488                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
489                                 return false;
490                         }
491                        
492                 }
493        
494         return true;
495        
496 }
497
498 /**
499  * Enables the registered StreamProcessors
500  * @return true if successful, false otherwise
501  */
502 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
503     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
504
505     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
506     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
507     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
508             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
509         return false;
510     }
511
512     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
513     m_SyncSource->enable(time_to_enable_at);
514    
515     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
516    
517     // we prepare the streamprocessors for enable
518     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
519             it != m_ReceiveProcessors.end();
520             ++it ) {
521         if(*it != m_SyncSource) {
522             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
523             (*it)->prepareForEnable(time_to_enable_at);
524         }
525     }
526
527     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
528             it != m_TransmitProcessors.end();
529             ++it ) {
530         if(*it != m_SyncSource) {
531             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
532             (*it)->prepareForEnable(time_to_enable_at);
533         }
534     }
535
536     // then we enable the streamprocessors
537     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
538             it != m_ReceiveProcessors.end();
539             ++it ) {           
540         if(*it != m_SyncSource) {
541             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
542             (*it)->enable(time_to_enable_at);
543         }
544     }
545
546     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
547             it != m_TransmitProcessors.end();
548             ++it ) {
549         if(*it != m_SyncSource) {
550             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
551             (*it)->enable(time_to_enable_at);
552         }
553     }
554
555     // now we wait for the SP's to get enabled
556     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
557     // we have to wait until all streamprocessors indicate that they are running
558     // i.e. that there is actually some data stream flowing
559     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
560     bool notEnabled=true;
561     while (notEnabled && wait_cycles) {
562         wait_cycles--;
563         notEnabled=false;
564        
565         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
566                 it != m_ReceiveProcessors.end();
567                 ++it ) {
568             if(!(*it)->isEnabled()) notEnabled=true;
569         }
570
571         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
572                 it != m_TransmitProcessors.end();
573                 ++it ) {
574             if(!(*it)->isEnabled()) notEnabled=true;
575         }
576         usleep(1000); // one cycle
577     }
578    
579     if(!wait_cycles) { // timout has occurred
580         debugFatal("One or more streams couldn't be enabled (timeout):\n");
581                    
582         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
583                 it != m_ReceiveProcessors.end();
584                 ++it ) {
585             if(!(*it)->isEnabled()) {
586                     debugFatal(" receive stream %p not enabled\n",*it);
587             } else {   
588                     debugFatal(" receive stream %p enabled\n",*it);
589             }
590         }
591    
592         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
593                 it != m_TransmitProcessors.end();
594                 ++it ) {
595             if(!(*it)->isEnabled()) {
596                     debugFatal(" transmit stream %p not enabled\n",*it);
597             } else {   
598                     debugFatal(" transmit stream %p enabled\n",*it);
599             }
600         }
601         return false;
602     }
603    
604     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
605
606     return true;
607 }
608
609 /**
610  * Disables the registered StreamProcessors
611  * @return true if successful, false otherwise
612  */
613 bool StreamProcessorManager::disableStreamProcessors() {
614     // we prepare the streamprocessors for disable
615     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
616             it != m_ReceiveProcessors.end();
617             ++it ) {
618         (*it)->prepareForDisable();
619     }
620
621     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
622             it != m_TransmitProcessors.end();
623             ++it ) {
624         (*it)->prepareForDisable();
625     }
626
627     // then we disable the streamprocessors
628     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
629             it != m_ReceiveProcessors.end();
630             ++it ) {
631         (*it)->disable();
632     }
633
634     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
635             it != m_TransmitProcessors.end();
636             ++it ) {
637         (*it)->disable();
638     }
639
640     // now we wait for the SP's to get disabled
641     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
642     // we have to wait until all streamprocessors indicate that they are running
643     // i.e. that there is actually some data stream flowing
644     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
645     bool enabled=true;
646     while (enabled && wait_cycles) {
647         wait_cycles--;
648         enabled=false;
649        
650         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
651                 it != m_ReceiveProcessors.end();
652                 ++it ) {
653             if((*it)->isEnabled()) enabled=true;
654         }
655
656         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
657                 it != m_TransmitProcessors.end();
658                 ++it ) {
659             if((*it)->isEnabled()) enabled=true;
660         }
661         usleep(1000); // one cycle
662     }
663    
664     if(!wait_cycles) { // timout has occurred
665         debugFatal("One or more streams couldn't be disabled (timeout):\n");
666                    
667         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
668                 it != m_ReceiveProcessors.end();
669                 ++it ) {
670             if(!(*it)->isEnabled()) {
671                     debugFatal(" receive stream %p not enabled\n",*it);
672             } else {   
673                     debugFatal(" receive stream %p enabled\n",*it);
674             }
675         }
676    
677         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
678                 it != m_TransmitProcessors.end();
679                 ++it ) {
680             if(!(*it)->isEnabled()) {
681                     debugFatal(" transmit stream %p not enabled\n",*it);
682             } else {   
683                     debugFatal(" transmit stream %p enabled\n",*it);
684             }
685         }
686         return false;
687     }
688    
689     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
690        
691     return true;
692 }
693
694 /**
695  * Called upon Xrun events. This brings all StreamProcessors back
696  * into their starting state, and then carries on streaming. This should
697  * have the same effect as restarting the whole thing.
698  *
699  * @return true if successful, false otherwise
700  */
701 bool StreamProcessorManager::handleXrun() {
702
703         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
704
705         /*
706          * Reset means:
707          * 1) Disabling the SP's, so that they don't process any packets
708          *    note: the isomanager does keep on delivering/requesting them
709          * 2) Bringing all buffers & streamprocessors into a know state
710          *    - Clear all capture buffers
711          *    - Put nb_periods*period_size of null frames into the playback buffers
712          * 3) Re-enable the SP's
713          */
714         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
715         if (!disableStreamProcessors()) {
716                 debugFatal("Could not disable StreamProcessors...\n");
717                 return false;
718         }
719
720         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
721         // start all SP's synchonized
722         if (!syncStartAll()) {
723                 debugFatal("Could not syncStartAll...\n");
724                 return false;
725         }
726
727         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
728        
729         return true;
730 }
731
732 /**
733  * @brief Waits until the next period of samples is ready
734  *
735  * This function does not return until a full period of samples is (or should be)
736  * ready to be transferred.
737  *
738  * @return true if the period is ready, false if an xrun occurred
739  */
740 bool StreamProcessorManager::waitForPeriod() {
741     int time_till_next_period;
742     bool xrun_occurred=false;
743        
744     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
745
746     assert(m_SyncSource);
747    
748     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
749    
750     while(time_till_next_period > 0) {
751         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
752    
753         // wait for the period
754         usleep(time_till_next_period);
755        
756         // check for underruns on the ISO side,
757         // those should make us bail out of the wait loop
758         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
759             it != m_ReceiveProcessors.end();
760             ++it ) {
761             // a xrun has occurred on the Iso side
762             xrun_occurred |= (*it)->xrunOccurred();
763         }
764         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
765             it != m_TransmitProcessors.end();
766             ++it ) {
767             // a xrun has occurred on the Iso side
768             xrun_occurred |= (*it)->xrunOccurred();
769         }
770
771         // check if we were waked up too soon
772         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
773     }
774    
775     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
776    
777     // this is to notify the client of the delay
778     // that we introduced
779     m_delayed_usecs=time_till_next_period;
780    
781     // we save the 'ideal' time of the transfer at this point,
782     // because we can have interleaved read - process - write
783     // cycles making that we modify a receiving stream's buffer
784     // before we get to writing.
785     // NOTE: before waitForPeriod() is called again, both the transmit
786     //       and the receive processors should have done their transfer.
787     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
788     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
789         m_time_of_transfer);
790    
791 #ifdef DEBUG
792     int rcv_bf=0, xmt_bf=0;
793     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
794         it != m_ReceiveProcessors.end();
795         ++it ) {
796         rcv_bf = (*it)->getBufferFill();
797     }
798     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
799         it != m_TransmitProcessors.end();
800         ++it ) {
801         xmt_bf = (*it)->getBufferFill();
802     }
803     debugOutput( DEBUG_LEVEL_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
804         m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf);
805    
806 #endif
807
808     xrun_occurred=false;
809    
810     // check if xruns occurred on the Iso side.
811     // also check if xruns will occur should we transfer() now
812    
813     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
814           it != m_ReceiveProcessors.end();
815           ++it ) {
816         // a xrun has occurred on the Iso side
817         xrun_occurred |= (*it)->xrunOccurred();
818        
819         // if this is true, a xrun will occur
820         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
821        
822 #ifdef DEBUG
823         if ((*it)->xrunOccurred()) {
824             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
825             (*it)->dumpInfo();
826         }
827         if (!((*it)->canClientTransferFrames(m_period))) {
828             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
829             (*it)->dumpInfo();
830         }
831 #endif
832        
833     }
834     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
835           it != m_TransmitProcessors.end();
836           ++it ) {
837         // a xrun has occurred on the Iso side
838         xrun_occurred |= (*it)->xrunOccurred();
839        
840         // if this is true, a xrun will occur
841         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
842        
843 #ifdef DEBUG
844         if ((*it)->xrunOccurred()) {
845             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
846         }
847         if (!((*it)->canClientTransferFrames(m_period))) {
848             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
849         }
850 #endif       
851     }
852    
853     m_nbperiods++;
854    
855     // now we can signal the client that we are (should be) ready
856     return !xrun_occurred;
857 }
858
859 /**
860  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
861  *
862  * Transfers one period of frames from the client side to the Iso side and vice versa.
863  *
864  * @return true if successful, false otherwise (indicates xrun).
865  */
866 bool StreamProcessorManager::transfer() {
867    
868     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
869
870     if (!transfer(StreamProcessor::E_Receive)) return false;
871     if (!transfer(StreamProcessor::E_Transmit)) return false;
872
873     return true;
874 }
875
876 /**
877  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
878  *
879  * Transfers one period of frames from the client side to the Iso side or vice versa.
880  *
881  * @param t The processor type to tranfer for (receive or transmit)
882  * @return true if successful, false otherwise (indicates xrun).
883  */
884
885 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
886     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
887    
888     // a static cast could make sure that there is no performance
889     // penalty for the virtual functions (to be checked)
890     if (t==StreamProcessor::E_Receive) {
891         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
892                 it != m_ReceiveProcessors.end();
893                 ++it ) {
894                
895             if(!(*it)->getFrames(m_period)) {
896                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
897                             m_period, m_time_of_transfer,*it);
898                     return false; // buffer underrun
899             }
900
901         }
902     } else {
903         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
904                 it != m_TransmitProcessors.end();
905                 ++it ) {
906                
907             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
908                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
909                         m_period, m_time_of_transfer, *it);
910                 return false; // buffer overrun
911             }
912
913         }
914     }
915
916     return true;
917 }
918
919 void StreamProcessorManager::dumpInfo() {
920         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
921         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
922         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
923
924         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
925         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
926                 it != m_ReceiveProcessors.end();
927                 ++it ) {
928                 (*it)->dumpInfo();
929         }
930
931         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
932         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
933                 it != m_TransmitProcessors.end();
934                 ++it ) {
935                 (*it)->dumpInfo();
936         }
937
938         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
939         m_isoManager->dumpInfo();
940         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
941
942 }
943
944 void StreamProcessorManager::setVerboseLevel(int l) {
945         setDebugLevel(l);
946
947         if (m_isoManager) m_isoManager->setVerboseLevel(l);
948
949         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
950         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
951                 it != m_ReceiveProcessors.end();
952                 ++it ) {
953                 (*it)->setVerboseLevel(l);
954         }
955
956         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
957         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
958                 it != m_TransmitProcessors.end();
959                 ++it ) {
960                 (*it)->setVerboseLevel(l);
961         }
962 }
963
964
965 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
966         int count=0;
967
968         if (direction == Port::E_Capture) {
969                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
970                         it != m_ReceiveProcessors.end();
971                         ++it ) {
972                         count += (*it)->getPortCount(type);
973                 }
974         } else {
975                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
976                         it != m_TransmitProcessors.end();
977                         ++it ) {
978                         count += (*it)->getPortCount(type);
979                 }
980         }
981         return count;
982 }
983
984 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
985         int count=0;
986
987         if (direction == Port::E_Capture) {
988                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
989                         it != m_ReceiveProcessors.end();
990                         ++it ) {
991                         count += (*it)->getPortCount();
992                 }
993         } else {
994                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
995                         it != m_TransmitProcessors.end();
996                         ++it ) {
997                         count += (*it)->getPortCount();
998                 }
999         }
1000         return count;
1001 }
1002
1003 // TODO: implement a port map here, instead of the loop
1004
1005 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1006         int count=0;
1007         int prevcount=0;
1008
1009         if (direction == Port::E_Capture) {
1010                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1011                         it != m_ReceiveProcessors.end();
1012                         ++it ) {
1013                         count += (*it)->getPortCount();
1014                         if (count > idx) {
1015                                 return (*it)->getPortAtIdx(idx-prevcount);
1016                         }
1017                         prevcount=count;
1018                 }
1019         } else {
1020                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1021                         it != m_TransmitProcessors.end();
1022                         ++it ) {
1023                         count += (*it)->getPortCount();
1024                         if (count > idx) {
1025                                 return (*it)->getPortAtIdx(idx-prevcount);
1026                         }
1027                         prevcount=count;
1028                 }
1029         }
1030         return NULL;
1031 }
1032
1033 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1034     m_thread_realtime=rt;
1035     m_thread_priority=priority;
1036     return true;
1037 }
1038
1039
1040 } // end of namespace
Note: See TracBrowser for help on using the browser.