root/trunk/libfreebob/src/libstreaming/StreamProcessorManager.cpp

Revision 360, 22.0 kB (checked in by pieterpalmers, 17 years ago)

- temporary commit to backup some work

- Started a framework to synchronize IsoHandlers? to

any generic TimeSource?. The idea is to introduce
one overall time reference, and resynchronize all
other timed events to this time source.
This will, on the long run, allow:

  • combining devices on multiple FW busses together,
    as these are not synched by hardware.
  • synchronizing to the system clock
  • synchronizing to any other time source (e.g.
    when implementing a jackd client, i.e. using
    the freebob devices as jackd clients).

- Implemented a realtime safe way to read the cycle

timer for an IsoHandler?. (+ test application)

- Implemented tests/test-sytmonitor:

Monitors 2 or more channels and reports the average
SYT timestamp difference between both.

- Messed around with SYT timestamping for AMDTP. Doesn't

work (yet).

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35
36 namespace FreebobStreaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
39
40 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
41         : m_nb_buffers(nb_buffers), m_period(period), m_xruns(0), m_isoManager(0), m_nbperiods(0) {
42
43 }
44
45 StreamProcessorManager::~StreamProcessorManager() {
46         if (m_isoManager) delete m_isoManager;
47        
48 }
49
50 /**
51  * Registers \ref processor with this manager.
52  *
53  * also registers it with the isohandlermanager
54  *
55  * be sure to call isohandlermanager->init() first!
56  * and be sure that the processors are also ->init()'ed
57  *
58  * @param processor
59  * @return true if successfull
60  */
61 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
62 {
63         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
64         assert(processor);
65         assert(m_isoManager);
66
67         if (processor->getType()==StreamProcessor::E_Receive) {
68                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
69                
70                 m_ReceiveProcessors.push_back(processor);
71                
72                 processor->setManager(this);
73                                
74                 return true;
75         }
76        
77         if (processor->getType()==StreamProcessor::E_Transmit) {
78                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
79                
80                 m_TransmitProcessors.push_back(processor);
81                
82                 processor->setManager(this);
83                
84                 return true;
85         }
86
87         debugFatal("Unsupported processor type!\n");
88        
89         return false;
90 }
91
92 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
93 {
94         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
95         assert(processor);
96
97         if (processor->getType()==StreamProcessor::E_Receive) {
98
99                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
100                         it != m_ReceiveProcessors.end();
101                         ++it ) {
102
103                         if ( *it == processor ) {
104                                         m_ReceiveProcessors.erase(it);
105                                        
106                                         processor->clearManager();
107                                        
108                                         if(!m_isoManager->unregisterStream(processor)) {
109                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
110                                                
111                                                 return false;
112                                                
113                                         }
114                                        
115                                         return true;
116                                 }
117                 }
118         }
119
120         if (processor->getType()==StreamProcessor::E_Transmit) {
121                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
122                         it != m_TransmitProcessors.end();
123                         ++it ) {
124
125                         if ( *it == processor ) {
126                                         m_TransmitProcessors.erase(it);
127                                        
128                                         processor->clearManager();
129                                        
130                                         if(!m_isoManager->unregisterStream(processor)) {
131                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
132                                                
133                                                 return false;
134                                                
135                                         }
136                                        
137                                         return true;
138                                 }
139                 }
140         }
141        
142         debugFatal("Processor (%p) not found!\n",processor);
143
144         return false; //not found
145
146 }
147
148 bool StreamProcessorManager::init()
149 {
150         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
151
152         // the tread that runs the packet iterators
153         m_streamingThread=new FreebobUtil::PosixThread(this,
154            m_thread_realtime, m_thread_priority+5,
155            PTHREAD_CANCEL_DEFERRED);
156            
157         if(!m_streamingThread) {
158                 debugFatal("Could not create streaming thread\n");
159                 return false;
160         }
161        
162         m_isoManager=new IsoHandlerManager();
163        
164         if(!m_isoManager) {
165                 debugFatal("Could not create IsoHandlerManager\n");
166                 return false;
167         }
168        
169         m_isoManager->setVerboseLevel(getDebugLevel());
170        
171         // the tread that keeps the handler's cycle counters up to date
172         // NOTE: is lower priority nescessary? it can block
173 //      m_isoManagerThread=new FreebobUtil::PosixThread(m_isoManager, m_thread_realtime, m_thread_priority+6, PTHREAD_CANCEL_DEFERRED);
174
175     // now that we are using a DLL, we don't need to run this at RT priority
176     // it only serves to cope with drift
177     // however, in order to make the DLL fast enough, we have to increase
178     // its bandwidth, making it more sensitive to deviations. These deviations
179     // are mostly determined by the time difference between reading the cycle
180     // time register and the local cpu clock.
181    
182         m_isoManagerThread=new FreebobUtil::PosixThread(
183               m_isoManager,
184               m_thread_realtime, m_thread_priority+6,
185               PTHREAD_CANCEL_DEFERRED);
186              
187         if(!m_isoManagerThread) {
188                 debugFatal("Could not create iso manager thread\n");
189                 return false;
190         }
191
192         return true;
193 }
194
195 bool StreamProcessorManager::Init()
196 {
197         debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing runner...\n");
198        
199         // no xrun has occurred (yet)
200         m_xrun_happened=false;
201
202         if(sem_init(&m_period_semaphore, 0, 0)) {
203                 debugFatal( "Cannot init packet transfer semaphore\n");
204                 debugFatal( " Error: %s\n",strerror(errno));
205                 return false;
206     }
207  
208         return true;
209 }
210
211 bool StreamProcessorManager::prepare() {
212
213         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
214         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
215         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
216                 it != m_ReceiveProcessors.end();
217                 ++it ) {
218                         if(!(*it)->prepare()) {
219                                 debugFatal(  " could not prepare (%p)...\n",(*it));
220                                 return false;
221                                
222                         }
223                 }
224
225         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
226         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
227                 it != m_TransmitProcessors.end();
228                 ++it ) {
229                         if(!(*it)->prepare()) {
230                                 debugFatal( " could not prepare (%p)...\n",(*it));
231                                 return false;
232                        
233                         }
234                        
235                 }
236
237     // if there are no stream processors registered,
238     // fail
239     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
240         debugFatal("No stream processors registered, can't do anything usefull\n");
241         return false;
242     }
243
244         return true;
245 }
246
247 bool StreamProcessorManager::Execute()
248 {
249
250         bool period_ready=true;
251     bool xrun_has_occured=false;
252         bool this_period_ready;
253
254 //      debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "------------- EXECUTE -----------\n");
255
256         if(!m_isoManager->iterate()) {
257                 debugFatal("Could not iterate the isoManager\n");
258                 return false;
259         }
260        
261         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " RCV PROC: ");
262         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
263                 it != m_ReceiveProcessors.end();
264                 ++it ) {
265                
266                 this_period_ready = (*it)->isOnePeriodReady();
267                 period_ready = period_ready && this_period_ready;
268 //              if (this_period_ready) {
269 //                  m_isoManager->disablePolling(*it);
270 //              }
271 //             
272                 xrun_has_occured = xrun_has_occured || (*it)->xrunOccurred();
273                 debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "(%d/%d/%d) ", period_ready, xrun_has_occured,(*it)->m_framecounter);
274         }
275         debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "\n");
276
277         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " XMIT PROC: ");
278         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
279                 it != m_TransmitProcessors.end();
280                 ++it ) {
281                 this_period_ready = (*it)->isOnePeriodReady();
282                 period_ready = period_ready && this_period_ready;
283 //              if (this_period_ready) {
284 //                  m_isoManager->disablePolling(*it);
285 //              }
286                 xrun_has_occured = xrun_has_occured || (*it)->xrunOccurred();
287                 debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "(%d/%d/%d) ", period_ready, xrun_has_occured,(*it)->m_framecounter);
288         }
289         debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "\n");
290
291         if(xrun_has_occured) {
292                 // do xrun signaling/handling
293                 debugWarning("Streaming thread detected xrun\n");
294                 m_xruns++;
295                 m_xrun_happened=true;
296                 sem_post(&m_period_semaphore);
297        
298                 return false; // stop thread
299         }
300
301         if(period_ready) {
302                 // signal the waiting thread(s?) that a period is ready
303                 sem_post(&m_period_semaphore);
304                 debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "Period done...\n");
305
306                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
307                         it != m_ReceiveProcessors.end();
308                         ++it ) {
309                         (*it)->decrementFrameCounter();
310 //                      m_isoManager->enablePolling(*it);
311                        
312                 }
313        
314                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
315                         it != m_TransmitProcessors.end();
316                         ++it ) {
317                         (*it)->decrementFrameCounter();
318 //                      m_isoManager->enablePolling(*it);
319                 }
320                
321                 m_nbperiods++;
322         }
323
324         return true;
325
326 }
327
328 bool StreamProcessorManager::start() {
329         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
330         assert(m_isoManager);
331        
332         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
333         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
334         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
335                 it != m_ReceiveProcessors.end();
336                 ++it ) {
337                         if (!(*it)->preparedForStart()) {
338                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
339                                 return false;
340                         }
341                         if (!m_isoManager->registerStream(*it)) {
342                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
343                                 return false;
344                         }
345                        
346                        
347                 }
348
349         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
350         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
351                 it != m_TransmitProcessors.end();
352                 ++it ) {
353                         if (!(*it)->preparedForStart()) {
354                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
355                                 return false;
356                         }
357                         if (!m_isoManager->registerStream(*it)) {
358                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
359                                 return false;
360                         }
361                        
362                 }
363
364         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
365         if (!m_isoManager->prepare()) {
366                 debugFatal("Could not prepare isoManager\n");
367                 return false;
368         }
369
370         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandler...\n");
371         if (!m_isoManager->startHandlers(0)) {
372                 debugFatal("Could not start handlers...\n");
373                 return false;
374         }
375        
376         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming thread...\n");
377        
378         // start the runner thread
379         m_streamingThread->Start();
380        
381         // start the runner thread
382         m_isoManagerThread->Start();
383                
384         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to start running...\n");
385         // we have to wait until all streamprocessors indicate that they are running
386         // i.e. that there is actually some data stream flowing
387         int wait_cycles=2000; // two seconds
388         bool notRunning=true;
389         while (notRunning && wait_cycles) {
390                 wait_cycles--;
391                 notRunning=false;
392                
393                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
394                         it != m_ReceiveProcessors.end();
395                         ++it ) {
396                         if(!(*it)->isRunning()) notRunning=true;
397                 }
398        
399                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
400                         it != m_TransmitProcessors.end();
401                         ++it ) {
402                         if(!(*it)->isRunning()) notRunning=true;
403                 }
404                 usleep(1000);
405         }
406        
407         if(!wait_cycles) { // timout has occurred
408                 debugFatal("One or more streams are not starting up (timeout):\n");
409                            
410                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
411                         it != m_ReceiveProcessors.end();
412                         ++it ) {
413                         if(!(*it)->isRunning()) {
414                                 debugFatal(" receive stream %p not running\n",*it);
415                         } else {       
416                                 debugFatal(" receive stream %p running\n",*it);
417                         }
418                 }
419        
420                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
421                         it != m_TransmitProcessors.end();
422                         ++it ) {
423                         if(!(*it)->isRunning()) {
424                                 debugFatal(" transmit stream %p not running\n",*it);
425                         } else {       
426                                 debugFatal(" transmit stream %p running\n",*it);
427                         }
428                 }
429                 return false;
430         }
431
432         debugOutput( DEBUG_LEVEL_VERBOSE, "StreamProcessors running...\n");
433         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting frame counters...\n");
434        
435         // now we reset the frame counters
436         // FIXME: check how we are going to do sync
437         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
438                 it != m_ReceiveProcessors.end();
439                 ++it ) {
440                
441                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
442                         (*it)->dumpInfo();
443                 }
444
445                 (*it)->reset();
446
447                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
448                         (*it)->dumpInfo();
449                 }
450                
451         }
452        
453         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
454                 it != m_TransmitProcessors.end();
455                 ++it ) {
456                
457                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
458                         (*it)->dumpInfo();
459                 }
460                
461                 (*it)->reset();
462                
463                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
464                         (*it)->dumpInfo();
465                 }
466         }
467        
468         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
469         // and we enable the streamprocessors
470         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
471                 it != m_ReceiveProcessors.end();
472                 ++it ) {               
473                 (*it)->enable();
474                 m_isoManager->enablePolling(*it);
475         }
476
477         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
478                 it != m_TransmitProcessors.end();
479                 ++it ) {
480                 (*it)->enable();
481                 m_isoManager->enablePolling(*it);
482         }
483        
484         // dump the iso stream information when in verbose mode
485         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
486                 m_isoManager->dumpInfo();
487         }
488        
489         return true;
490        
491 }
492
493 bool StreamProcessorManager::stop() {
494         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
495         assert(m_isoManager);
496         assert(m_streamingThread);
497
498         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
499         // Most stream processors can just stop without special treatment.  However, some
500         // (like the MOTU) need to do a few things before it's safe to turn off the iso
501         // handling.
502         int wait_cycles=2000; // two seconds ought to be sufficient
503         bool allReady = false;
504         while (!allReady && wait_cycles) {
505                 wait_cycles--;
506                 allReady = true;
507                
508                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
509                         it != m_ReceiveProcessors.end();
510                         ++it ) {
511                         if(!(*it)->preparedForStop()) allReady = false;
512                 }
513        
514                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
515                         it != m_TransmitProcessors.end();
516                         ++it ) {
517                         if(!(*it)->preparedForStop()) allReady = false;
518                 }
519                 usleep(1000);
520         }
521
522
523         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping threads...\n");
524        
525         m_streamingThread->Stop();
526         m_isoManagerThread->Stop();
527        
528         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
529         if(!m_isoManager->stopHandlers()) {
530            debugFatal("Could not stop ISO handlers\n");
531            return false;
532         }
533        
534         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
535     // now unregister all streams from iso manager
536         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
537         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
538                 it != m_ReceiveProcessors.end();
539                 ++it ) {
540                         if (!m_isoManager->unregisterStream(*it)) {
541                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
542                                 return false;
543                         }
544                        
545                 }
546
547         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
548         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
549                 it != m_TransmitProcessors.end();
550                 ++it ) {
551                         if (!m_isoManager->unregisterStream(*it)) {
552                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
553                                 return false;
554                         }
555                        
556                 }
557        
558         return true;
559        
560 }
561
562 bool StreamProcessorManager::waitForPeriod() {
563
564         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
565
566         // Wait for packetizer thread to signal a period completion
567         sem_wait(&m_period_semaphore);
568        
569         if(m_xrun_happened) {
570            debugWarning("Detected underrun\n");
571            dumpInfo();
572            return false;
573         }
574        
575         return true;
576
577 }
578
579 bool StreamProcessorManager::handleXrun() {
580
581         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
582
583         /*
584          * Reset means:
585          * 1) Stopping the packetizer thread
586          * 2) Bringing all buffers & streamprocessors into a know state
587          *    - Clear all capture buffers
588          *    - Put nb_periods*period_size of null frames into the playback buffers
589          * 3) Restarting the packetizer thread
590          */
591         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping processormanager...\n");
592         if(!stop()) {
593            debugFatal("Could not stop.\n");
594            return false;
595         }
596
597         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting Processors...\n");
598        
599         // now we reset the frame counters
600         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
601                 it != m_ReceiveProcessors.end();
602                 ++it ) {
603                
604                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
605                         (*it)->dumpInfo();
606                 }
607                
608                 (*it)->reset();
609                
610                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
611                         (*it)->dumpInfo();
612                 }
613                
614         }
615        
616         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
617                 it != m_TransmitProcessors.end();
618                 ++it ) {
619                
620                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
621                         (*it)->dumpInfo();
622                 }
623                
624                 (*it)->reset();
625                
626                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
627                         (*it)->dumpInfo();
628                 }
629         }
630
631         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting processormanager...\n");
632
633         if(!start()) {
634            debugFatal("Could not start.\n");
635            return false;
636         }
637
638
639         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
640        
641        
642         return true;
643 }
644
645 bool StreamProcessorManager::transfer() {
646
647         debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
648
649         // a static cast could make sure that there is no performance
650         // penalty for the virtual functions (to be checked)
651
652         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
653                 it != m_ReceiveProcessors.end();
654                 ++it ) {
655                 if(!(*it)->transfer()) {
656                         debugFatal("could not transfer() stream processor (%p)",*it);
657                         return false;
658                 }
659         }
660
661         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
662                 it != m_TransmitProcessors.end();
663                 ++it ) {
664                 if(!(*it)->transfer()) {
665                         debugFatal("could not transfer() stream processor (%p)",*it);
666                         return false;
667                 }
668         }
669
670         return true;
671 }
672
673 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
674
675         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
676
677         // a static cast could make sure that there is no performance
678         // penalty for the virtual functions (to be checked)
679         if (t==StreamProcessor::E_Receive) {
680                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
681                         it != m_ReceiveProcessors.end();
682                         ++it ) {
683                         if(!(*it)->transfer()) {
684                                 debugFatal("could not transfer() stream processor (%p)",*it);
685                                 return false;
686                         }
687                 }
688         } else {
689                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
690                         it != m_TransmitProcessors.end();
691                         ++it ) {
692                         if(!(*it)->transfer()) {
693                                 debugFatal("could not transfer() stream processor (%p)",*it);
694                                 return false;
695                         }
696                 }
697         }
698
699         return true;
700 }
701
702 void StreamProcessorManager::dumpInfo() {
703         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
704         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
705         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %d\n", m_nbperiods);
706
707         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
708         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
709                 it != m_ReceiveProcessors.end();
710                 ++it ) {
711                 (*it)->dumpInfo();
712         }
713
714         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
715         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
716                 it != m_TransmitProcessors.end();
717                 ++it ) {
718                 (*it)->dumpInfo();
719         }
720
721         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
722         m_isoManager->dumpInfo();
723         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
724
725 }
726
727 void StreamProcessorManager::setVerboseLevel(int l) {
728         setDebugLevel(l);
729
730         if (m_isoManager) m_isoManager->setVerboseLevel(l);
731
732         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
733         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
734                 it != m_ReceiveProcessors.end();
735                 ++it ) {
736                 (*it)->setVerboseLevel(l);
737         }
738
739         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
740         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
741                 it != m_TransmitProcessors.end();
742                 ++it ) {
743                 (*it)->setVerboseLevel(l);
744         }
745 }
746
747
748 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
749         int count=0;
750
751         if (direction == Port::E_Capture) {
752                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
753                         it != m_ReceiveProcessors.end();
754                         ++it ) {
755                         count += (*it)->getPortCount(type);
756                 }
757         } else {
758                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
759                         it != m_TransmitProcessors.end();
760                         ++it ) {
761                         count += (*it)->getPortCount(type);
762                 }
763         }
764         return count;
765 }
766
767 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
768         int count=0;
769
770         if (direction == Port::E_Capture) {
771                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
772                         it != m_ReceiveProcessors.end();
773                         ++it ) {
774                         count += (*it)->getPortCount();
775                 }
776         } else {
777                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
778                         it != m_TransmitProcessors.end();
779                         ++it ) {
780                         count += (*it)->getPortCount();
781                 }
782         }
783         return count;
784 }
785
786 // TODO: implement a port map here, instead of the loop
787
788 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
789         int count=0;
790         int prevcount=0;
791
792         if (direction == Port::E_Capture) {
793                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
794                         it != m_ReceiveProcessors.end();
795                         ++it ) {
796                         count += (*it)->getPortCount();
797                         if (count > idx) {
798                                 return (*it)->getPortAtIdx(idx-prevcount);
799                         }
800                         prevcount=count;
801                 }
802         } else {
803                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
804                         it != m_TransmitProcessors.end();
805                         ++it ) {
806                         count += (*it)->getPortCount();
807                         if (count > idx) {
808                                 return (*it)->getPortAtIdx(idx-prevcount);
809                         }
810                         prevcount=count;
811                 }
812         }
813         return NULL;
814 }
815
816 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
817     m_thread_realtime=rt;
818     m_thread_priority=priority;
819     return true;
820 }
821
822
823 } // end of namespace
Note: See TracBrowser for help on using the browser.