root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 390, 35.1 kB (checked in by pieterpalmers, 17 years ago)

* working version of SYT based AMDTP receive and transmit.

Still has to be tuned to work with low buffer sizes.

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "../libutil/PosixThread.h"
36
37 #include "libstreaming/cycletimer.h"
38
39 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
40
41 namespace FreebobStreaming {
42
43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
44
45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
46         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
47         m_isoManager(0), m_nbperiods(0) {
48
49 }
50
51 StreamProcessorManager::~StreamProcessorManager() {
52         if (m_isoManager) delete m_isoManager;
53        
54 }
55
56 /**
57  * Registers \ref processor with this manager.
58  *
59  * also registers it with the isohandlermanager
60  *
61  * be sure to call isohandlermanager->init() first!
62  * and be sure that the processors are also ->init()'ed
63  *
64  * @param processor
65  * @return true if successfull
66  */
67 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
68 {
69         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
70         assert(processor);
71         assert(m_isoManager);
72
73         if (processor->getType()==StreamProcessor::E_Receive) {
74                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
75                
76                 m_ReceiveProcessors.push_back(processor);
77                
78                 processor->setManager(this);
79                                
80                 return true;
81         }
82        
83         if (processor->getType()==StreamProcessor::E_Transmit) {
84                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
85                
86                 m_TransmitProcessors.push_back(processor);
87                
88                 processor->setManager(this);
89                
90                 return true;
91         }
92
93         debugFatal("Unsupported processor type!\n");
94        
95         return false;
96 }
97
98 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
99 {
100         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
101         assert(processor);
102
103         if (processor->getType()==StreamProcessor::E_Receive) {
104
105                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
106                         it != m_ReceiveProcessors.end();
107                         ++it ) {
108
109                         if ( *it == processor ) {
110                                         m_ReceiveProcessors.erase(it);
111                                        
112                                         processor->clearManager();
113                                        
114                                         if(!m_isoManager->unregisterStream(processor)) {
115                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
116                                                
117                                                 return false;
118                                                
119                                         }
120                                        
121                                         return true;
122                                 }
123                 }
124         }
125
126         if (processor->getType()==StreamProcessor::E_Transmit) {
127                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
128                         it != m_TransmitProcessors.end();
129                         ++it ) {
130
131                         if ( *it == processor ) {
132                                         m_TransmitProcessors.erase(it);
133                                        
134                                         processor->clearManager();
135                                        
136                                         if(!m_isoManager->unregisterStream(processor)) {
137                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
138                                                
139                                                 return false;
140                                                
141                                         }
142                                        
143                                         return true;
144                                 }
145                 }
146         }
147        
148         debugFatal("Processor (%p) not found!\n",processor);
149
150         return false; //not found
151
152 }
153
154 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
155     m_SyncSource=s;
156     return true;
157 }
158
159 StreamProcessor *StreamProcessorManager::getSyncSource() {
160     return m_SyncSource;
161 }
162
163 bool StreamProcessorManager::init()
164 {
165         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
166
167         // the tread that runs the StreamProcessor
168         // checking the period boundaries
169         int prio=m_thread_priority+5;
170         if (prio>98) prio=98;
171        
172         m_streamingThread=new FreebobUtil::PosixThread(this,
173            m_thread_realtime, prio,
174            PTHREAD_CANCEL_DEFERRED);
175            
176         if(!m_streamingThread) {
177                 debugFatal("Could not create streaming thread\n");
178                 return false;
179         }
180        
181         m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority);
182        
183         if(!m_isoManager) {
184                 debugFatal("Could not create IsoHandlerManager\n");
185                 return false;
186         }
187        
188         // propagate the debug level
189         m_isoManager->setVerboseLevel(getDebugLevel());
190        
191         if(!m_isoManager->init()) {
192                 debugFatal("Could not initialize IsoHandlerManager\n");
193                 return false;
194         }
195        
196         m_xrun_happened=false;
197        
198         return true;
199 }
200
201 bool StreamProcessorManager::Init()
202 {
203     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing runner...\n");
204
205     // no xrun has occurred (yet)
206
207     return true;
208 }
209
210 bool StreamProcessorManager::prepare() {
211
212         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
213        
214         // if no sync source is set, select one here
215         if(m_SyncSource == NULL) {
216            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
217         }
218        
219         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
220                 it != m_ReceiveProcessors.end();
221                 ++it ) {
222                         if(m_SyncSource == NULL) {
223                                 debugWarning(" => Sync Source is %p.\n", *it);
224                                 m_SyncSource = *it;
225                         }
226         }
227
228         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
229                 it != m_TransmitProcessors.end();
230                 ++it ) {
231                         if(m_SyncSource == NULL) {
232                                 debugWarning(" => Sync Source is %p.\n", *it);
233                                 m_SyncSource = *it;
234                         }
235         }
236
237         // now do the actual preparation
238         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
239         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
240                 it != m_ReceiveProcessors.end();
241                 ++it ) {
242                         if(!(*it)->setSyncSource(m_SyncSource)) {
243                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
244                                 return false;
245                         }
246                        
247                         if(!(*it)->prepare()) {
248                                 debugFatal(  " could not prepare (%p)...\n",(*it));
249                                 return false;
250                         }
251         }
252
253         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
254         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
255                 it != m_TransmitProcessors.end();
256                 ++it ) {
257                         if(!(*it)->setSyncSource(m_SyncSource)) {
258                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
259                                 return false;
260                         }               
261                         if(!(*it)->prepare()) {
262                                 debugFatal( " could not prepare (%p)...\n",(*it));
263                                 return false;
264                         }
265         }
266
267     // if there are no stream processors registered,
268     // fail
269     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
270         debugFatal("No stream processors registered, can't do anything usefull\n");
271         return false;
272     }
273
274         return true;
275 }
276
277 // FIXME: this can be removed
278 bool StreamProcessorManager::Execute()
279 {
280         // temp measure, polling
281         usleep(1000);
282
283         // FIXME: move this to an IsoHandlerManager sub-thread
284         // and make this private again in IHM
285         m_isoManager->updateCycleTimers();
286        
287         return true;
288 }
289
290 bool StreamProcessorManager::syncStartAll() {
291
292     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n");
293     // we have to wait until all streamprocessors indicate that they are running
294     // i.e. that there is actually some data stream flowing
295     int wait_cycles=2000; // two seconds
296     bool notRunning=true;
297     while (notRunning && wait_cycles) {
298         wait_cycles--;
299         notRunning=false;
300        
301         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
302                 it != m_ReceiveProcessors.end();
303                 ++it ) {
304             if(!(*it)->isRunning()) notRunning=true;
305         }
306
307         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
308                 it != m_TransmitProcessors.end();
309                 ++it ) {
310             if(!(*it)->isRunning()) notRunning=true;
311         }
312         usleep(1000);
313         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning);
314     }
315
316     if(!wait_cycles) { // timout has occurred
317         debugFatal("One or more streams are not starting up (timeout):\n");
318                    
319         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
320                 it != m_ReceiveProcessors.end();
321                 ++it ) {
322             if(!(*it)->isRunning()) {
323                 debugFatal(" receive stream %p not running\n",*it);
324             } else {   
325                 debugFatal(" receive stream %p running\n",*it);
326             }
327         }
328
329         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
330                 it != m_TransmitProcessors.end();
331                 ++it ) {
332             if(!(*it)->isRunning()) {
333                 debugFatal(" transmit stream %p not running\n",*it);
334             } else {   
335                 debugFatal(" transmit stream %p running\n",*it);
336             }
337         }
338        
339         return false;
340     }
341
342     // we want to make sure that everything is running well,
343     // so wait for a while
344     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
345
346     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
347     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
348
349     // now we reset the frame counters
350     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
351             it != m_ReceiveProcessors.end();
352             ++it ) {
353
354         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
355
356         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
357             (*it)->dumpInfo();
358         }
359
360         (*it)->reset();
361
362         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
363
364         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
365             (*it)->dumpInfo();
366         }
367
368     }
369    
370     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
371             it != m_TransmitProcessors.end();
372             ++it ) {
373            
374         debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
375        
376         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
377             (*it)->dumpInfo();
378         }
379        
380         (*it)->reset();
381        
382         debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
383        
384         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
385             (*it)->dumpInfo();
386         }
387     }
388        
389     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
390    
391     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
392    
393     // FIXME: this should not be in cycles, but in 'time'
394     unsigned int enable_at=TICKS_TO_CYCLES(now)+300;
395        
396     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n");
397     if (!m_SyncSource->prepareForEnable()) {
398             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
399         return false;
400     }
401
402     m_SyncSource->enable(enable_at);
403
404     debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n");
405     if (!enableStreamProcessors(enable_at)) {
406         debugFatal("Could not enable StreamProcessors...\n");
407         return false;
408     }
409
410     return true;
411 }
412
413 bool StreamProcessorManager::start() {
414         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
415         assert(m_isoManager);
416        
417         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
418         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
419         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
420                 it != m_ReceiveProcessors.end();
421                 ++it ) {
422                         if (!(*it)->prepareForStart()) {
423                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
424                                 return false;
425                         }
426                         if (!m_isoManager->registerStream(*it)) {
427                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
428                                 return false;
429                         }
430                 }
431
432         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
433         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
434                 it != m_TransmitProcessors.end();
435                 ++it ) {
436                         if (!(*it)->prepareForStart()) {
437                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
438                                 return false;
439                         }
440                         if (!m_isoManager->registerStream(*it)) {
441                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
442                                 return false;
443                         }
444                 }
445
446         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
447         if (!m_isoManager->prepare()) {
448                 debugFatal("Could not prepare isoManager\n");
449                 return false;
450         }
451
452         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
453         if (!disableStreamProcessors()) {
454                 debugFatal("Could not disable StreamProcessors...\n");
455                 return false;
456         }
457                
458         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
459         if (!m_isoManager->startHandlers(0)) {
460                 debugFatal("Could not start handlers...\n");
461                 return false;
462         }
463        
464         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n");
465
466         // start the runner thread
467         // FIXME: not used anymore (for updatecycletimers ATM, but that's not good)
468         m_streamingThread->Start();
469
470         // start all SP's synchonized
471         if (!syncStartAll()) {
472                 debugFatal("Could not syncStartAll...\n");
473                 return false;
474         }
475        
476         // dump the iso stream information when in verbose mode
477         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
478                 m_isoManager->dumpInfo();
479         }
480        
481         return true;
482        
483 }
484
485 bool StreamProcessorManager::stop() {
486         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
487         assert(m_isoManager);
488         assert(m_streamingThread);
489
490         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
491         // Most stream processors can just stop without special treatment.  However, some
492         // (like the MOTU) need to do a few things before it's safe to turn off the iso
493         // handling.
494         int wait_cycles=2000; // two seconds ought to be sufficient
495         bool allReady = false;
496         while (!allReady && wait_cycles) {
497                 wait_cycles--;
498                 allReady = true;
499                
500                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
501                         it != m_ReceiveProcessors.end();
502                         ++it ) {
503                         if(!(*it)->prepareForStop()) allReady = false;
504                 }
505        
506                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
507                         it != m_TransmitProcessors.end();
508                         ++it ) {
509                         if(!(*it)->prepareForStop()) allReady = false;
510                 }
511                 usleep(1000);
512         }
513
514
515         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping threads...\n");
516        
517         m_streamingThread->Stop();
518        
519         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
520         if(!m_isoManager->stopHandlers()) {
521            debugFatal("Could not stop ISO handlers\n");
522            return false;
523         }
524        
525         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
526     // now unregister all streams from iso manager
527         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
528         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
529                 it != m_ReceiveProcessors.end();
530                 ++it ) {
531                         if (!m_isoManager->unregisterStream(*it)) {
532                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
533                                 return false;
534                         }
535                        
536                 }
537
538         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
539         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
540                 it != m_TransmitProcessors.end();
541                 ++it ) {
542                         if (!m_isoManager->unregisterStream(*it)) {
543                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
544                                 return false;
545                         }
546                        
547                 }
548        
549         return true;
550        
551 }
552
553 /**
554  * Enables the registered StreamProcessors
555  * @return true if successful, false otherwise
556  */
557 bool StreamProcessorManager::enableStreamProcessors(unsigned int time_to_enable_at) {
558     // we prepare the streamprocessors for enable
559     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
560             it != m_ReceiveProcessors.end();
561             ++it ) {           
562         (*it)->prepareForEnable();
563     }
564
565     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
566             it != m_TransmitProcessors.end();
567             ++it ) {
568         (*it)->prepareForEnable();
569     }
570
571     // then we enable the streamprocessors
572     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
573             it != m_ReceiveProcessors.end();
574             ++it ) {           
575         (*it)->enable(time_to_enable_at);
576     }
577
578     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
579             it != m_TransmitProcessors.end();
580             ++it ) {
581         (*it)->enable(time_to_enable_at);
582     }
583
584     // now we wait for the SP's to get enabled
585     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
586     // we have to wait until all streamprocessors indicate that they are running
587     // i.e. that there is actually some data stream flowing
588     int wait_cycles=2000; // two seconds
589     bool notEnabled=true;
590     while (notEnabled && wait_cycles) {
591         wait_cycles--;
592         notEnabled=false;
593        
594         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
595                 it != m_ReceiveProcessors.end();
596                 ++it ) {
597             if(!(*it)->isEnabled()) notEnabled=true;
598         }
599
600         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
601                 it != m_TransmitProcessors.end();
602                 ++it ) {
603             if(!(*it)->isEnabled()) notEnabled=true;
604         }
605         usleep(1000); // one cycle
606     }
607    
608     if(!wait_cycles) { // timout has occurred
609         debugFatal("One or more streams couldn't be enabled (timeout):\n");
610                    
611         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
612                 it != m_ReceiveProcessors.end();
613                 ++it ) {
614             if(!(*it)->isEnabled()) {
615                     debugFatal(" receive stream %p not enabled\n",*it);
616             } else {   
617                     debugFatal(" receive stream %p enabled\n",*it);
618             }
619         }
620    
621         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
622                 it != m_TransmitProcessors.end();
623                 ++it ) {
624             if(!(*it)->isEnabled()) {
625                     debugFatal(" transmit stream %p not enabled\n",*it);
626             } else {   
627                     debugFatal(" transmit stream %p enabled\n",*it);
628             }
629         }
630         return false;
631     }
632    
633     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
634
635     return true;
636 }
637
638 /**
639  * Disables the registered StreamProcessors
640  * @return true if successful, false otherwise
641  */
642 bool StreamProcessorManager::disableStreamProcessors() {
643     // we prepare the streamprocessors for disable
644     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
645             it != m_ReceiveProcessors.end();
646             ++it ) {
647         (*it)->prepareForDisable();
648     }
649
650     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
651             it != m_TransmitProcessors.end();
652             ++it ) {
653         (*it)->prepareForDisable();
654     }
655
656     // then we disable the streamprocessors
657     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
658             it != m_ReceiveProcessors.end();
659             ++it ) {
660         (*it)->disable();
661     }
662
663     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
664             it != m_TransmitProcessors.end();
665             ++it ) {
666         (*it)->disable();
667     }
668
669     // now we wait for the SP's to get disabled
670     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
671     // we have to wait until all streamprocessors indicate that they are running
672     // i.e. that there is actually some data stream flowing
673     int wait_cycles=2000; // two seconds
674     bool enabled=true;
675     while (enabled && wait_cycles) {
676         wait_cycles--;
677         enabled=false;
678        
679         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
680                 it != m_ReceiveProcessors.end();
681                 ++it ) {
682             if((*it)->isEnabled()) enabled=true;
683         }
684
685         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
686                 it != m_TransmitProcessors.end();
687                 ++it ) {
688             if((*it)->isEnabled()) enabled=true;
689         }
690         usleep(1000); // one cycle
691     }
692    
693     if(!wait_cycles) { // timout has occurred
694         debugFatal("One or more streams couldn't be disabled (timeout):\n");
695                    
696         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
697                 it != m_ReceiveProcessors.end();
698                 ++it ) {
699             if(!(*it)->isEnabled()) {
700                     debugFatal(" receive stream %p not enabled\n",*it);
701             } else {   
702                     debugFatal(" receive stream %p enabled\n",*it);
703             }
704         }
705    
706         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
707                 it != m_TransmitProcessors.end();
708                 ++it ) {
709             if(!(*it)->isEnabled()) {
710                     debugFatal(" transmit stream %p not enabled\n",*it);
711             } else {   
712                     debugFatal(" transmit stream %p enabled\n",*it);
713             }
714         }
715         return false;
716     }
717    
718     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
719        
720     return true;
721 }
722
723 /**
724  * Called upon Xrun events. This brings all StreamProcessors back
725  * into their starting state, and then carries on streaming. This should
726  * have the same effect as restarting the whole thing.
727  *
728  * @return true if successful, false otherwise
729  */
730 bool StreamProcessorManager::handleXrun() {
731
732         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
733
734         /*
735          * Reset means:
736          * 1) Disabling the SP's, so that they don't process any packets
737          *    note: the isomanager does keep on delivering/requesting them
738          * 2) Bringing all buffers & streamprocessors into a know state
739          *    - Clear all capture buffers
740          *    - Put nb_periods*period_size of null frames into the playback buffers
741          * 3) Re-enable the SP's
742          */
743         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
744         if (!disableStreamProcessors()) {
745                 debugFatal("Could not disable StreamProcessors...\n");
746                 return false;
747         }
748
749         debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
750         // start all SP's synchonized
751         if (!syncStartAll()) {
752                 debugFatal("Could not syncStartAll...\n");
753                 return false;
754         }
755
756         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
757        
758         return true;
759 }
760
761 /**
762  * @brief Waits until the next period of samples is ready
763  *
764  * This function does not return until a full period of samples is (or should be)
765  * ready to be transferred.
766  *
767  * @return true if the period is ready, false if an xrun occurred
768  */
769 bool StreamProcessorManager::waitForPeriod() {
770     int time_till_next_period;
771     bool xrun_occurred=false;
772        
773     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
774
775     assert(m_SyncSource);
776    
777     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
778    
779     while(time_till_next_period > 0) {
780         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
781    
782         // wait for the period
783         usleep(time_till_next_period);
784        
785         // check for underruns on the ISO side,
786         // those should make us bail out of the wait loop
787         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
788             it != m_ReceiveProcessors.end();
789             ++it ) {
790             // a xrun has occurred on the Iso side
791             xrun_occurred |= (*it)->xrunOccurred();
792         }
793         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
794             it != m_TransmitProcessors.end();
795             ++it ) {
796             // a xrun has occurred on the Iso side
797             xrun_occurred |= (*it)->xrunOccurred();
798         }
799
800         // check if we were waked up too soon
801         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
802     }
803    
804     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", -time_till_next_period);
805    
806     // this is to notify the client of the delay
807     // that we introduced
808     m_delayed_usecs=time_till_next_period;
809    
810     // we save the 'ideal' time of the transfer at this point,
811     // because we can have interleaved read - process - write
812     // cycles making that we modify a receiving stream's buffer
813     // before we get to writing.
814     // NOTE: before waitForPeriod() is called again, both the transmit
815     //       and the receive processors should have done their transfer.
816     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
817     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
818         m_time_of_transfer);
819    
820     xrun_occurred=false;
821    
822     // check if xruns occurred on the Iso side.
823     // also check if xruns will occur should we transfer() now
824    
825     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
826           it != m_ReceiveProcessors.end();
827           ++it ) {
828         // a xrun has occurred on the Iso side
829         xrun_occurred |= (*it)->xrunOccurred();
830        
831         // if this is true, a xrun will occur
832         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
833        
834 #ifdef DEBUG
835         if ((*it)->xrunOccurred()) {
836             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
837         }
838         if (!((*it)->canClientTransferFrames(m_period))) {
839             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
840         }
841 #endif
842        
843     }
844     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
845           it != m_TransmitProcessors.end();
846           ++it ) {
847         // a xrun has occurred on the Iso side
848         xrun_occurred |= (*it)->xrunOccurred();
849        
850         // if this is true, a xrun will occur
851         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
852        
853 #ifdef DEBUG
854         if ((*it)->xrunOccurred()) {
855             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
856         }
857         if (!((*it)->canClientTransferFrames(m_period))) {
858             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
859         }
860 #endif       
861     }
862    
863     m_nbperiods++;
864    
865     // now we can signal the client that we are (should be) ready
866     return !xrun_occurred;
867 }
868
869 /**
870  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
871  *
872  * Transfers one period of frames from the client side to the Iso side and vice versa.
873  *
874  * @return true if successful, false otherwise (indicates xrun).
875  */
876 bool StreamProcessorManager::transfer() {
877    
878     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
879
880     if (!transfer(StreamProcessor::E_Receive)) return false;
881     if (!transfer(StreamProcessor::E_Transmit)) return false;
882
883     return true;
884 }
885
886 /**
887  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
888  *
889  * Transfers one period of frames from the client side to the Iso side or vice versa.
890  *
891  * @param t The processor type to tranfer for (receive or transmit)
892  * @return true if successful, false otherwise (indicates xrun).
893  */
894
895 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
896     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
897    
898     // a static cast could make sure that there is no performance
899     // penalty for the virtual functions (to be checked)
900     if (t==StreamProcessor::E_Receive) {
901         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
902                 it != m_ReceiveProcessors.end();
903                 ++it ) {
904            
905             //#ifdef DEBUG
906             #if 0
907             {
908                 uint64_t ts_tail=0;
909                 uint64_t fc_tail=0;
910                
911                 uint64_t ts_head=0;
912                 uint64_t fc_head=0;
913                
914                 int cnt=0;
915                
916                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
917                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
918                
919                 while((fc_head != fc_tail) && (cnt++ < 10)) {
920                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
921                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
922                 }
923                
924                 debugOutput(DEBUG_LEVEL_VERBOSE,"R => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
925                     ts_head, fc_head, ts_tail, fc_tail, cnt);
926             }
927             #endif
928    
929             if(!(*it)->getFrames(m_period, (int64_t)m_time_of_transfer)) {
930                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
931                             m_period, m_time_of_transfer,*it);
932                     return false; // buffer underrun
933             }
934            
935             //#ifdef DEBUG
936             #if 0
937             {
938                 uint64_t ts_tail=0;
939                 uint64_t fc_tail=0;
940                
941                 uint64_t ts_head=0;
942                 uint64_t fc_head=0;
943                
944                 int cnt=0;
945                
946                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
947                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
948            
949                 while((fc_head != fc_tail) && (cnt++ < 10)) {
950                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
951                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
952                 }
953                
954                 debugOutput(DEBUG_LEVEL_VERBOSE,"R  > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
955                     ts_head, fc_head, ts_tail, fc_tail, cnt);
956             }
957             #endif
958    
959         }
960     } else {
961         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
962                 it != m_TransmitProcessors.end();
963                 ++it ) {
964                
965             //#ifdef DEBUG
966             #if 0
967             {
968                 uint64_t ts_tail=0;
969                 uint64_t fc_tail=0;
970                
971                 uint64_t ts_head=0;
972                 uint64_t fc_head=0;
973                
974                 int cnt=0;
975                
976                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
977                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
978                
979                 while((fc_head != fc_tail) && (cnt++ < 10)) {
980                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
981                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
982                 }
983                
984                 debugOutput(DEBUG_LEVEL_VERBOSE,"T => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
985                     ts_head, fc_head, ts_tail, fc_tail, cnt);
986             }
987             #endif
988                
989             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
990                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
991                         m_period, m_time_of_transfer, *it);
992                 return false; // buffer overrun
993             }
994            
995             //#ifdef DEBUG
996             #if 0
997             {
998                 uint64_t ts_tail=0;
999                 uint64_t fc_tail=0;
1000                
1001                 uint64_t ts_head=0;
1002                 uint64_t fc_head=0;
1003                
1004                 int cnt=0;
1005                
1006                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
1007                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
1008            
1009                 while((fc_head != fc_tail) && (cnt++ < 10)) {
1010                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
1011                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
1012                 }
1013                
1014                 debugOutput(DEBUG_LEVEL_VERBOSE,"T  > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
1015                     ts_head, fc_head, ts_tail, fc_tail, cnt);
1016             }
1017             #endif
1018         }
1019     }
1020
1021     return true;
1022 }
1023
1024 void StreamProcessorManager::dumpInfo() {
1025         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1026         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1027         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1028
1029         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1030         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1031                 it != m_ReceiveProcessors.end();
1032                 ++it ) {
1033                 (*it)->dumpInfo();
1034         }
1035
1036         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1037         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1038                 it != m_TransmitProcessors.end();
1039                 ++it ) {
1040                 (*it)->dumpInfo();
1041         }
1042
1043         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1044         m_isoManager->dumpInfo();
1045         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1046
1047 }
1048
1049 void StreamProcessorManager::setVerboseLevel(int l) {
1050         setDebugLevel(l);
1051
1052         if (m_isoManager) m_isoManager->setVerboseLevel(l);
1053
1054         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1055         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1056                 it != m_ReceiveProcessors.end();
1057                 ++it ) {
1058                 (*it)->setVerboseLevel(l);
1059         }
1060
1061         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1062         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1063                 it != m_TransmitProcessors.end();
1064                 ++it ) {
1065                 (*it)->setVerboseLevel(l);
1066         }
1067 }
1068
1069
1070 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1071         int count=0;
1072
1073         if (direction == Port::E_Capture) {
1074                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1075                         it != m_ReceiveProcessors.end();
1076                         ++it ) {
1077                         count += (*it)->getPortCount(type);
1078                 }
1079         } else {
1080                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1081                         it != m_TransmitProcessors.end();
1082                         ++it ) {
1083                         count += (*it)->getPortCount(type);
1084                 }
1085         }
1086         return count;
1087 }
1088
1089 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1090         int count=0;
1091
1092         if (direction == Port::E_Capture) {
1093                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1094                         it != m_ReceiveProcessors.end();
1095                         ++it ) {
1096                         count += (*it)->getPortCount();
1097                 }
1098         } else {
1099                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1100                         it != m_TransmitProcessors.end();
1101                         ++it ) {
1102                         count += (*it)->getPortCount();
1103                 }
1104         }
1105         return count;
1106 }
1107
1108 // TODO: implement a port map here, instead of the loop
1109
1110 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1111         int count=0;
1112         int prevcount=0;
1113
1114         if (direction == Port::E_Capture) {
1115                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1116                         it != m_ReceiveProcessors.end();
1117                         ++it ) {
1118                         count += (*it)->getPortCount();
1119                         if (count > idx) {
1120                                 return (*it)->getPortAtIdx(idx-prevcount);
1121                         }
1122                         prevcount=count;
1123                 }
1124         } else {
1125                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1126                         it != m_TransmitProcessors.end();
1127                         ++it ) {
1128                         count += (*it)->getPortCount();
1129                         if (count > idx) {
1130                                 return (*it)->getPortAtIdx(idx-prevcount);
1131                         }
1132                         prevcount=count;
1133                 }
1134         }
1135         return NULL;
1136 }
1137
1138 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1139     m_thread_realtime=rt;
1140     m_thread_priority=priority;
1141     return true;
1142 }
1143
1144
1145 } // end of namespace
Note: See TracBrowser for help on using the browser.