root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 398, 31.5 kB (checked in by pieterpalmers, 17 years ago)

remove cycle timer prediction & DLL code from the IsoHandler, as it is replaced by a raw1394 API call

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "libstreaming/cycletimer.h"
36
37 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
38
39 #define RUNNING_TIMEOUT_MSEC 4000
40 #define PREPARE_TIMEOUT_MSEC 4000
41 #define ENABLE_TIMEOUT_MSEC 4000
42
43 namespace FreebobStreaming {
44
45 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
46
47 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
48         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
49         m_isoManager(0), m_nbperiods(0) {
50
51 }
52
53 StreamProcessorManager::~StreamProcessorManager() {
54         if (m_isoManager) delete m_isoManager;
55        
56 }
57
58 /**
59  * Registers \ref processor with this manager.
60  *
61  * also registers it with the isohandlermanager
62  *
63  * be sure to call isohandlermanager->init() first!
64  * and be sure that the processors are also ->init()'ed
65  *
66  * @param processor
67  * @return true if successfull
68  */
69 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
70 {
71         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
72         assert(processor);
73         assert(m_isoManager);
74
75         if (processor->getType()==StreamProcessor::E_Receive) {
76                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
77                
78                 m_ReceiveProcessors.push_back(processor);
79                
80                 processor->setManager(this);
81                                
82                 return true;
83         }
84        
85         if (processor->getType()==StreamProcessor::E_Transmit) {
86                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
87                
88                 m_TransmitProcessors.push_back(processor);
89                
90                 processor->setManager(this);
91                
92                 return true;
93         }
94
95         debugFatal("Unsupported processor type!\n");
96        
97         return false;
98 }
99
100 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
101 {
102         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
103         assert(processor);
104
105         if (processor->getType()==StreamProcessor::E_Receive) {
106
107                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
108                         it != m_ReceiveProcessors.end();
109                         ++it ) {
110
111                         if ( *it == processor ) {
112                                         m_ReceiveProcessors.erase(it);
113                                        
114                                         processor->clearManager();
115                                        
116                                         if(!m_isoManager->unregisterStream(processor)) {
117                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
118                                                
119                                                 return false;
120                                                
121                                         }
122                                        
123                                         return true;
124                                 }
125                 }
126         }
127
128         if (processor->getType()==StreamProcessor::E_Transmit) {
129                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
130                         it != m_TransmitProcessors.end();
131                         ++it ) {
132
133                         if ( *it == processor ) {
134                                         m_TransmitProcessors.erase(it);
135                                        
136                                         processor->clearManager();
137                                        
138                                         if(!m_isoManager->unregisterStream(processor)) {
139                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
140                                                
141                                                 return false;
142                                                
143                                         }
144                                        
145                                         return true;
146                                 }
147                 }
148         }
149        
150         debugFatal("Processor (%p) not found!\n",processor);
151
152         return false; //not found
153
154 }
155
156 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
157     m_SyncSource=s;
158     return true;
159 }
160
161 StreamProcessor *StreamProcessorManager::getSyncSource() {
162     return m_SyncSource;
163 }
164
165 bool StreamProcessorManager::init()
166 {
167         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
168
169         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
170        
171         if(!m_isoManager) {
172                 debugFatal("Could not create IsoHandlerManager\n");
173                 return false;
174         }
175        
176         // propagate the debug level
177         m_isoManager->setVerboseLevel(getDebugLevel());
178        
179         if(!m_isoManager->init()) {
180                 debugFatal("Could not initialize IsoHandlerManager\n");
181                 return false;
182         }
183        
184         m_xrun_happened=false;
185        
186         return true;
187 }
188
189 bool StreamProcessorManager::prepare() {
190
191         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
192        
193         // if no sync source is set, select one here
194         if(m_SyncSource == NULL) {
195            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
196         }
197        
198         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
199                 it != m_ReceiveProcessors.end();
200                 ++it ) {
201                         if(m_SyncSource == NULL) {
202                                 debugWarning(" => Sync Source is %p.\n", *it);
203                                 m_SyncSource = *it;
204                         }
205         }
206
207         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
208                 it != m_TransmitProcessors.end();
209                 ++it ) {
210                         if(m_SyncSource == NULL) {
211                                 debugWarning(" => Sync Source is %p.\n", *it);
212                                 m_SyncSource = *it;
213                         }
214         }
215
216         // now do the actual preparation
217         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
218         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
219                 it != m_ReceiveProcessors.end();
220                 ++it ) {
221                         if(!(*it)->setSyncSource(m_SyncSource)) {
222                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
223                                 return false;
224                         }
225                        
226                         if(!(*it)->prepare()) {
227                                 debugFatal(  " could not prepare (%p)...\n",(*it));
228                                 return false;
229                         }
230         }
231
232         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
233         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
234                 it != m_TransmitProcessors.end();
235                 ++it ) {
236                         if(!(*it)->setSyncSource(m_SyncSource)) {
237                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
238                                 return false;
239                         }               
240                         if(!(*it)->prepare()) {
241                                 debugFatal( " could not prepare (%p)...\n",(*it));
242                                 return false;
243                         }
244         }
245
246     // if there are no stream processors registered,
247     // fail
248     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
249         debugFatal("No stream processors registered, can't do anything usefull\n");
250         return false;
251     }
252
253         return true;
254 }
255
256
257 bool StreamProcessorManager::syncStartAll() {
258
259     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n");
260     // we have to wait until all streamprocessors indicate that they are running
261     // i.e. that there is actually some data stream flowing
262     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
263     bool notRunning=true;
264     while (notRunning && wait_cycles) {
265         wait_cycles--;
266         notRunning=false;
267        
268         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
269                 it != m_ReceiveProcessors.end();
270                 ++it ) {
271             if(!(*it)->isRunning()) notRunning=true;
272         }
273
274         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
275                 it != m_TransmitProcessors.end();
276                 ++it ) {
277             if(!(*it)->isRunning()) notRunning=true;
278         }
279         usleep(1000);
280         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
281     }
282
283     if(!wait_cycles) { // timout has occurred
284         debugFatal("One or more streams are not starting up (timeout):\n");
285                    
286         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
287                 it != m_ReceiveProcessors.end();
288                 ++it ) {
289             if(!(*it)->isRunning()) {
290                 debugFatal(" receive stream %p not running\n",*it);
291             } else {   
292                 debugFatal(" receive stream %p running\n",*it);
293             }
294         }
295
296         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
297                 it != m_TransmitProcessors.end();
298                 ++it ) {
299             if(!(*it)->isRunning()) {
300                 debugFatal(" transmit stream %p not running\n",*it);
301             } else {   
302                 debugFatal(" transmit stream %p running\n",*it);
303             }
304         }
305        
306         return false;
307     }
308
309     // we want to make sure that everything is running well,
310     // so wait for a while
311     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
312
313     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
314     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
315
316     // now we reset the frame counters
317     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
318             it != m_ReceiveProcessors.end();
319             ++it ) {
320
321         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
322
323         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
324             (*it)->dumpInfo();
325         }
326
327         (*it)->reset();
328
329         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
330
331         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
332             (*it)->dumpInfo();
333         }
334
335     }
336    
337     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
338             it != m_TransmitProcessors.end();
339             ++it ) {
340            
341         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
342        
343         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
344             (*it)->dumpInfo();
345         }
346        
347         (*it)->reset();
348        
349         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
350        
351         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
352             (*it)->dumpInfo();
353         }
354     }
355        
356     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
357    
358     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
359    
360     // FIXME: this should not be in cycles, but in 'time'
361     unsigned int enable_at=TICKS_TO_CYCLES(now)+2000;
362     if (enable_at > 8000) enable_at -= 8000;
363
364     if (!enableStreamProcessors(enable_at)) {
365         debugFatal("Could not enable StreamProcessors...\n");
366         return false;
367     }
368
369     return true;
370 }
371
372 bool StreamProcessorManager::start() {
373         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
374         assert(m_isoManager);
375        
376         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
377         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
378         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
379                 it != m_ReceiveProcessors.end();
380                 ++it ) {
381                         if (!(*it)->prepareForStart()) {
382                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
383                                 return false;
384                         }
385                         if (!m_isoManager->registerStream(*it)) {
386                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
387                                 return false;
388                         }
389                 }
390
391         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
392         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
393                 it != m_TransmitProcessors.end();
394                 ++it ) {
395                         if (!(*it)->prepareForStart()) {
396                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
397                                 return false;
398                         }
399                         if (!m_isoManager->registerStream(*it)) {
400                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
401                                 return false;
402                         }
403                 }
404
405         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
406         if (!m_isoManager->prepare()) {
407                 debugFatal("Could not prepare isoManager\n");
408                 return false;
409         }
410
411         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
412         if (!disableStreamProcessors()) {
413                 debugFatal("Could not disable StreamProcessors...\n");
414                 return false;
415         }
416                
417         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
418         if (!m_isoManager->startHandlers(0)) {
419                 debugFatal("Could not start handlers...\n");
420                 return false;
421         }
422
423         // start all SP's synchonized
424         if (!syncStartAll()) {
425                 debugFatal("Could not syncStartAll...\n");
426                 return false;
427         }
428        
429         // dump the iso stream information when in verbose mode
430         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
431                 m_isoManager->dumpInfo();
432         }
433        
434         return true;
435        
436 }
437
438 bool StreamProcessorManager::stop() {
439         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
440         assert(m_isoManager);
441
442         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
443         // Most stream processors can just stop without special treatment.  However, some
444         // (like the MOTU) need to do a few things before it's safe to turn off the iso
445         // handling.
446         int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
447         bool allReady = false;
448         while (!allReady && wait_cycles) {
449                 wait_cycles--;
450                 allReady = true;
451                
452                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
453                         it != m_ReceiveProcessors.end();
454                         ++it ) {
455                         if(!(*it)->prepareForStop()) allReady = false;
456                 }
457        
458                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
459                         it != m_TransmitProcessors.end();
460                         ++it ) {
461                         if(!(*it)->prepareForStop()) allReady = false;
462                 }
463                 usleep(1000);
464         }
465
466        
467         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
468         if(!m_isoManager->stopHandlers()) {
469            debugFatal("Could not stop ISO handlers\n");
470            return false;
471         }
472        
473         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
474     // now unregister all streams from iso manager
475         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
476         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
477                 it != m_ReceiveProcessors.end();
478                 ++it ) {
479                         if (!m_isoManager->unregisterStream(*it)) {
480                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
481                                 return false;
482                         }
483                        
484                 }
485
486         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
487         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
488                 it != m_TransmitProcessors.end();
489                 ++it ) {
490                         if (!m_isoManager->unregisterStream(*it)) {
491                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
492                                 return false;
493                         }
494                        
495                 }
496        
497         return true;
498        
499 }
500
501 /**
502  * Enables the registered StreamProcessors
503  * @return true if successful, false otherwise
504  */
505 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
506     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
507
508     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
509     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
510     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
511             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
512         return false;
513     }
514
515     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
516     m_SyncSource->enable(time_to_enable_at);
517    
518     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
519    
520     // we prepare the streamprocessors for enable
521     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
522             it != m_ReceiveProcessors.end();
523             ++it ) {
524         if(*it != m_SyncSource) {
525             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
526             (*it)->prepareForEnable(time_to_enable_at);
527         }
528     }
529
530     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
531             it != m_TransmitProcessors.end();
532             ++it ) {
533         if(*it != m_SyncSource) {
534             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
535             (*it)->prepareForEnable(time_to_enable_at);
536         }
537     }
538
539     // then we enable the streamprocessors
540     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
541             it != m_ReceiveProcessors.end();
542             ++it ) {           
543         if(*it != m_SyncSource) {
544             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
545             (*it)->enable(time_to_enable_at);
546         }
547     }
548
549     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
550             it != m_TransmitProcessors.end();
551             ++it ) {
552         if(*it != m_SyncSource) {
553             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
554             (*it)->enable(time_to_enable_at);
555         }
556     }
557
558     // now we wait for the SP's to get enabled
559     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
560     // we have to wait until all streamprocessors indicate that they are running
561     // i.e. that there is actually some data stream flowing
562     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
563     bool notEnabled=true;
564     while (notEnabled && wait_cycles) {
565         wait_cycles--;
566         notEnabled=false;
567        
568         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
569                 it != m_ReceiveProcessors.end();
570                 ++it ) {
571             if(!(*it)->isEnabled()) notEnabled=true;
572         }
573
574         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
575                 it != m_TransmitProcessors.end();
576                 ++it ) {
577             if(!(*it)->isEnabled()) notEnabled=true;
578         }
579         usleep(1000); // one cycle
580     }
581    
582     if(!wait_cycles) { // timout has occurred
583         debugFatal("One or more streams couldn't be enabled (timeout):\n");
584                    
585         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
586                 it != m_ReceiveProcessors.end();
587                 ++it ) {
588             if(!(*it)->isEnabled()) {
589                     debugFatal(" receive stream %p not enabled\n",*it);
590             } else {   
591                     debugFatal(" receive stream %p enabled\n",*it);
592             }
593         }
594    
595         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
596                 it != m_TransmitProcessors.end();
597                 ++it ) {
598             if(!(*it)->isEnabled()) {
599                     debugFatal(" transmit stream %p not enabled\n",*it);
600             } else {   
601                     debugFatal(" transmit stream %p enabled\n",*it);
602             }
603         }
604         return false;
605     }
606    
607     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
608
609     return true;
610 }
611
612 /**
613  * Disables the registered StreamProcessors
614  * @return true if successful, false otherwise
615  */
616 bool StreamProcessorManager::disableStreamProcessors() {
617     // we prepare the streamprocessors for disable
618     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
619             it != m_ReceiveProcessors.end();
620             ++it ) {
621         (*it)->prepareForDisable();
622     }
623
624     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
625             it != m_TransmitProcessors.end();
626             ++it ) {
627         (*it)->prepareForDisable();
628     }
629
630     // then we disable the streamprocessors
631     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
632             it != m_ReceiveProcessors.end();
633             ++it ) {
634         (*it)->disable();
635     }
636
637     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
638             it != m_TransmitProcessors.end();
639             ++it ) {
640         (*it)->disable();
641     }
642
643     // now we wait for the SP's to get disabled
644     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
645     // we have to wait until all streamprocessors indicate that they are running
646     // i.e. that there is actually some data stream flowing
647     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
648     bool enabled=true;
649     while (enabled && wait_cycles) {
650         wait_cycles--;
651         enabled=false;
652        
653         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
654                 it != m_ReceiveProcessors.end();
655                 ++it ) {
656             if((*it)->isEnabled()) enabled=true;
657         }
658
659         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
660                 it != m_TransmitProcessors.end();
661                 ++it ) {
662             if((*it)->isEnabled()) enabled=true;
663         }
664         usleep(1000); // one cycle
665     }
666    
667     if(!wait_cycles) { // timout has occurred
668         debugFatal("One or more streams couldn't be disabled (timeout):\n");
669                    
670         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
671                 it != m_ReceiveProcessors.end();
672                 ++it ) {
673             if(!(*it)->isEnabled()) {
674                     debugFatal(" receive stream %p not enabled\n",*it);
675             } else {   
676                     debugFatal(" receive stream %p enabled\n",*it);
677             }
678         }
679    
680         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
681                 it != m_TransmitProcessors.end();
682                 ++it ) {
683             if(!(*it)->isEnabled()) {
684                     debugFatal(" transmit stream %p not enabled\n",*it);
685             } else {   
686                     debugFatal(" transmit stream %p enabled\n",*it);
687             }
688         }
689         return false;
690     }
691    
692     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
693        
694     return true;
695 }
696
697 /**
698  * Called upon Xrun events. This brings all StreamProcessors back
699  * into their starting state, and then carries on streaming. This should
700  * have the same effect as restarting the whole thing.
701  *
702  * @return true if successful, false otherwise
703  */
704 bool StreamProcessorManager::handleXrun() {
705
706         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
707
708         /*
709          * Reset means:
710          * 1) Disabling the SP's, so that they don't process any packets
711          *    note: the isomanager does keep on delivering/requesting them
712          * 2) Bringing all buffers & streamprocessors into a know state
713          *    - Clear all capture buffers
714          *    - Put nb_periods*period_size of null frames into the playback buffers
715          * 3) Re-enable the SP's
716          */
717         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
718         if (!disableStreamProcessors()) {
719                 debugFatal("Could not disable StreamProcessors...\n");
720                 return false;
721         }
722
723         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
724         // start all SP's synchonized
725         if (!syncStartAll()) {
726                 debugFatal("Could not syncStartAll...\n");
727                 return false;
728         }
729
730         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
731        
732         return true;
733 }
734
735 /**
736  * @brief Waits until the next period of samples is ready
737  *
738  * This function does not return until a full period of samples is (or should be)
739  * ready to be transferred.
740  *
741  * @return true if the period is ready, false if an xrun occurred
742  */
743 bool StreamProcessorManager::waitForPeriod() {
744     int time_till_next_period;
745     bool xrun_occurred=false;
746        
747     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
748
749     assert(m_SyncSource);
750    
751     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
752    
753     while(time_till_next_period > 0) {
754         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
755    
756         // wait for the period
757         usleep(time_till_next_period);
758        
759         // check for underruns on the ISO side,
760         // those should make us bail out of the wait loop
761         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
762             it != m_ReceiveProcessors.end();
763             ++it ) {
764             // a xrun has occurred on the Iso side
765             xrun_occurred |= (*it)->xrunOccurred();
766         }
767         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
768             it != m_TransmitProcessors.end();
769             ++it ) {
770             // a xrun has occurred on the Iso side
771             xrun_occurred |= (*it)->xrunOccurred();
772         }
773
774         // check if we were waked up too soon
775         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
776     }
777    
778     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", -time_till_next_period);
779    
780     // this is to notify the client of the delay
781     // that we introduced
782     m_delayed_usecs=time_till_next_period;
783    
784     // we save the 'ideal' time of the transfer at this point,
785     // because we can have interleaved read - process - write
786     // cycles making that we modify a receiving stream's buffer
787     // before we get to writing.
788     // NOTE: before waitForPeriod() is called again, both the transmit
789     //       and the receive processors should have done their transfer.
790     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
791     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
792         m_time_of_transfer);
793    
794     xrun_occurred=false;
795    
796     // check if xruns occurred on the Iso side.
797     // also check if xruns will occur should we transfer() now
798    
799     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
800           it != m_ReceiveProcessors.end();
801           ++it ) {
802         // a xrun has occurred on the Iso side
803         xrun_occurred |= (*it)->xrunOccurred();
804        
805         // if this is true, a xrun will occur
806         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
807        
808 #ifdef DEBUG
809         if ((*it)->xrunOccurred()) {
810             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
811             (*it)->dumpInfo();
812         }
813         if (!((*it)->canClientTransferFrames(m_period))) {
814             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
815             (*it)->dumpInfo();
816         }
817 #endif
818        
819     }
820     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
821           it != m_TransmitProcessors.end();
822           ++it ) {
823         // a xrun has occurred on the Iso side
824         xrun_occurred |= (*it)->xrunOccurred();
825        
826         // if this is true, a xrun will occur
827         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
828        
829 #ifdef DEBUG
830         if ((*it)->xrunOccurred()) {
831             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
832         }
833         if (!((*it)->canClientTransferFrames(m_period))) {
834             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
835         }
836 #endif       
837     }
838    
839     m_nbperiods++;
840    
841     // now we can signal the client that we are (should be) ready
842     return !xrun_occurred;
843 }
844
845 /**
846  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
847  *
848  * Transfers one period of frames from the client side to the Iso side and vice versa.
849  *
850  * @return true if successful, false otherwise (indicates xrun).
851  */
852 bool StreamProcessorManager::transfer() {
853    
854     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
855
856     if (!transfer(StreamProcessor::E_Receive)) return false;
857     if (!transfer(StreamProcessor::E_Transmit)) return false;
858
859     return true;
860 }
861
862 /**
863  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
864  *
865  * Transfers one period of frames from the client side to the Iso side or vice versa.
866  *
867  * @param t The processor type to tranfer for (receive or transmit)
868  * @return true if successful, false otherwise (indicates xrun).
869  */
870
871 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
872     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
873    
874     // a static cast could make sure that there is no performance
875     // penalty for the virtual functions (to be checked)
876     if (t==StreamProcessor::E_Receive) {
877         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
878                 it != m_ReceiveProcessors.end();
879                 ++it ) {
880                
881             if(!(*it)->getFrames(m_period)) {
882                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
883                             m_period, m_time_of_transfer,*it);
884                     return false; // buffer underrun
885             }
886
887         }
888     } else {
889         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
890                 it != m_TransmitProcessors.end();
891                 ++it ) {
892                
893             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
894                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
895                         m_period, m_time_of_transfer, *it);
896                 return false; // buffer overrun
897             }
898
899         }
900     }
901
902     return true;
903 }
904
905 void StreamProcessorManager::dumpInfo() {
906         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
907         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
908         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
909
910         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
911         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
912                 it != m_ReceiveProcessors.end();
913                 ++it ) {
914                 (*it)->dumpInfo();
915         }
916
917         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
918         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
919                 it != m_TransmitProcessors.end();
920                 ++it ) {
921                 (*it)->dumpInfo();
922         }
923
924         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
925         m_isoManager->dumpInfo();
926         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
927
928 }
929
930 void StreamProcessorManager::setVerboseLevel(int l) {
931         setDebugLevel(l);
932
933         if (m_isoManager) m_isoManager->setVerboseLevel(l);
934
935         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
936         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
937                 it != m_ReceiveProcessors.end();
938                 ++it ) {
939                 (*it)->setVerboseLevel(l);
940         }
941
942         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
943         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
944                 it != m_TransmitProcessors.end();
945                 ++it ) {
946                 (*it)->setVerboseLevel(l);
947         }
948 }
949
950
951 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
952         int count=0;
953
954         if (direction == Port::E_Capture) {
955                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
956                         it != m_ReceiveProcessors.end();
957                         ++it ) {
958                         count += (*it)->getPortCount(type);
959                 }
960         } else {
961                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
962                         it != m_TransmitProcessors.end();
963                         ++it ) {
964                         count += (*it)->getPortCount(type);
965                 }
966         }
967         return count;
968 }
969
970 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
971         int count=0;
972
973         if (direction == Port::E_Capture) {
974                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
975                         it != m_ReceiveProcessors.end();
976                         ++it ) {
977                         count += (*it)->getPortCount();
978                 }
979         } else {
980                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
981                         it != m_TransmitProcessors.end();
982                         ++it ) {
983                         count += (*it)->getPortCount();
984                 }
985         }
986         return count;
987 }
988
989 // TODO: implement a port map here, instead of the loop
990
991 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
992         int count=0;
993         int prevcount=0;
994
995         if (direction == Port::E_Capture) {
996                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
997                         it != m_ReceiveProcessors.end();
998                         ++it ) {
999                         count += (*it)->getPortCount();
1000                         if (count > idx) {
1001                                 return (*it)->getPortAtIdx(idx-prevcount);
1002                         }
1003                         prevcount=count;
1004                 }
1005         } else {
1006                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1007                         it != m_TransmitProcessors.end();
1008                         ++it ) {
1009                         count += (*it)->getPortCount();
1010                         if (count > idx) {
1011                                 return (*it)->getPortAtIdx(idx-prevcount);
1012                         }
1013                         prevcount=count;
1014                 }
1015         }
1016         return NULL;
1017 }
1018
1019 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1020     m_thread_realtime=rt;
1021     m_thread_priority=priority;
1022     return true;
1023 }
1024
1025
1026 } // end of namespace
Note: See TracBrowser for help on using the browser.