root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 391, 35.2 kB (checked in by pieterpalmers, 16 years ago)

* Partially finished:

  • Introduce TimestampedBuffer? util class
  • replace interal ringbuffer of SP with timed ringbuffer

* Compiles & works

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "../libutil/PosixThread.h"
36
37 #include "libstreaming/cycletimer.h"
38
39 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
40
41 namespace FreebobStreaming {
42
43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
44
45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
46         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
47         m_isoManager(0), m_nbperiods(0) {
48
49 }
50
51 StreamProcessorManager::~StreamProcessorManager() {
52         if (m_isoManager) delete m_isoManager;
53        
54 }
55
56 /**
57  * Registers \ref processor with this manager.
58  *
59  * also registers it with the isohandlermanager
60  *
61  * be sure to call isohandlermanager->init() first!
62  * and be sure that the processors are also ->init()'ed
63  *
64  * @param processor
65  * @return true if successfull
66  */
67 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
68 {
69         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
70         assert(processor);
71         assert(m_isoManager);
72
73         if (processor->getType()==StreamProcessor::E_Receive) {
74                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
75                
76                 m_ReceiveProcessors.push_back(processor);
77                
78                 processor->setManager(this);
79                                
80                 return true;
81         }
82        
83         if (processor->getType()==StreamProcessor::E_Transmit) {
84                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
85                
86                 m_TransmitProcessors.push_back(processor);
87                
88                 processor->setManager(this);
89                
90                 return true;
91         }
92
93         debugFatal("Unsupported processor type!\n");
94        
95         return false;
96 }
97
98 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
99 {
100         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
101         assert(processor);
102
103         if (processor->getType()==StreamProcessor::E_Receive) {
104
105                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
106                         it != m_ReceiveProcessors.end();
107                         ++it ) {
108
109                         if ( *it == processor ) {
110                                         m_ReceiveProcessors.erase(it);
111                                        
112                                         processor->clearManager();
113                                        
114                                         if(!m_isoManager->unregisterStream(processor)) {
115                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
116                                                
117                                                 return false;
118                                                
119                                         }
120                                        
121                                         return true;
122                                 }
123                 }
124         }
125
126         if (processor->getType()==StreamProcessor::E_Transmit) {
127                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
128                         it != m_TransmitProcessors.end();
129                         ++it ) {
130
131                         if ( *it == processor ) {
132                                         m_TransmitProcessors.erase(it);
133                                        
134                                         processor->clearManager();
135                                        
136                                         if(!m_isoManager->unregisterStream(processor)) {
137                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
138                                                
139                                                 return false;
140                                                
141                                         }
142                                        
143                                         return true;
144                                 }
145                 }
146         }
147        
148         debugFatal("Processor (%p) not found!\n",processor);
149
150         return false; //not found
151
152 }
153
154 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
155     m_SyncSource=s;
156     return true;
157 }
158
159 StreamProcessor *StreamProcessorManager::getSyncSource() {
160     return m_SyncSource;
161 }
162
163 bool StreamProcessorManager::init()
164 {
165         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
166
167         // the tread that runs the StreamProcessor
168         // checking the period boundaries
169         int prio=m_thread_priority+5;
170         if (prio>98) prio=98;
171        
172         m_streamingThread=new FreebobUtil::PosixThread(this,
173            m_thread_realtime, prio,
174            PTHREAD_CANCEL_DEFERRED);
175            
176         if(!m_streamingThread) {
177                 debugFatal("Could not create streaming thread\n");
178                 return false;
179         }
180        
181         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
182        
183         if(!m_isoManager) {
184                 debugFatal("Could not create IsoHandlerManager\n");
185                 return false;
186         }
187        
188         // propagate the debug level
189         m_isoManager->setVerboseLevel(getDebugLevel());
190        
191         if(!m_isoManager->init()) {
192                 debugFatal("Could not initialize IsoHandlerManager\n");
193                 return false;
194         }
195        
196         m_xrun_happened=false;
197        
198         return true;
199 }
200
201 bool StreamProcessorManager::Init()
202 {
203     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing runner...\n");
204
205     // no xrun has occurred (yet)
206
207     return true;
208 }
209
210 bool StreamProcessorManager::prepare() {
211
212         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
213        
214         // if no sync source is set, select one here
215         if(m_SyncSource == NULL) {
216            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
217         }
218        
219         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
220                 it != m_ReceiveProcessors.end();
221                 ++it ) {
222                         if(m_SyncSource == NULL) {
223                                 debugWarning(" => Sync Source is %p.\n", *it);
224                                 m_SyncSource = *it;
225                         }
226         }
227
228         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
229                 it != m_TransmitProcessors.end();
230                 ++it ) {
231                         if(m_SyncSource == NULL) {
232                                 debugWarning(" => Sync Source is %p.\n", *it);
233                                 m_SyncSource = *it;
234                         }
235         }
236
237         // now do the actual preparation
238         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
239         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
240                 it != m_ReceiveProcessors.end();
241                 ++it ) {
242                         if(!(*it)->setSyncSource(m_SyncSource)) {
243                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
244                                 return false;
245                         }
246                        
247                         if(!(*it)->prepare()) {
248                                 debugFatal(  " could not prepare (%p)...\n",(*it));
249                                 return false;
250                         }
251         }
252
253         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
254         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
255                 it != m_TransmitProcessors.end();
256                 ++it ) {
257                         if(!(*it)->setSyncSource(m_SyncSource)) {
258                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
259                                 return false;
260                         }               
261                         if(!(*it)->prepare()) {
262                                 debugFatal( " could not prepare (%p)...\n",(*it));
263                                 return false;
264                         }
265         }
266
267     // if there are no stream processors registered,
268     // fail
269     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
270         debugFatal("No stream processors registered, can't do anything usefull\n");
271         return false;
272     }
273
274         return true;
275 }
276
277 // FIXME: this can be removed
278 bool StreamProcessorManager::Execute()
279 {
280         // temp measure, polling
281         usleep(1000);
282
283         // FIXME: move this to an IsoHandlerManager sub-thread
284         // and make this private again in IHM
285         m_isoManager->updateCycleTimers();
286        
287         return true;
288 }
289
290 bool StreamProcessorManager::syncStartAll() {
291
292     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n");
293     // we have to wait until all streamprocessors indicate that they are running
294     // i.e. that there is actually some data stream flowing
295     int wait_cycles=2000; // two seconds
296     bool notRunning=true;
297     while (notRunning && wait_cycles) {
298         wait_cycles--;
299         notRunning=false;
300        
301         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
302                 it != m_ReceiveProcessors.end();
303                 ++it ) {
304             if(!(*it)->isRunning()) notRunning=true;
305         }
306
307         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
308                 it != m_TransmitProcessors.end();
309                 ++it ) {
310             if(!(*it)->isRunning()) notRunning=true;
311         }
312         usleep(1000);
313         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
314     }
315
316     if(!wait_cycles) { // timout has occurred
317         debugFatal("One or more streams are not starting up (timeout):\n");
318                    
319         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
320                 it != m_ReceiveProcessors.end();
321                 ++it ) {
322             if(!(*it)->isRunning()) {
323                 debugFatal(" receive stream %p not running\n",*it);
324             } else {   
325                 debugFatal(" receive stream %p running\n",*it);
326             }
327         }
328
329         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
330                 it != m_TransmitProcessors.end();
331                 ++it ) {
332             if(!(*it)->isRunning()) {
333                 debugFatal(" transmit stream %p not running\n",*it);
334             } else {   
335                 debugFatal(" transmit stream %p running\n",*it);
336             }
337         }
338        
339         return false;
340     }
341
342     // we want to make sure that everything is running well,
343     // so wait for a while
344     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
345
346     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
347     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
348
349     // now we reset the frame counters
350     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
351             it != m_ReceiveProcessors.end();
352             ++it ) {
353
354         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
355
356         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
357             (*it)->dumpInfo();
358         }
359
360         (*it)->reset();
361
362         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
363
364         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
365             (*it)->dumpInfo();
366         }
367
368     }
369    
370     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
371             it != m_TransmitProcessors.end();
372             ++it ) {
373            
374         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
375        
376         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
377             (*it)->dumpInfo();
378         }
379        
380         (*it)->reset();
381        
382         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
383        
384         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
385             (*it)->dumpInfo();
386         }
387     }
388        
389     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
390    
391     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
392    
393     // FIXME: this should not be in cycles, but in 'time'
394     unsigned int enable_at=TICKS_TO_CYCLES(now)+2000;
395     if (enable_at > 8000) enable_at -= 8000;
396
397     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n");
398     if (!m_SyncSource->prepareForEnable()) {
399             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
400         return false;
401     }
402
403     m_SyncSource->enable(enable_at);
404
405     debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n");
406     if (!enableStreamProcessors(enable_at)) {
407         debugFatal("Could not enable StreamProcessors...\n");
408         return false;
409     }
410
411     return true;
412 }
413
414 bool StreamProcessorManager::start() {
415         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
416         assert(m_isoManager);
417        
418         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
419         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
420         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
421                 it != m_ReceiveProcessors.end();
422                 ++it ) {
423                         if (!(*it)->prepareForStart()) {
424                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
425                                 return false;
426                         }
427                         if (!m_isoManager->registerStream(*it)) {
428                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
429                                 return false;
430                         }
431                 }
432
433         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
434         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
435                 it != m_TransmitProcessors.end();
436                 ++it ) {
437                         if (!(*it)->prepareForStart()) {
438                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
439                                 return false;
440                         }
441                         if (!m_isoManager->registerStream(*it)) {
442                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
443                                 return false;
444                         }
445                 }
446
447         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
448         if (!m_isoManager->prepare()) {
449                 debugFatal("Could not prepare isoManager\n");
450                 return false;
451         }
452
453         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
454         if (!disableStreamProcessors()) {
455                 debugFatal("Could not disable StreamProcessors...\n");
456                 return false;
457         }
458                
459         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
460         if (!m_isoManager->startHandlers(0)) {
461                 debugFatal("Could not start handlers...\n");
462                 return false;
463         }
464        
465         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n");
466
467         // start the runner thread
468         // FIXME: not used anymore (for updatecycletimers ATM, but that's not good)
469         m_streamingThread->Start();
470
471         // start all SP's synchonized
472         if (!syncStartAll()) {
473                 debugFatal("Could not syncStartAll...\n");
474                 return false;
475         }
476        
477         // dump the iso stream information when in verbose mode
478         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
479                 m_isoManager->dumpInfo();
480         }
481        
482         return true;
483        
484 }
485
486 bool StreamProcessorManager::stop() {
487         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
488         assert(m_isoManager);
489         assert(m_streamingThread);
490
491         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
492         // Most stream processors can just stop without special treatment.  However, some
493         // (like the MOTU) need to do a few things before it's safe to turn off the iso
494         // handling.
495         int wait_cycles=2000; // two seconds ought to be sufficient
496         bool allReady = false;
497         while (!allReady && wait_cycles) {
498                 wait_cycles--;
499                 allReady = true;
500                
501                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
502                         it != m_ReceiveProcessors.end();
503                         ++it ) {
504                         if(!(*it)->prepareForStop()) allReady = false;
505                 }
506        
507                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
508                         it != m_TransmitProcessors.end();
509                         ++it ) {
510                         if(!(*it)->prepareForStop()) allReady = false;
511                 }
512                 usleep(1000);
513         }
514
515
516         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping threads...\n");
517        
518         m_streamingThread->Stop();
519        
520         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
521         if(!m_isoManager->stopHandlers()) {
522            debugFatal("Could not stop ISO handlers\n");
523            return false;
524         }
525        
526         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
527     // now unregister all streams from iso manager
528         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
529         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
530                 it != m_ReceiveProcessors.end();
531                 ++it ) {
532                         if (!m_isoManager->unregisterStream(*it)) {
533                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
534                                 return false;
535                         }
536                        
537                 }
538
539         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
540         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
541                 it != m_TransmitProcessors.end();
542                 ++it ) {
543                         if (!m_isoManager->unregisterStream(*it)) {
544                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
545                                 return false;
546                         }
547                        
548                 }
549        
550         return true;
551        
552 }
553
554 /**
555  * Enables the registered StreamProcessors
556  * @return true if successful, false otherwise
557  */
558 bool StreamProcessorManager::enableStreamProcessors(unsigned int time_to_enable_at) {
559     // we prepare the streamprocessors for enable
560     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
561             it != m_ReceiveProcessors.end();
562             ++it ) {           
563         (*it)->prepareForEnable();
564     }
565
566     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
567             it != m_TransmitProcessors.end();
568             ++it ) {
569         (*it)->prepareForEnable();
570     }
571
572     // then we enable the streamprocessors
573     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
574             it != m_ReceiveProcessors.end();
575             ++it ) {           
576         (*it)->enable(time_to_enable_at);
577     }
578
579     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
580             it != m_TransmitProcessors.end();
581             ++it ) {
582         (*it)->enable(time_to_enable_at);
583     }
584
585     // now we wait for the SP's to get enabled
586     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
587     // we have to wait until all streamprocessors indicate that they are running
588     // i.e. that there is actually some data stream flowing
589     int wait_cycles=2000; // two seconds
590     bool notEnabled=true;
591     while (notEnabled && wait_cycles) {
592         wait_cycles--;
593         notEnabled=false;
594        
595         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
596                 it != m_ReceiveProcessors.end();
597                 ++it ) {
598             if(!(*it)->isEnabled()) notEnabled=true;
599         }
600
601         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
602                 it != m_TransmitProcessors.end();
603                 ++it ) {
604             if(!(*it)->isEnabled()) notEnabled=true;
605         }
606         usleep(1000); // one cycle
607     }
608    
609     if(!wait_cycles) { // timout has occurred
610         debugFatal("One or more streams couldn't be enabled (timeout):\n");
611                    
612         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
613                 it != m_ReceiveProcessors.end();
614                 ++it ) {
615             if(!(*it)->isEnabled()) {
616                     debugFatal(" receive stream %p not enabled\n",*it);
617             } else {   
618                     debugFatal(" receive stream %p enabled\n",*it);
619             }
620         }
621    
622         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
623                 it != m_TransmitProcessors.end();
624                 ++it ) {
625             if(!(*it)->isEnabled()) {
626                     debugFatal(" transmit stream %p not enabled\n",*it);
627             } else {   
628                     debugFatal(" transmit stream %p enabled\n",*it);
629             }
630         }
631         return false;
632     }
633    
634     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
635
636     return true;
637 }
638
639 /**
640  * Disables the registered StreamProcessors
641  * @return true if successful, false otherwise
642  */
643 bool StreamProcessorManager::disableStreamProcessors() {
644     // we prepare the streamprocessors for disable
645     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
646             it != m_ReceiveProcessors.end();
647             ++it ) {
648         (*it)->prepareForDisable();
649     }
650
651     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
652             it != m_TransmitProcessors.end();
653             ++it ) {
654         (*it)->prepareForDisable();
655     }
656
657     // then we disable the streamprocessors
658     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
659             it != m_ReceiveProcessors.end();
660             ++it ) {
661         (*it)->disable();
662     }
663
664     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
665             it != m_TransmitProcessors.end();
666             ++it ) {
667         (*it)->disable();
668     }
669
670     // now we wait for the SP's to get disabled
671     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
672     // we have to wait until all streamprocessors indicate that they are running
673     // i.e. that there is actually some data stream flowing
674     int wait_cycles=2000; // two seconds
675     bool enabled=true;
676     while (enabled && wait_cycles) {
677         wait_cycles--;
678         enabled=false;
679        
680         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
681                 it != m_ReceiveProcessors.end();
682                 ++it ) {
683             if((*it)->isEnabled()) enabled=true;
684         }
685
686         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
687                 it != m_TransmitProcessors.end();
688                 ++it ) {
689             if((*it)->isEnabled()) enabled=true;
690         }
691         usleep(1000); // one cycle
692     }
693    
694     if(!wait_cycles) { // timout has occurred
695         debugFatal("One or more streams couldn't be disabled (timeout):\n");
696                    
697         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
698                 it != m_ReceiveProcessors.end();
699                 ++it ) {
700             if(!(*it)->isEnabled()) {
701                     debugFatal(" receive stream %p not enabled\n",*it);
702             } else {   
703                     debugFatal(" receive stream %p enabled\n",*it);
704             }
705         }
706    
707         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
708                 it != m_TransmitProcessors.end();
709                 ++it ) {
710             if(!(*it)->isEnabled()) {
711                     debugFatal(" transmit stream %p not enabled\n",*it);
712             } else {   
713                     debugFatal(" transmit stream %p enabled\n",*it);
714             }
715         }
716         return false;
717     }
718    
719     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
720        
721     return true;
722 }
723
724 /**
725  * Called upon Xrun events. This brings all StreamProcessors back
726  * into their starting state, and then carries on streaming. This should
727  * have the same effect as restarting the whole thing.
728  *
729  * @return true if successful, false otherwise
730  */
731 bool StreamProcessorManager::handleXrun() {
732
733         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
734
735         /*
736          * Reset means:
737          * 1) Disabling the SP's, so that they don't process any packets
738          *    note: the isomanager does keep on delivering/requesting them
739          * 2) Bringing all buffers & streamprocessors into a know state
740          *    - Clear all capture buffers
741          *    - Put nb_periods*period_size of null frames into the playback buffers
742          * 3) Re-enable the SP's
743          */
744         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
745         if (!disableStreamProcessors()) {
746                 debugFatal("Could not disable StreamProcessors...\n");
747                 return false;
748         }
749
750         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
751         // start all SP's synchonized
752         if (!syncStartAll()) {
753                 debugFatal("Could not syncStartAll...\n");
754                 return false;
755         }
756
757         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
758        
759         return true;
760 }
761
762 /**
763  * @brief Waits until the next period of samples is ready
764  *
765  * This function does not return until a full period of samples is (or should be)
766  * ready to be transferred.
767  *
768  * @return true if the period is ready, false if an xrun occurred
769  */
770 bool StreamProcessorManager::waitForPeriod() {
771     int time_till_next_period;
772     bool xrun_occurred=false;
773        
774     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
775
776     assert(m_SyncSource);
777    
778     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
779    
780     while(time_till_next_period > 0) {
781         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
782    
783         // wait for the period
784         usleep(time_till_next_period);
785        
786         // check for underruns on the ISO side,
787         // those should make us bail out of the wait loop
788         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
789             it != m_ReceiveProcessors.end();
790             ++it ) {
791             // a xrun has occurred on the Iso side
792             xrun_occurred |= (*it)->xrunOccurred();
793         }
794         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
795             it != m_TransmitProcessors.end();
796             ++it ) {
797             // a xrun has occurred on the Iso side
798             xrun_occurred |= (*it)->xrunOccurred();
799         }
800
801         // check if we were waked up too soon
802         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
803     }
804    
805     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", -time_till_next_period);
806    
807     // this is to notify the client of the delay
808     // that we introduced
809     m_delayed_usecs=time_till_next_period;
810    
811     // we save the 'ideal' time of the transfer at this point,
812     // because we can have interleaved read - process - write
813     // cycles making that we modify a receiving stream's buffer
814     // before we get to writing.
815     // NOTE: before waitForPeriod() is called again, both the transmit
816     //       and the receive processors should have done their transfer.
817     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
818     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
819         m_time_of_transfer);
820    
821     xrun_occurred=false;
822    
823     // check if xruns occurred on the Iso side.
824     // also check if xruns will occur should we transfer() now
825    
826     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
827           it != m_ReceiveProcessors.end();
828           ++it ) {
829         // a xrun has occurred on the Iso side
830         xrun_occurred |= (*it)->xrunOccurred();
831        
832         // if this is true, a xrun will occur
833         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
834        
835 #ifdef DEBUG
836         if ((*it)->xrunOccurred()) {
837             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
838             (*it)->dumpInfo();
839         }
840         if (!((*it)->canClientTransferFrames(m_period))) {
841             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
842             (*it)->dumpInfo();
843         }
844 #endif
845        
846     }
847     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
848           it != m_TransmitProcessors.end();
849           ++it ) {
850         // a xrun has occurred on the Iso side
851         xrun_occurred |= (*it)->xrunOccurred();
852        
853         // if this is true, a xrun will occur
854         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
855        
856 #ifdef DEBUG
857         if ((*it)->xrunOccurred()) {
858             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
859         }
860         if (!((*it)->canClientTransferFrames(m_period))) {
861             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
862         }
863 #endif       
864     }
865    
866     m_nbperiods++;
867    
868     // now we can signal the client that we are (should be) ready
869     return !xrun_occurred;
870 }
871
872 /**
873  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
874  *
875  * Transfers one period of frames from the client side to the Iso side and vice versa.
876  *
877  * @return true if successful, false otherwise (indicates xrun).
878  */
879 bool StreamProcessorManager::transfer() {
880    
881     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
882
883     if (!transfer(StreamProcessor::E_Receive)) return false;
884     if (!transfer(StreamProcessor::E_Transmit)) return false;
885
886     return true;
887 }
888
889 /**
890  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
891  *
892  * Transfers one period of frames from the client side to the Iso side or vice versa.
893  *
894  * @param t The processor type to tranfer for (receive or transmit)
895  * @return true if successful, false otherwise (indicates xrun).
896  */
897
898 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
899     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
900    
901     // a static cast could make sure that there is no performance
902     // penalty for the virtual functions (to be checked)
903     if (t==StreamProcessor::E_Receive) {
904         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
905                 it != m_ReceiveProcessors.end();
906                 ++it ) {
907            
908             //#ifdef DEBUG
909             #if 0
910             {
911                 uint64_t ts_tail=0;
912                 uint64_t fc_tail=0;
913                
914                 uint64_t ts_head=0;
915                 uint64_t fc_head=0;
916                
917                 int cnt=0;
918                
919                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
920                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
921                
922                 while((fc_head != fc_tail) && (cnt++ < 10)) {
923                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
924                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
925                 }
926                
927                 debugOutput(DEBUG_LEVEL_VERBOSE,"R => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
928                     ts_head, fc_head, ts_tail, fc_tail, cnt);
929             }
930             #endif
931    
932             if(!(*it)->getFrames(m_period)) {
933                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
934                             m_period, m_time_of_transfer,*it);
935                     return false; // buffer underrun
936             }
937            
938             //#ifdef DEBUG
939             #if 0
940             {
941                 uint64_t ts_tail=0;
942                 uint64_t fc_tail=0;
943                
944                 uint64_t ts_head=0;
945                 uint64_t fc_head=0;
946                
947                 int cnt=0;
948                
949                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
950                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
951            
952                 while((fc_head != fc_tail) && (cnt++ < 10)) {
953                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
954                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
955                 }
956                
957                 debugOutput(DEBUG_LEVEL_VERBOSE,"R  > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
958                     ts_head, fc_head, ts_tail, fc_tail, cnt);
959             }
960             #endif
961    
962         }
963     } else {
964         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
965                 it != m_TransmitProcessors.end();
966                 ++it ) {
967                
968             //#ifdef DEBUG
969             #if 0
970             {
971                 uint64_t ts_tail=0;
972                 uint64_t fc_tail=0;
973                
974                 uint64_t ts_head=0;
975                 uint64_t fc_head=0;
976                
977                 int cnt=0;
978                
979                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
980                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
981                
982                 while((fc_head != fc_tail) && (cnt++ < 10)) {
983                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
984                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
985                 }
986                
987                 debugOutput(DEBUG_LEVEL_VERBOSE,"T => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
988                     ts_head, fc_head, ts_tail, fc_tail, cnt);
989             }
990             #endif
991                
992             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
993                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
994                         m_period, m_time_of_transfer, *it);
995                 return false; // buffer overrun
996             }
997            
998             //#ifdef DEBUG
999             #if 0
1000             {
1001                 uint64_t ts_tail=0;
1002                 uint64_t fc_tail=0;
1003                
1004                 uint64_t ts_head=0;
1005                 uint64_t fc_head=0;
1006                
1007                 int cnt=0;
1008                
1009                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
1010                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
1011            
1012                 while((fc_head != fc_tail) && (cnt++ < 10)) {
1013                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
1014                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
1015                 }
1016                
1017                 debugOutput(DEBUG_LEVEL_VERBOSE,"T  > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
1018                     ts_head, fc_head, ts_tail, fc_tail, cnt);
1019             }
1020             #endif
1021         }
1022     }
1023
1024     return true;
1025 }
1026
1027 void StreamProcessorManager::dumpInfo() {
1028         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1029         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1030         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1031
1032         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1033         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1034                 it != m_ReceiveProcessors.end();
1035                 ++it ) {
1036                 (*it)->dumpInfo();
1037         }
1038
1039         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1040         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1041                 it != m_TransmitProcessors.end();
1042                 ++it ) {
1043                 (*it)->dumpInfo();
1044         }
1045
1046         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1047         m_isoManager->dumpInfo();
1048         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1049
1050 }
1051
1052 void StreamProcessorManager::setVerboseLevel(int l) {
1053         setDebugLevel(l);
1054
1055         if (m_isoManager) m_isoManager->setVerboseLevel(l);
1056
1057         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1058         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1059                 it != m_ReceiveProcessors.end();
1060                 ++it ) {
1061                 (*it)->setVerboseLevel(l);
1062         }
1063
1064         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1065         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1066                 it != m_TransmitProcessors.end();
1067                 ++it ) {
1068                 (*it)->setVerboseLevel(l);
1069         }
1070 }
1071
1072
1073 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1074         int count=0;
1075
1076         if (direction == Port::E_Capture) {
1077                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1078                         it != m_ReceiveProcessors.end();
1079                         ++it ) {
1080                         count += (*it)->getPortCount(type);
1081                 }
1082         } else {
1083                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1084                         it != m_TransmitProcessors.end();
1085                         ++it ) {
1086                         count += (*it)->getPortCount(type);
1087                 }
1088         }
1089         return count;
1090 }
1091
1092 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1093         int count=0;
1094
1095         if (direction == Port::E_Capture) {
1096                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1097                         it != m_ReceiveProcessors.end();
1098                         ++it ) {
1099                         count += (*it)->getPortCount();
1100                 }
1101         } else {
1102                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1103                         it != m_TransmitProcessors.end();
1104                         ++it ) {
1105                         count += (*it)->getPortCount();
1106                 }
1107         }
1108         return count;
1109 }
1110
1111 // TODO: implement a port map here, instead of the loop
1112
1113 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1114         int count=0;
1115         int prevcount=0;
1116
1117         if (direction == Port::E_Capture) {
1118                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1119                         it != m_ReceiveProcessors.end();
1120                         ++it ) {
1121                         count += (*it)->getPortCount();
1122                         if (count > idx) {
1123                                 return (*it)->getPortAtIdx(idx-prevcount);
1124                         }
1125                         prevcount=count;
1126                 }
1127         } else {
1128                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1129                         it != m_TransmitProcessors.end();
1130                         ++it ) {
1131                         count += (*it)->getPortCount();
1132                         if (count > idx) {
1133                                 return (*it)->getPortAtIdx(idx-prevcount);
1134                         }
1135                         prevcount=count;
1136                 }
1137         }
1138         return NULL;
1139 }
1140
1141 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1142     m_thread_realtime=rt;
1143     m_thread_priority=priority;
1144     return true;
1145 }
1146
1147
1148 } // end of namespace
Note: See TracBrowser for help on using the browser.