root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 411, 33.0 kB (checked in by pieterpalmers, 16 years ago)

cycletimer.h:
- some extra operations on Ticks (diffTicks & substractTicks)

StreamProcessor.cpp
AmdtpStreamProcessor.cpp
MotuStreamProcessor.cpp:
- Moved the syncDelay to StreamProcessor::getTimeUntilNextPeriodSignalUsecs(). This delay should be the delay between the actual period boundary and the time it is reported to the SPManager. Therefore its place is not as a buffer offset, but in the calculation of the signalling time.
As a result, the buffer timestamps correspond to 'real' timestamps. These might have to be manipulated by the transmit or receive handlers to account for e.g. iso buffering etc..., but at least the timestamps themselves have a well-defined meaning now.

StreamProcessorManager.cpp:
- The only stream that needs to be running is the sync source stream. It is assumed that the other streams start running in time. 'In time' is currently about 2000 cycles afterwards.

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "libstreaming/cycletimer.h"
36
37 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
38
39 #define RUNNING_TIMEOUT_MSEC 4000
40 #define PREPARE_TIMEOUT_MSEC 4000
41 #define ENABLE_TIMEOUT_MSEC 4000
42
43 #define ENABLE_DELAY_CYCLES 2000
44
45 namespace FreebobStreaming {
46
47 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
48
49 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
50         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
51         m_isoManager(0), m_nbperiods(0) {
52
53 }
54
55 StreamProcessorManager::~StreamProcessorManager() {
56         if (m_isoManager) delete m_isoManager;
57        
58 }
59
60 /**
61  * Registers \ref processor with this manager.
62  *
63  * also registers it with the isohandlermanager
64  *
65  * be sure to call isohandlermanager->init() first!
66  * and be sure that the processors are also ->init()'ed
67  *
68  * @param processor
69  * @return true if successfull
70  */
71 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
72 {
73         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
74         assert(processor);
75         assert(m_isoManager);
76
77         if (processor->getType()==StreamProcessor::E_Receive) {
78                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
79                
80                 m_ReceiveProcessors.push_back(processor);
81                
82                 processor->setManager(this);
83                                
84                 return true;
85         }
86        
87         if (processor->getType()==StreamProcessor::E_Transmit) {
88                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
89                
90                 m_TransmitProcessors.push_back(processor);
91                
92                 processor->setManager(this);
93                
94                 return true;
95         }
96
97         debugFatal("Unsupported processor type!\n");
98        
99         return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105         assert(processor);
106
107         if (processor->getType()==StreamProcessor::E_Receive) {
108
109                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110                         it != m_ReceiveProcessors.end();
111                         ++it ) {
112
113                         if ( *it == processor ) {
114                                         m_ReceiveProcessors.erase(it);
115                                        
116                                         processor->clearManager();
117                                        
118                                         if(!m_isoManager->unregisterStream(processor)) {
119                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
120                                                
121                                                 return false;
122                                                
123                                         }
124                                        
125                                         return true;
126                                 }
127                 }
128         }
129
130         if (processor->getType()==StreamProcessor::E_Transmit) {
131                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
132                         it != m_TransmitProcessors.end();
133                         ++it ) {
134
135                         if ( *it == processor ) {
136                                         m_TransmitProcessors.erase(it);
137                                        
138                                         processor->clearManager();
139                                        
140                                         if(!m_isoManager->unregisterStream(processor)) {
141                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
142                                                
143                                                 return false;
144                                                
145                                         }
146                                        
147                                         return true;
148                                 }
149                 }
150         }
151        
152         debugFatal("Processor (%p) not found!\n",processor);
153
154         return false; //not found
155
156 }
157
158 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
159     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
160
161     m_SyncSource=s;
162     return true;
163 }
164
165 StreamProcessor *StreamProcessorManager::getSyncSource() {
166     return m_SyncSource;
167 }
168
169 bool StreamProcessorManager::init()
170 {
171         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
172
173         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
174        
175         if(!m_isoManager) {
176                 debugFatal("Could not create IsoHandlerManager\n");
177                 return false;
178         }
179        
180         // propagate the debug level
181         m_isoManager->setVerboseLevel(getDebugLevel());
182        
183         if(!m_isoManager->init()) {
184                 debugFatal("Could not initialize IsoHandlerManager\n");
185                 return false;
186         }
187        
188         m_xrun_happened=false;
189        
190         return true;
191 }
192
193 bool StreamProcessorManager::prepare() {
194
195         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
196        
197         // if no sync source is set, select one here
198         if(m_SyncSource == NULL) {
199            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
200         }
201        
202         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
203                 it != m_ReceiveProcessors.end();
204                 ++it ) {
205                         if(m_SyncSource == NULL) {
206                                 debugWarning(" => Sync Source is %p.\n", *it);
207                                 m_SyncSource = *it;
208                         }
209         }
210
211         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
212                 it != m_TransmitProcessors.end();
213                 ++it ) {
214                         if(m_SyncSource == NULL) {
215                                 debugWarning(" => Sync Source is %p.\n", *it);
216                                 m_SyncSource = *it;
217                         }
218         }
219
220         // now do the actual preparation
221         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
222         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
223                 it != m_ReceiveProcessors.end();
224                 ++it ) {
225                         if(!(*it)->setSyncSource(m_SyncSource)) {
226                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
227                                 return false;
228                         }
229                        
230                         if(!(*it)->prepare()) {
231                                 debugFatal(  " could not prepare (%p)...\n",(*it));
232                                 return false;
233                         }
234         }
235
236         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
237         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
238                 it != m_TransmitProcessors.end();
239                 ++it ) {
240                         if(!(*it)->setSyncSource(m_SyncSource)) {
241                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
242                                 return false;
243                         }               
244                         if(!(*it)->prepare()) {
245                                 debugFatal( " could not prepare (%p)...\n",(*it));
246                                 return false;
247                         }
248         }
249
250     // if there are no stream processors registered,
251     // fail
252     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
253         debugFatal("No stream processors registered, can't do anything usefull\n");
254         return false;
255     }
256
257         return true;
258 }
259
260
261 bool StreamProcessorManager::syncStartAll() {
262
263     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
264     // we have to wait until all streamprocessors indicate that they are running
265     // i.e. that there is actually some data stream flowing
266     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
267     bool notRunning=true;
268     while (notRunning && wait_cycles) {
269         wait_cycles--;
270         notRunning=false;
271        
272 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
273 //                 it != m_ReceiveProcessors.end();
274 //                 ++it ) {
275 //             if(!(*it)->isRunning()) notRunning=true;
276 //         }
277 //
278 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
279 //                 it != m_TransmitProcessors.end();
280 //                 ++it ) {
281 //             if(!(*it)->isRunning()) notRunning=true;
282 //         }
283        
284         // EXPERIMENT:
285         // the only stream that should be running is the sync
286         // source stream, as this is the one that defines
287         // when to signal buffers. Maybe we get an xrun at startup,
288         // but that should be handled.
289        
290         // the problem is that otherwise a setup with a device
291         // that waits for decent input before sending output
292         // will not start up (e.g. the bounce device), because
293         // all streams are required to be running.
294        
295         // other streams still have at least ENABLE_DELAY_CYCLES cycles
296         // to start up
297         if(!m_SyncSource->isRunning()) notRunning=true;
298        
299         usleep(1000);
300         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
301     }
302
303     if(!wait_cycles) { // timout has occurred
304         debugFatal("One or more streams are not starting up (timeout):\n");
305
306         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
307                 it != m_ReceiveProcessors.end();
308                 ++it ) {
309             if(!(*it)->isRunning()) {
310                 debugFatal(" receive stream %p not running\n",*it);
311             } else {   
312                 debugFatal(" receive stream %p running\n",*it);
313             }
314         }
315
316         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
317                 it != m_TransmitProcessors.end();
318                 ++it ) {
319             if(!(*it)->isRunning()) {
320                 debugFatal(" transmit stream %p not running\n",*it);
321             } else {   
322                 debugFatal(" transmit stream %p running\n",*it);
323             }
324         }
325         return false;
326     }
327
328     // we want to make sure that everything is running well,
329     // so wait for a while
330     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
331
332     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
333    
334     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
335
336     int max_of_min_delay=0;
337     int min_delay=0;
338     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
339             it != m_ReceiveProcessors.end();
340             ++it ) {
341         min_delay=(*it)->getMinimalSyncDelay();
342         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
343     }
344    
345     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
346             it != m_TransmitProcessors.end();
347             ++it ) {
348         min_delay=(*it)->getMinimalSyncDelay();
349         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
350     }
351    
352     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
353     m_SyncSource->setSyncDelay(max_of_min_delay);
354    
355    
356     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
357     // now we reset the frame counters
358     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
359             it != m_ReceiveProcessors.end();
360             ++it ) {
361         (*it)->reset();
362     }
363    
364     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
365             it != m_TransmitProcessors.end();
366             ++it ) {
367         (*it)->reset();
368     }
369        
370     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
371    
372     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
373    
374     // FIXME: this should not be in cycles, but in 'time'
375     unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES;
376     if (enable_at > 8000) enable_at -= 8000;
377
378     if (!enableStreamProcessors(enable_at)) {
379         debugFatal("Could not enable StreamProcessors...\n");
380         return false;
381     }
382
383     return true;
384 }
385
386 bool StreamProcessorManager::start() {
387         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
388         assert(m_isoManager);
389        
390         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
391         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
392         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
393                 it != m_ReceiveProcessors.end();
394                 ++it ) {
395                         if (!(*it)->prepareForStart()) {
396                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
397                                 return false;
398                         }
399                         if (!m_isoManager->registerStream(*it)) {
400                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
401                                 return false;
402                         }
403                 }
404
405         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
406         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
407                 it != m_TransmitProcessors.end();
408                 ++it ) {
409                         if (!(*it)->prepareForStart()) {
410                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
411                                 return false;
412                         }
413                         if (!m_isoManager->registerStream(*it)) {
414                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
415                                 return false;
416                         }
417                 }
418
419         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
420         if (!m_isoManager->prepare()) {
421                 debugFatal("Could not prepare isoManager\n");
422                 return false;
423         }
424
425         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
426         if (!disableStreamProcessors()) {
427                 debugFatal("Could not disable StreamProcessors...\n");
428                 return false;
429         }
430                
431         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
432         if (!m_isoManager->startHandlers(-1)) {
433                 debugFatal("Could not start handlers...\n");
434                 return false;
435         }
436
437         // start all SP's synchonized
438         if (!syncStartAll()) {
439                 debugFatal("Could not syncStartAll...\n");
440                 return false;
441         }
442        
443         // dump the iso stream information when in verbose mode
444         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
445                 m_isoManager->dumpInfo();
446         }
447        
448         return true;
449        
450 }
451
452 bool StreamProcessorManager::stop() {
453         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
454         assert(m_isoManager);
455
456         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
457         // Most stream processors can just stop without special treatment.  However, some
458         // (like the MOTU) need to do a few things before it's safe to turn off the iso
459         // handling.
460         int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
461         bool allReady = false;
462         while (!allReady && wait_cycles) {
463                 wait_cycles--;
464                 allReady = true;
465                
466                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
467                         it != m_ReceiveProcessors.end();
468                         ++it ) {
469                         if(!(*it)->prepareForStop()) allReady = false;
470                 }
471        
472                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
473                         it != m_TransmitProcessors.end();
474                         ++it ) {
475                         if(!(*it)->prepareForStop()) allReady = false;
476                 }
477                 usleep(1000);
478         }
479
480        
481         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
482         if(!m_isoManager->stopHandlers()) {
483            debugFatal("Could not stop ISO handlers\n");
484            return false;
485         }
486        
487         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
488     // now unregister all streams from iso manager
489         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
490         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
491                 it != m_ReceiveProcessors.end();
492                 ++it ) {
493                         if (!m_isoManager->unregisterStream(*it)) {
494                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
495                                 return false;
496                         }
497                        
498                 }
499
500         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
501         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
502                 it != m_TransmitProcessors.end();
503                 ++it ) {
504                         if (!m_isoManager->unregisterStream(*it)) {
505                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
506                                 return false;
507                         }
508                        
509                 }
510        
511         return true;
512        
513 }
514
515 /**
516  * Enables the registered StreamProcessors
517  * @return true if successful, false otherwise
518  */
519 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
520     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
521
522     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
523     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
524     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
525             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
526         return false;
527     }
528
529     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
530     m_SyncSource->enable(time_to_enable_at);
531    
532     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
533    
534     // we prepare the streamprocessors for enable
535     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
536             it != m_ReceiveProcessors.end();
537             ++it ) {
538         if(*it != m_SyncSource) {
539             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
540             (*it)->prepareForEnable(time_to_enable_at);
541         }
542     }
543
544     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
545             it != m_TransmitProcessors.end();
546             ++it ) {
547         if(*it != m_SyncSource) {
548             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
549             (*it)->prepareForEnable(time_to_enable_at);
550         }
551     }
552
553     // then we enable the streamprocessors
554     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
555             it != m_ReceiveProcessors.end();
556             ++it ) {           
557         if(*it != m_SyncSource) {
558             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
559             (*it)->enable(time_to_enable_at);
560         }
561     }
562
563     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
564             it != m_TransmitProcessors.end();
565             ++it ) {
566         if(*it != m_SyncSource) {
567             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
568             (*it)->enable(time_to_enable_at);
569         }
570     }
571
572     // now we wait for the SP's to get enabled
573     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
574     // we have to wait until all streamprocessors indicate that they are running
575     // i.e. that there is actually some data stream flowing
576     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
577     bool notEnabled=true;
578     while (notEnabled && wait_cycles) {
579         wait_cycles--;
580         notEnabled=false;
581        
582         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
583                 it != m_ReceiveProcessors.end();
584                 ++it ) {
585             if(!(*it)->isEnabled()) notEnabled=true;
586         }
587
588         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
589                 it != m_TransmitProcessors.end();
590                 ++it ) {
591             if(!(*it)->isEnabled()) notEnabled=true;
592         }
593         usleep(1000); // one cycle
594     }
595    
596     if(!wait_cycles) { // timout has occurred
597         debugFatal("One or more streams couldn't be enabled (timeout):\n");
598                    
599         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
600                 it != m_ReceiveProcessors.end();
601                 ++it ) {
602             if(!(*it)->isEnabled()) {
603                     debugFatal(" receive stream %p not enabled\n",*it);
604             } else {   
605                     debugFatal(" receive stream %p enabled\n",*it);
606             }
607         }
608    
609         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
610                 it != m_TransmitProcessors.end();
611                 ++it ) {
612             if(!(*it)->isEnabled()) {
613                     debugFatal(" transmit stream %p not enabled\n",*it);
614             } else {   
615                     debugFatal(" transmit stream %p enabled\n",*it);
616             }
617         }
618         return false;
619     }
620    
621     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
622
623     return true;
624 }
625
626 /**
627  * Disables the registered StreamProcessors
628  * @return true if successful, false otherwise
629  */
630 bool StreamProcessorManager::disableStreamProcessors() {
631     // we prepare the streamprocessors for disable
632     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
633             it != m_ReceiveProcessors.end();
634             ++it ) {
635         (*it)->prepareForDisable();
636     }
637
638     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
639             it != m_TransmitProcessors.end();
640             ++it ) {
641         (*it)->prepareForDisable();
642     }
643
644     // then we disable the streamprocessors
645     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
646             it != m_ReceiveProcessors.end();
647             ++it ) {
648         (*it)->disable();
649     }
650
651     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
652             it != m_TransmitProcessors.end();
653             ++it ) {
654         (*it)->disable();
655     }
656
657     // now we wait for the SP's to get disabled
658     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
659     // we have to wait until all streamprocessors indicate that they are running
660     // i.e. that there is actually some data stream flowing
661     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
662     bool enabled=true;
663     while (enabled && wait_cycles) {
664         wait_cycles--;
665         enabled=false;
666        
667         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
668                 it != m_ReceiveProcessors.end();
669                 ++it ) {
670             if((*it)->isEnabled()) enabled=true;
671         }
672
673         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
674                 it != m_TransmitProcessors.end();
675                 ++it ) {
676             if((*it)->isEnabled()) enabled=true;
677         }
678         usleep(1000); // one cycle
679     }
680    
681     if(!wait_cycles) { // timout has occurred
682         debugFatal("One or more streams couldn't be disabled (timeout):\n");
683                    
684         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
685                 it != m_ReceiveProcessors.end();
686                 ++it ) {
687             if(!(*it)->isEnabled()) {
688                     debugFatal(" receive stream %p not enabled\n",*it);
689             } else {   
690                     debugFatal(" receive stream %p enabled\n",*it);
691             }
692         }
693    
694         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
695                 it != m_TransmitProcessors.end();
696                 ++it ) {
697             if(!(*it)->isEnabled()) {
698                     debugFatal(" transmit stream %p not enabled\n",*it);
699             } else {   
700                     debugFatal(" transmit stream %p enabled\n",*it);
701             }
702         }
703         return false;
704     }
705    
706     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
707        
708     return true;
709 }
710
711 /**
712  * Called upon Xrun events. This brings all StreamProcessors back
713  * into their starting state, and then carries on streaming. This should
714  * have the same effect as restarting the whole thing.
715  *
716  * @return true if successful, false otherwise
717  */
718 bool StreamProcessorManager::handleXrun() {
719
720         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
721
722         /*
723          * Reset means:
724          * 1) Disabling the SP's, so that they don't process any packets
725          *    note: the isomanager does keep on delivering/requesting them
726          * 2) Bringing all buffers & streamprocessors into a know state
727          *    - Clear all capture buffers
728          *    - Put nb_periods*period_size of null frames into the playback buffers
729          * 3) Re-enable the SP's
730          */
731         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
732         if (!disableStreamProcessors()) {
733                 debugFatal("Could not disable StreamProcessors...\n");
734                 return false;
735         }
736
737         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
738         // start all SP's synchonized
739         if (!syncStartAll()) {
740                 debugFatal("Could not syncStartAll...\n");
741                 return false;
742         }
743
744         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
745        
746         return true;
747 }
748
749 /**
750  * @brief Waits until the next period of samples is ready
751  *
752  * This function does not return until a full period of samples is (or should be)
753  * ready to be transferred.
754  *
755  * @return true if the period is ready, false if an xrun occurred
756  */
757 bool StreamProcessorManager::waitForPeriod() {
758     int time_till_next_period;
759     bool xrun_occurred=false;
760        
761     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
762
763     assert(m_SyncSource);
764    
765     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
766    
767     while(time_till_next_period > 0) {
768         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
769    
770         // wait for the period
771         usleep(time_till_next_period);
772        
773         // check for underruns on the ISO side,
774         // those should make us bail out of the wait loop
775         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
776             it != m_ReceiveProcessors.end();
777             ++it ) {
778             // a xrun has occurred on the Iso side
779             xrun_occurred |= (*it)->xrunOccurred();
780         }
781         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
782             it != m_TransmitProcessors.end();
783             ++it ) {
784             // a xrun has occurred on the Iso side
785             xrun_occurred |= (*it)->xrunOccurred();
786         }
787
788         // check if we were waked up too soon
789         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
790     }
791    
792     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
793    
794     // this is to notify the client of the delay
795     // that we introduced
796     m_delayed_usecs=time_till_next_period;
797    
798     // we save the 'ideal' time of the transfer at this point,
799     // because we can have interleaved read - process - write
800     // cycles making that we modify a receiving stream's buffer
801     // before we get to writing.
802     // NOTE: before waitForPeriod() is called again, both the transmit
803     //       and the receive processors should have done their transfer.
804     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
805     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
806         m_time_of_transfer);
807    
808 #ifdef DEBUG
809     int rcv_bf=0, xmt_bf=0;
810     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
811         it != m_ReceiveProcessors.end();
812         ++it ) {
813         rcv_bf = (*it)->getBufferFill();
814     }
815     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
816         it != m_TransmitProcessors.end();
817         ++it ) {
818         xmt_bf = (*it)->getBufferFill();
819     }
820     debugOutput( DEBUG_LEVEL_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
821         m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf);
822    
823 #endif
824
825     xrun_occurred=false;
826    
827     // check if xruns occurred on the Iso side.
828     // also check if xruns will occur should we transfer() now
829    
830     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
831           it != m_ReceiveProcessors.end();
832           ++it ) {
833         // a xrun has occurred on the Iso side
834         xrun_occurred |= (*it)->xrunOccurred();
835        
836         // if this is true, a xrun will occur
837         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
838        
839 #ifdef DEBUG
840         if ((*it)->xrunOccurred()) {
841             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
842             (*it)->dumpInfo();
843         }
844         if (!((*it)->canClientTransferFrames(m_period))) {
845             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
846             (*it)->dumpInfo();
847         }
848 #endif
849        
850     }
851     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
852           it != m_TransmitProcessors.end();
853           ++it ) {
854         // a xrun has occurred on the Iso side
855         xrun_occurred |= (*it)->xrunOccurred();
856        
857         // if this is true, a xrun will occur
858         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
859        
860 #ifdef DEBUG
861         if ((*it)->xrunOccurred()) {
862             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
863         }
864         if (!((*it)->canClientTransferFrames(m_period))) {
865             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
866         }
867 #endif       
868     }
869    
870     m_nbperiods++;
871    
872     // now we can signal the client that we are (should be) ready
873     return !xrun_occurred;
874 }
875
876 /**
877  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
878  *
879  * Transfers one period of frames from the client side to the Iso side and vice versa.
880  *
881  * @return true if successful, false otherwise (indicates xrun).
882  */
883 bool StreamProcessorManager::transfer() {
884    
885     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
886
887     if (!transfer(StreamProcessor::E_Receive)) return false;
888     if (!transfer(StreamProcessor::E_Transmit)) return false;
889
890     return true;
891 }
892
893 /**
894  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
895  *
896  * Transfers one period of frames from the client side to the Iso side or vice versa.
897  *
898  * @param t The processor type to tranfer for (receive or transmit)
899  * @return true if successful, false otherwise (indicates xrun).
900  */
901
902 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
903     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
904    
905     // a static cast could make sure that there is no performance
906     // penalty for the virtual functions (to be checked)
907     if (t==StreamProcessor::E_Receive) {
908         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
909                 it != m_ReceiveProcessors.end();
910                 ++it ) {
911                
912             if(!(*it)->getFrames(m_period)) {
913                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
914                             m_period, m_time_of_transfer,*it);
915                     return false; // buffer underrun
916             }
917
918         }
919     } else {
920         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
921                 it != m_TransmitProcessors.end();
922                 ++it ) {
923                
924             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
925                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
926                         m_period, m_time_of_transfer, *it);
927                 return false; // buffer overrun
928             }
929
930         }
931     }
932
933     return true;
934 }
935
936 void StreamProcessorManager::dumpInfo() {
937         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
938         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
939         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
940
941         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
942         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
943                 it != m_ReceiveProcessors.end();
944                 ++it ) {
945                 (*it)->dumpInfo();
946         }
947
948         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
949         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
950                 it != m_TransmitProcessors.end();
951                 ++it ) {
952                 (*it)->dumpInfo();
953         }
954
955         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
956         m_isoManager->dumpInfo();
957         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
958
959 }
960
961 void StreamProcessorManager::setVerboseLevel(int l) {
962         setDebugLevel(l);
963
964         if (m_isoManager) m_isoManager->setVerboseLevel(l);
965
966         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
967         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
968                 it != m_ReceiveProcessors.end();
969                 ++it ) {
970                 (*it)->setVerboseLevel(l);
971         }
972
973         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
974         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
975                 it != m_TransmitProcessors.end();
976                 ++it ) {
977                 (*it)->setVerboseLevel(l);
978         }
979 }
980
981
982 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
983         int count=0;
984
985         if (direction == Port::E_Capture) {
986                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
987                         it != m_ReceiveProcessors.end();
988                         ++it ) {
989                         count += (*it)->getPortCount(type);
990                 }
991         } else {
992                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
993                         it != m_TransmitProcessors.end();
994                         ++it ) {
995                         count += (*it)->getPortCount(type);
996                 }
997         }
998         return count;
999 }
1000
1001 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1002         int count=0;
1003
1004         if (direction == Port::E_Capture) {
1005                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1006                         it != m_ReceiveProcessors.end();
1007                         ++it ) {
1008                         count += (*it)->getPortCount();
1009                 }
1010         } else {
1011                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1012                         it != m_TransmitProcessors.end();
1013                         ++it ) {
1014                         count += (*it)->getPortCount();
1015                 }
1016         }
1017         return count;
1018 }
1019
1020 // TODO: implement a port map here, instead of the loop
1021
1022 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1023         int count=0;
1024         int prevcount=0;
1025
1026         if (direction == Port::E_Capture) {
1027                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1028                         it != m_ReceiveProcessors.end();
1029                         ++it ) {
1030                         count += (*it)->getPortCount();
1031                         if (count > idx) {
1032                                 return (*it)->getPortAtIdx(idx-prevcount);
1033                         }
1034                         prevcount=count;
1035                 }
1036         } else {
1037                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1038                         it != m_TransmitProcessors.end();
1039                         ++it ) {
1040                         count += (*it)->getPortCount();
1041                         if (count > idx) {
1042                                 return (*it)->getPortAtIdx(idx-prevcount);
1043                         }
1044                         prevcount=count;
1045                 }
1046         }
1047         return NULL;
1048 }
1049
1050 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1051     m_thread_realtime=rt;
1052     m_thread_priority=priority;
1053     return true;
1054 }
1055
1056
1057 } // end of namespace
Note: See TracBrowser for help on using the browser.