root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 395, 31.7 kB (checked in by pieterpalmers, 17 years ago)

debugmodule.cpp:
- removed path from the source file name when printing debug messages

IsoStream.cpp, StreamProcessor.cpp:
- debug message modifications

StreamProcessorManager.cpp:
- removed obsolete debug code

AmdtpStreamProcessor.cpp:
- debug message modifications
- removed DLL that calculates framerate, as it is also calculated in the
  TimestampedBuffer

- converted code to use the new offset feature of the TimestampedBuffer
- converted code to use the new frame timestamp calculation feature
  of the TimestampedBuffer

- first try at xmit sync code (unfinished)
- fixed 'unable to start' bug in receive SP

bebob_avdevice.cpp:
- added some #ifdef code to test with xmit SP's only (temporary)

TimestampedBuffer.cpp:
- add offset feature
- add arbitrary frame timestamp calculation

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "../libutil/PosixThread.h"
36
37 #include "libstreaming/cycletimer.h"
38
39 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
40
41 namespace FreebobStreaming {
42
43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
44
45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
46         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
47         m_isoManager(0), m_nbperiods(0) {
48
49 }
50
51 StreamProcessorManager::~StreamProcessorManager() {
52         if (m_isoManager) delete m_isoManager;
53        
54 }
55
56 /**
57  * Registers \ref processor with this manager.
58  *
59  * also registers it with the isohandlermanager
60  *
61  * be sure to call isohandlermanager->init() first!
62  * and be sure that the processors are also ->init()'ed
63  *
64  * @param processor
65  * @return true if successfull
66  */
67 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
68 {
69         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
70         assert(processor);
71         assert(m_isoManager);
72
73         if (processor->getType()==StreamProcessor::E_Receive) {
74                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
75                
76                 m_ReceiveProcessors.push_back(processor);
77                
78                 processor->setManager(this);
79                                
80                 return true;
81         }
82        
83         if (processor->getType()==StreamProcessor::E_Transmit) {
84                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
85                
86                 m_TransmitProcessors.push_back(processor);
87                
88                 processor->setManager(this);
89                
90                 return true;
91         }
92
93         debugFatal("Unsupported processor type!\n");
94        
95         return false;
96 }
97
98 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
99 {
100         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
101         assert(processor);
102
103         if (processor->getType()==StreamProcessor::E_Receive) {
104
105                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
106                         it != m_ReceiveProcessors.end();
107                         ++it ) {
108
109                         if ( *it == processor ) {
110                                         m_ReceiveProcessors.erase(it);
111                                        
112                                         processor->clearManager();
113                                        
114                                         if(!m_isoManager->unregisterStream(processor)) {
115                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
116                                                
117                                                 return false;
118                                                
119                                         }
120                                        
121                                         return true;
122                                 }
123                 }
124         }
125
126         if (processor->getType()==StreamProcessor::E_Transmit) {
127                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
128                         it != m_TransmitProcessors.end();
129                         ++it ) {
130
131                         if ( *it == processor ) {
132                                         m_TransmitProcessors.erase(it);
133                                        
134                                         processor->clearManager();
135                                        
136                                         if(!m_isoManager->unregisterStream(processor)) {
137                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
138                                                
139                                                 return false;
140                                                
141                                         }
142                                        
143                                         return true;
144                                 }
145                 }
146         }
147        
148         debugFatal("Processor (%p) not found!\n",processor);
149
150         return false; //not found
151
152 }
153
154 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
155     m_SyncSource=s;
156     return true;
157 }
158
159 StreamProcessor *StreamProcessorManager::getSyncSource() {
160     return m_SyncSource;
161 }
162
163 bool StreamProcessorManager::init()
164 {
165         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
166
167         // the tread that runs the StreamProcessor
168         // checking the period boundaries
169         int prio=m_thread_priority+5;
170         if (prio>98) prio=98;
171        
172         m_streamingThread=new FreebobUtil::PosixThread(this,
173            m_thread_realtime, prio,
174            PTHREAD_CANCEL_DEFERRED);
175            
176         if(!m_streamingThread) {
177                 debugFatal("Could not create streaming thread\n");
178                 return false;
179         }
180        
181         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
182        
183         if(!m_isoManager) {
184                 debugFatal("Could not create IsoHandlerManager\n");
185                 return false;
186         }
187        
188         // propagate the debug level
189         m_isoManager->setVerboseLevel(getDebugLevel());
190        
191         if(!m_isoManager->init()) {
192                 debugFatal("Could not initialize IsoHandlerManager\n");
193                 return false;
194         }
195        
196         m_xrun_happened=false;
197        
198         return true;
199 }
200
201 bool StreamProcessorManager::Init()
202 {
203     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing runner...\n");
204
205     // no xrun has occurred (yet)
206
207     return true;
208 }
209
210 bool StreamProcessorManager::prepare() {
211
212         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
213        
214         // if no sync source is set, select one here
215         if(m_SyncSource == NULL) {
216            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
217         }
218        
219         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
220                 it != m_ReceiveProcessors.end();
221                 ++it ) {
222                         if(m_SyncSource == NULL) {
223                                 debugWarning(" => Sync Source is %p.\n", *it);
224                                 m_SyncSource = *it;
225                         }
226         }
227
228         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
229                 it != m_TransmitProcessors.end();
230                 ++it ) {
231                         if(m_SyncSource == NULL) {
232                                 debugWarning(" => Sync Source is %p.\n", *it);
233                                 m_SyncSource = *it;
234                         }
235         }
236
237         // now do the actual preparation
238         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
239         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
240                 it != m_ReceiveProcessors.end();
241                 ++it ) {
242                         if(!(*it)->setSyncSource(m_SyncSource)) {
243                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
244                                 return false;
245                         }
246                        
247                         if(!(*it)->prepare()) {
248                                 debugFatal(  " could not prepare (%p)...\n",(*it));
249                                 return false;
250                         }
251         }
252
253         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
254         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
255                 it != m_TransmitProcessors.end();
256                 ++it ) {
257                         if(!(*it)->setSyncSource(m_SyncSource)) {
258                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
259                                 return false;
260                         }               
261                         if(!(*it)->prepare()) {
262                                 debugFatal( " could not prepare (%p)...\n",(*it));
263                                 return false;
264                         }
265         }
266
267     // if there are no stream processors registered,
268     // fail
269     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
270         debugFatal("No stream processors registered, can't do anything usefull\n");
271         return false;
272     }
273
274         return true;
275 }
276
277 // FIXME: this can be removed
278 bool StreamProcessorManager::Execute()
279 {
280         // temp measure, polling
281         usleep(1000);
282
283         // FIXME: move this to an IsoHandlerManager sub-thread
284         // and make this private again in IHM
285         m_isoManager->updateCycleTimers();
286        
287         return true;
288 }
289
290 bool StreamProcessorManager::syncStartAll() {
291
292     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n");
293     // we have to wait until all streamprocessors indicate that they are running
294     // i.e. that there is actually some data stream flowing
295     int wait_cycles=2000; // two seconds
296     bool notRunning=true;
297     while (notRunning && wait_cycles) {
298         wait_cycles--;
299         notRunning=false;
300        
301         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
302                 it != m_ReceiveProcessors.end();
303                 ++it ) {
304             if(!(*it)->isRunning()) notRunning=true;
305         }
306
307         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
308                 it != m_TransmitProcessors.end();
309                 ++it ) {
310             if(!(*it)->isRunning()) notRunning=true;
311         }
312         usleep(1000);
313         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
314     }
315
316     if(!wait_cycles) { // timout has occurred
317         debugFatal("One or more streams are not starting up (timeout):\n");
318                    
319         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
320                 it != m_ReceiveProcessors.end();
321                 ++it ) {
322             if(!(*it)->isRunning()) {
323                 debugFatal(" receive stream %p not running\n",*it);
324             } else {   
325                 debugFatal(" receive stream %p running\n",*it);
326             }
327         }
328
329         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
330                 it != m_TransmitProcessors.end();
331                 ++it ) {
332             if(!(*it)->isRunning()) {
333                 debugFatal(" transmit stream %p not running\n",*it);
334             } else {   
335                 debugFatal(" transmit stream %p running\n",*it);
336             }
337         }
338        
339         return false;
340     }
341
342     // we want to make sure that everything is running well,
343     // so wait for a while
344     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
345
346     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
347     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
348
349     // now we reset the frame counters
350     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
351             it != m_ReceiveProcessors.end();
352             ++it ) {
353
354         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
355
356         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
357             (*it)->dumpInfo();
358         }
359
360         (*it)->reset();
361
362         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
363
364         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
365             (*it)->dumpInfo();
366         }
367
368     }
369    
370     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
371             it != m_TransmitProcessors.end();
372             ++it ) {
373            
374         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
375        
376         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
377             (*it)->dumpInfo();
378         }
379        
380         (*it)->reset();
381        
382         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
383        
384         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
385             (*it)->dumpInfo();
386         }
387     }
388        
389     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
390    
391     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
392    
393     // FIXME: this should not be in cycles, but in 'time'
394     unsigned int enable_at=TICKS_TO_CYCLES(now)+2000;
395     if (enable_at > 8000) enable_at -= 8000;
396
397     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n");
398     if (!m_SyncSource->prepareForEnable()) {
399             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
400         return false;
401     }
402
403     m_SyncSource->enable(enable_at);
404
405     debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n");
406     if (!enableStreamProcessors(enable_at)) {
407         debugFatal("Could not enable StreamProcessors...\n");
408         return false;
409     }
410
411     return true;
412 }
413
414 bool StreamProcessorManager::start() {
415         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
416         assert(m_isoManager);
417        
418         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
419         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
420         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
421                 it != m_ReceiveProcessors.end();
422                 ++it ) {
423                         if (!(*it)->prepareForStart()) {
424                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
425                                 return false;
426                         }
427                         if (!m_isoManager->registerStream(*it)) {
428                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
429                                 return false;
430                         }
431                 }
432
433         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
434         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
435                 it != m_TransmitProcessors.end();
436                 ++it ) {
437                         if (!(*it)->prepareForStart()) {
438                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
439                                 return false;
440                         }
441                         if (!m_isoManager->registerStream(*it)) {
442                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
443                                 return false;
444                         }
445                 }
446
447         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
448         if (!m_isoManager->prepare()) {
449                 debugFatal("Could not prepare isoManager\n");
450                 return false;
451         }
452
453         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
454         if (!disableStreamProcessors()) {
455                 debugFatal("Could not disable StreamProcessors...\n");
456                 return false;
457         }
458                
459         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
460         if (!m_isoManager->startHandlers(0)) {
461                 debugFatal("Could not start handlers...\n");
462                 return false;
463         }
464        
465         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n");
466
467         // start the runner thread
468         // FIXME: not used anymore (for updatecycletimers ATM, but that's not good)
469         m_streamingThread->Start();
470
471         // start all SP's synchonized
472         if (!syncStartAll()) {
473                 debugFatal("Could not syncStartAll...\n");
474                 return false;
475         }
476        
477         // dump the iso stream information when in verbose mode
478         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
479                 m_isoManager->dumpInfo();
480         }
481        
482         return true;
483        
484 }
485
486 bool StreamProcessorManager::stop() {
487         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
488         assert(m_isoManager);
489         assert(m_streamingThread);
490
491         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
492         // Most stream processors can just stop without special treatment.  However, some
493         // (like the MOTU) need to do a few things before it's safe to turn off the iso
494         // handling.
495         int wait_cycles=2000; // two seconds ought to be sufficient
496         bool allReady = false;
497         while (!allReady && wait_cycles) {
498                 wait_cycles--;
499                 allReady = true;
500                
501                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
502                         it != m_ReceiveProcessors.end();
503                         ++it ) {
504                         if(!(*it)->prepareForStop()) allReady = false;
505                 }
506        
507                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
508                         it != m_TransmitProcessors.end();
509                         ++it ) {
510                         if(!(*it)->prepareForStop()) allReady = false;
511                 }
512                 usleep(1000);
513         }
514
515
516         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping threads...\n");
517        
518         m_streamingThread->Stop();
519        
520         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
521         if(!m_isoManager->stopHandlers()) {
522            debugFatal("Could not stop ISO handlers\n");
523            return false;
524         }
525        
526         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
527     // now unregister all streams from iso manager
528         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
529         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
530                 it != m_ReceiveProcessors.end();
531                 ++it ) {
532                         if (!m_isoManager->unregisterStream(*it)) {
533                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
534                                 return false;
535                         }
536                        
537                 }
538
539         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
540         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
541                 it != m_TransmitProcessors.end();
542                 ++it ) {
543                         if (!m_isoManager->unregisterStream(*it)) {
544                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
545                                 return false;
546                         }
547                        
548                 }
549        
550         return true;
551        
552 }
553
554 /**
555  * Enables the registered StreamProcessors
556  * @return true if successful, false otherwise
557  */
558 bool StreamProcessorManager::enableStreamProcessors(unsigned int time_to_enable_at) {
559     // we prepare the streamprocessors for enable
560     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
561             it != m_ReceiveProcessors.end();
562             ++it ) {           
563         (*it)->prepareForEnable();
564     }
565
566     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
567             it != m_TransmitProcessors.end();
568             ++it ) {
569         (*it)->prepareForEnable();
570     }
571
572     // then we enable the streamprocessors
573     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
574             it != m_ReceiveProcessors.end();
575             ++it ) {           
576         (*it)->enable(time_to_enable_at);
577     }
578
579     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
580             it != m_TransmitProcessors.end();
581             ++it ) {
582         (*it)->enable(time_to_enable_at);
583     }
584
585     // now we wait for the SP's to get enabled
586     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
587     // we have to wait until all streamprocessors indicate that they are running
588     // i.e. that there is actually some data stream flowing
589     int wait_cycles=2000; // two seconds
590     bool notEnabled=true;
591     while (notEnabled && wait_cycles) {
592         wait_cycles--;
593         notEnabled=false;
594        
595         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
596                 it != m_ReceiveProcessors.end();
597                 ++it ) {
598             if(!(*it)->isEnabled()) notEnabled=true;
599         }
600
601         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
602                 it != m_TransmitProcessors.end();
603                 ++it ) {
604             if(!(*it)->isEnabled()) notEnabled=true;
605         }
606         usleep(1000); // one cycle
607     }
608    
609     if(!wait_cycles) { // timout has occurred
610         debugFatal("One or more streams couldn't be enabled (timeout):\n");
611                    
612         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
613                 it != m_ReceiveProcessors.end();
614                 ++it ) {
615             if(!(*it)->isEnabled()) {
616                     debugFatal(" receive stream %p not enabled\n",*it);
617             } else {   
618                     debugFatal(" receive stream %p enabled\n",*it);
619             }
620         }
621    
622         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
623                 it != m_TransmitProcessors.end();
624                 ++it ) {
625             if(!(*it)->isEnabled()) {
626                     debugFatal(" transmit stream %p not enabled\n",*it);
627             } else {   
628                     debugFatal(" transmit stream %p enabled\n",*it);
629             }
630         }
631         return false;
632     }
633    
634     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
635
636     return true;
637 }
638
639 /**
640  * Disables the registered StreamProcessors
641  * @return true if successful, false otherwise
642  */
643 bool StreamProcessorManager::disableStreamProcessors() {
644     // we prepare the streamprocessors for disable
645     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
646             it != m_ReceiveProcessors.end();
647             ++it ) {
648         (*it)->prepareForDisable();
649     }
650
651     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
652             it != m_TransmitProcessors.end();
653             ++it ) {
654         (*it)->prepareForDisable();
655     }
656
657     // then we disable the streamprocessors
658     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
659             it != m_ReceiveProcessors.end();
660             ++it ) {
661         (*it)->disable();
662     }
663
664     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
665             it != m_TransmitProcessors.end();
666             ++it ) {
667         (*it)->disable();
668     }
669
670     // now we wait for the SP's to get disabled
671     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
672     // we have to wait until all streamprocessors indicate that they are running
673     // i.e. that there is actually some data stream flowing
674     int wait_cycles=2000; // two seconds
675     bool enabled=true;
676     while (enabled && wait_cycles) {
677         wait_cycles--;
678         enabled=false;
679        
680         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
681                 it != m_ReceiveProcessors.end();
682                 ++it ) {
683             if((*it)->isEnabled()) enabled=true;
684         }
685
686         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
687                 it != m_TransmitProcessors.end();
688                 ++it ) {
689             if((*it)->isEnabled()) enabled=true;
690         }
691         usleep(1000); // one cycle
692     }
693    
694     if(!wait_cycles) { // timout has occurred
695         debugFatal("One or more streams couldn't be disabled (timeout):\n");
696                    
697         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
698                 it != m_ReceiveProcessors.end();
699                 ++it ) {
700             if(!(*it)->isEnabled()) {
701                     debugFatal(" receive stream %p not enabled\n",*it);
702             } else {   
703                     debugFatal(" receive stream %p enabled\n",*it);
704             }
705         }
706    
707         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
708                 it != m_TransmitProcessors.end();
709                 ++it ) {
710             if(!(*it)->isEnabled()) {
711                     debugFatal(" transmit stream %p not enabled\n",*it);
712             } else {   
713                     debugFatal(" transmit stream %p enabled\n",*it);
714             }
715         }
716         return false;
717     }
718    
719     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
720        
721     return true;
722 }
723
724 /**
725  * Called upon Xrun events. This brings all StreamProcessors back
726  * into their starting state, and then carries on streaming. This should
727  * have the same effect as restarting the whole thing.
728  *
729  * @return true if successful, false otherwise
730  */
731 bool StreamProcessorManager::handleXrun() {
732
733         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
734
735         /*
736          * Reset means:
737          * 1) Disabling the SP's, so that they don't process any packets
738          *    note: the isomanager does keep on delivering/requesting them
739          * 2) Bringing all buffers & streamprocessors into a know state
740          *    - Clear all capture buffers
741          *    - Put nb_periods*period_size of null frames into the playback buffers
742          * 3) Re-enable the SP's
743          */
744         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
745         if (!disableStreamProcessors()) {
746                 debugFatal("Could not disable StreamProcessors...\n");
747                 return false;
748         }
749
750         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
751         // start all SP's synchonized
752         if (!syncStartAll()) {
753                 debugFatal("Could not syncStartAll...\n");
754                 return false;
755         }
756
757         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
758        
759         return true;
760 }
761
762 /**
763  * @brief Waits until the next period of samples is ready
764  *
765  * This function does not return until a full period of samples is (or should be)
766  * ready to be transferred.
767  *
768  * @return true if the period is ready, false if an xrun occurred
769  */
770 bool StreamProcessorManager::waitForPeriod() {
771     int time_till_next_period;
772     bool xrun_occurred=false;
773        
774     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
775
776     assert(m_SyncSource);
777    
778     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
779    
780     while(time_till_next_period > 0) {
781         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
782    
783         // wait for the period
784         usleep(time_till_next_period);
785        
786         // check for underruns on the ISO side,
787         // those should make us bail out of the wait loop
788         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
789             it != m_ReceiveProcessors.end();
790             ++it ) {
791             // a xrun has occurred on the Iso side
792             xrun_occurred |= (*it)->xrunOccurred();
793         }
794         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
795             it != m_TransmitProcessors.end();
796             ++it ) {
797             // a xrun has occurred on the Iso side
798             xrun_occurred |= (*it)->xrunOccurred();
799         }
800
801         // check if we were waked up too soon
802         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
803     }
804    
805     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", -time_till_next_period);
806    
807     // this is to notify the client of the delay
808     // that we introduced
809     m_delayed_usecs=time_till_next_period;
810    
811     // we save the 'ideal' time of the transfer at this point,
812     // because we can have interleaved read - process - write
813     // cycles making that we modify a receiving stream's buffer
814     // before we get to writing.
815     // NOTE: before waitForPeriod() is called again, both the transmit
816     //       and the receive processors should have done their transfer.
817     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
818     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
819         m_time_of_transfer);
820    
821     xrun_occurred=false;
822    
823     // check if xruns occurred on the Iso side.
824     // also check if xruns will occur should we transfer() now
825    
826     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
827           it != m_ReceiveProcessors.end();
828           ++it ) {
829         // a xrun has occurred on the Iso side
830         xrun_occurred |= (*it)->xrunOccurred();
831        
832         // if this is true, a xrun will occur
833         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
834        
835 #ifdef DEBUG
836         if ((*it)->xrunOccurred()) {
837             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
838             (*it)->dumpInfo();
839         }
840         if (!((*it)->canClientTransferFrames(m_period))) {
841             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
842             (*it)->dumpInfo();
843         }
844 #endif
845        
846     }
847     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
848           it != m_TransmitProcessors.end();
849           ++it ) {
850         // a xrun has occurred on the Iso side
851         xrun_occurred |= (*it)->xrunOccurred();
852        
853         // if this is true, a xrun will occur
854         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
855        
856 #ifdef DEBUG
857         if ((*it)->xrunOccurred()) {
858             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
859         }
860         if (!((*it)->canClientTransferFrames(m_period))) {
861             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
862         }
863 #endif       
864     }
865    
866     m_nbperiods++;
867    
868     // now we can signal the client that we are (should be) ready
869     return !xrun_occurred;
870 }
871
872 /**
873  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
874  *
875  * Transfers one period of frames from the client side to the Iso side and vice versa.
876  *
877  * @return true if successful, false otherwise (indicates xrun).
878  */
879 bool StreamProcessorManager::transfer() {
880    
881     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
882
883     if (!transfer(StreamProcessor::E_Receive)) return false;
884     if (!transfer(StreamProcessor::E_Transmit)) return false;
885
886     return true;
887 }
888
889 /**
890  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
891  *
892  * Transfers one period of frames from the client side to the Iso side or vice versa.
893  *
894  * @param t The processor type to tranfer for (receive or transmit)
895  * @return true if successful, false otherwise (indicates xrun).
896  */
897
898 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
899     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
900    
901     // a static cast could make sure that there is no performance
902     // penalty for the virtual functions (to be checked)
903     if (t==StreamProcessor::E_Receive) {
904         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
905                 it != m_ReceiveProcessors.end();
906                 ++it ) {
907                
908             if(!(*it)->getFrames(m_period)) {
909                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
910                             m_period, m_time_of_transfer,*it);
911                     return false; // buffer underrun
912             }
913
914         }
915     } else {
916         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
917                 it != m_TransmitProcessors.end();
918                 ++it ) {
919                
920             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
921                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
922                         m_period, m_time_of_transfer, *it);
923                 return false; // buffer overrun
924             }
925
926         }
927     }
928
929     return true;
930 }
931
932 void StreamProcessorManager::dumpInfo() {
933         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
934         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
935         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
936
937         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
938         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
939                 it != m_ReceiveProcessors.end();
940                 ++it ) {
941                 (*it)->dumpInfo();
942         }
943
944         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
945         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
946                 it != m_TransmitProcessors.end();
947                 ++it ) {
948                 (*it)->dumpInfo();
949         }
950
951         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
952         m_isoManager->dumpInfo();
953         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
954
955 }
956
957 void StreamProcessorManager::setVerboseLevel(int l) {
958         setDebugLevel(l);
959
960         if (m_isoManager) m_isoManager->setVerboseLevel(l);
961
962         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
963         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
964                 it != m_ReceiveProcessors.end();
965                 ++it ) {
966                 (*it)->setVerboseLevel(l);
967         }
968
969         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
970         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
971                 it != m_TransmitProcessors.end();
972                 ++it ) {
973                 (*it)->setVerboseLevel(l);
974         }
975 }
976
977
978 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
979         int count=0;
980
981         if (direction == Port::E_Capture) {
982                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
983                         it != m_ReceiveProcessors.end();
984                         ++it ) {
985                         count += (*it)->getPortCount(type);
986                 }
987         } else {
988                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
989                         it != m_TransmitProcessors.end();
990                         ++it ) {
991                         count += (*it)->getPortCount(type);
992                 }
993         }
994         return count;
995 }
996
997 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
998         int count=0;
999
1000         if (direction == Port::E_Capture) {
1001                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1002                         it != m_ReceiveProcessors.end();
1003                         ++it ) {
1004                         count += (*it)->getPortCount();
1005                 }
1006         } else {
1007                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1008                         it != m_TransmitProcessors.end();
1009                         ++it ) {
1010                         count += (*it)->getPortCount();
1011                 }
1012         }
1013         return count;
1014 }
1015
1016 // TODO: implement a port map here, instead of the loop
1017
1018 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1019         int count=0;
1020         int prevcount=0;
1021
1022         if (direction == Port::E_Capture) {
1023                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1024                         it != m_ReceiveProcessors.end();
1025                         ++it ) {
1026                         count += (*it)->getPortCount();
1027                         if (count > idx) {
1028                                 return (*it)->getPortAtIdx(idx-prevcount);
1029                         }
1030                         prevcount=count;
1031                 }
1032         } else {
1033                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1034                         it != m_TransmitProcessors.end();
1035                         ++it ) {
1036                         count += (*it)->getPortCount();
1037                         if (count > idx) {
1038                                 return (*it)->getPortAtIdx(idx-prevcount);
1039                         }
1040                         prevcount=count;
1041                 }
1042         }
1043         return NULL;
1044 }
1045
1046 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1047     m_thread_realtime=rt;
1048     m_thread_priority=priority;
1049     return true;
1050 }
1051
1052
1053 } // end of namespace
Note: See TracBrowser for help on using the browser.