root/branches/libfreebob-2.0/src/libstreaming/StreamProcessorManager.cpp

Revision 227, 17.3 kB (checked in by pieterpalmers, 18 years ago)

- another day of good progress comes to and end...
- compiles and runs, only the midi stuff and the xrun handling remain.

I'll also have to rework the C API somewhat.

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35
36 namespace FreebobStreaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
39
40 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
41         : m_nb_buffers(nb_buffers), m_period(period), m_xruns(0), m_isoManager(0) {
42
43 }
44
45 StreamProcessorManager::~StreamProcessorManager() {
46         if (m_isoManager) delete m_isoManager;
47        
48 }
49
50 /**
51  * Registers \ref processor with this manager.
52  *
53  * also registers it with the isohandlermanager
54  *
55  * be sure to call isohandlermanager->init() first!
56  * and be sure that the processors are also ->init()'ed
57  *
58  * @param processor
59  * @return true if successfull
60  */
61 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
62 {
63         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
64         assert(processor);
65         assert(m_isoManager);
66
67         if (processor->getType()==StreamProcessor::E_Receive) {
68                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
69                
70                 m_ReceiveProcessors.push_back(processor);
71                
72                 processor->setManager(this);
73                                
74                 return true;
75         }
76        
77         if (processor->getType()==StreamProcessor::E_Transmit) {
78                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
79                
80                 m_TransmitProcessors.push_back(processor);
81                
82                 processor->setManager(this);
83                
84                 return true;
85         }
86
87         debugFatal("Unsupported processor type!\n");
88        
89         return false;
90 }
91
92 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
93 {
94         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
95         assert(processor);
96
97         if (processor->getType()==StreamProcessor::E_Receive) {
98
99                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
100                         it != m_ReceiveProcessors.end();
101                         ++it ) {
102
103                         if ( *it == processor ) {
104                                         m_ReceiveProcessors.erase(it);
105                                        
106                                         processor->clearManager();
107                                        
108                                         if(!m_isoManager->unregisterStream(processor)) {
109                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
110                                                
111                                                 return false;
112                                                
113                                         }
114                                        
115                                         return true;
116                                 }
117                 }
118         }
119
120         if (processor->getType()==StreamProcessor::E_Transmit) {
121                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
122                         it != m_TransmitProcessors.end();
123                         ++it ) {
124
125                         if ( *it == processor ) {
126                                         m_TransmitProcessors.erase(it);
127                                        
128                                         processor->clearManager();
129                                        
130                                         if(!m_isoManager->unregisterStream(processor)) {
131                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
132                                                
133                                                 return false;
134                                                
135                                         }
136                                        
137                                         return true;
138                                 }
139                 }
140         }
141        
142         debugFatal("Processor (%p) not found!\n",processor);
143
144         return false; //not found
145
146 }
147
148 bool StreamProcessorManager::init()
149 {
150         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
151
152         // and the tread that runs the runner
153         streamingThread=new FreebobPosixThread(this);
154         if(!streamingThread) {
155                 debugFatal("Could not create streaming thread\n");
156                 return false;
157         }
158
159         m_isoManager=new IsoHandlerManager();
160        
161         if(!m_isoManager) {
162                 debugFatal("Could not create IsoHandlerManager\n");
163                 return false;
164         }
165        
166         if(!m_isoManager->Init()) {
167                 debugFatal("Could not init IsoHandlerManager\n");
168                 return false;
169         }
170
171         if(sem_init(&m_period_semaphore, 0, 0)) {
172                 debugFatal( "Cannot init packet transfer semaphore\n");
173                 debugFatal( " Error: %s\n",strerror(errno));
174                 return false;
175         } else {
176                 debugOutput( DEBUG_LEVEL_VERBOSE,"Successfull init of packet transfer semaphore\n");
177         }
178
179         return true;
180 }
181
182 bool StreamProcessorManager::Init()
183 {
184         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
185
186         return true;
187 }
188
189 bool StreamProcessorManager::prepare() {
190
191         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
192         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
193         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
194                 it != m_ReceiveProcessors.end();
195                 ++it ) {
196                         if(!(*it)->prepare()) {
197                                 debugFatal(  " could not prepare (%p)...\n",(*it));
198                                 return false;
199                                
200                         }
201                 }
202
203         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
204         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
205                 it != m_TransmitProcessors.end();
206                 ++it ) {
207                         if(!(*it)->prepare()) {
208                                 debugFatal( " could not prepare (%p)...\n",(*it));
209                                 return false;
210                        
211                         }
212                        
213                 }
214
215         return true;
216 }
217
218 bool StreamProcessorManager::Execute()
219 {
220         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
221         if(!m_isoManager->Execute()) {
222                 debugFatal("Could not execute isoManager\n");
223                 return false;
224         }
225
226         bool period_ready=true;
227         m_xrun_has_occured=false;
228
229         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " RCV PROC: ");
230         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
231                 it != m_ReceiveProcessors.end();
232                 ++it ) {
233                 period_ready = period_ready && (*it)->isOnePeriodReady();
234                 m_xrun_has_occured = m_xrun_has_occured || (*it)->xrunOccurred();
235                 debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "(%d/%d/%d) ", period_ready, m_xrun_has_occured,(*it)->m_framecounter);
236         }
237         debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "\n");
238
239         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " XMIT PROC: ");
240         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
241                 it != m_TransmitProcessors.end();
242                 ++it ) {
243                 period_ready = period_ready && (*it)->isOnePeriodReady();
244                 m_xrun_has_occured = m_xrun_has_occured || (*it)->xrunOccurred();
245                 debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "(%d/%d/%d) ", period_ready, m_xrun_has_occured,(*it)->m_framecounter);
246         }
247         debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "\n");
248
249         if(m_xrun_has_occured) {
250                 // do xrun signaling/handling
251                 m_xruns++;
252                 sem_post(&m_period_semaphore);
253                 return false;
254         }
255
256         if(period_ready) {
257                 // signal the waiting thread(s?) that a period is ready
258                 sem_post(&m_period_semaphore);
259                 debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "Period done...\n");
260
261                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
262                         it != m_ReceiveProcessors.end();
263                         ++it ) {
264                         (*it)->decrementFrameCounter();
265                 }
266        
267                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
268                         it != m_TransmitProcessors.end();
269                         ++it ) {
270                         (*it)->decrementFrameCounter();
271                 }
272         }
273
274         return true;
275
276 }
277
278 bool StreamProcessorManager::start() {
279         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
280         assert(m_isoManager);
281        
282         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
283         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
284         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
285                 it != m_ReceiveProcessors.end();
286                 ++it ) {
287                         if (!m_isoManager->registerStream(*it)) {
288                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
289                                 return false;
290                         }
291                        
292                        
293                 }
294
295         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
296         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
297                 it != m_TransmitProcessors.end();
298                 ++it ) {
299                         if (!m_isoManager->registerStream(*it)) {
300                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
301                                 return false;
302                         }
303                        
304                 }
305
306         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
307         if (!m_isoManager->prepare()) {
308                 debugFatal("Could not prepare isoManager\n");
309                 return false;
310         }
311
312         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandler...\n");
313         if (!m_isoManager->startHandlers()) {
314                 debugFatal("Could not start handlers...\n");
315                 return false;
316         }
317        
318         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming thread...\n");
319        
320         // start the runner thread
321         streamingThread->Start();
322        
323         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to start running...\n");
324         // we have to wait untill all streamprocessors indicate that they are running
325         // i.e. that there is actually some data stream flowing
326         int wait_cycles=2000; // two seconds
327         bool notRunning=true;
328         while (notRunning && wait_cycles) {
329                 wait_cycles--;
330                 notRunning=false;
331                
332                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
333                         it != m_ReceiveProcessors.end();
334                         ++it ) {
335                         if(!(*it)->isRunning()) notRunning=true;
336                 }
337        
338                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
339                         it != m_TransmitProcessors.end();
340                         ++it ) {
341                         if(!(*it)->isRunning()) notRunning=true;
342                 }
343                 usleep(1000);
344         }
345        
346         if(!wait_cycles) { // timout has occurred
347                 debugFatal("One or more streams are not starting up (timeout):\n");
348                            
349                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
350                         it != m_ReceiveProcessors.end();
351                         ++it ) {
352                         if(!(*it)->isRunning()) {
353                                 debugFatal(" receive stream %p not running\n",*it);
354                         } else {       
355                                 debugFatal(" receive stream %p running\n",*it);
356                         }
357                 }
358        
359                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
360                         it != m_TransmitProcessors.end();
361                         ++it ) {
362                         if(!(*it)->isRunning()) {
363                                 debugFatal(" transmit stream %p not running\n",*it);
364                         } else {       
365                                 debugFatal(" transmit stream %p running\n",*it);
366                         }
367                 }
368                 return false;
369         }
370        
371         debugOutput( DEBUG_LEVEL_VERBOSE, "StreamProcessors running...\n");
372         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting frame counters...\n");
373        
374         // now we reset the frame counters
375         // FIXME: check how we are going to do sync
376         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
377                 it != m_ReceiveProcessors.end();
378                 ++it ) {
379                
380                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
381                         (*it)->dumpInfo();
382                 }
383                
384                 (*it)->resetFrameCounter();
385         }
386        
387         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
388                 it != m_TransmitProcessors.end();
389                 ++it ) {
390                
391                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
392                         (*it)->dumpInfo();
393                 }
394                
395                 (*it)->resetFrameCounter();
396         }
397        
398         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
399         // and we enable the streamprocessors
400         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
401                 it != m_ReceiveProcessors.end();
402                 ++it ) {               
403                 (*it)->enable();
404         }
405        
406         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
407                 it != m_TransmitProcessors.end();
408                 ++it ) {
409                 (*it)->enable();
410         }
411        
412         // dump the iso stream information when in verbose mode
413         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
414                 m_isoManager->dumpInfo();
415         }
416        
417 }
418
419 bool StreamProcessorManager::stop() {
420         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
421         assert(m_isoManager);
422         assert(streamingThread);
423        
424         streamingThread->Stop();
425        
426         return m_isoManager->stopHandlers();
427        
428 }
429
430 bool StreamProcessorManager::waitForPeriod() {
431         int ret;
432         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
433
434         // Wait for packetizer thread to signal a period completion
435         sem_wait(&m_period_semaphore);
436        
437         if(m_xrun_has_occured) return false;
438        
439         return true;
440
441 }
442
443 bool StreamProcessorManager::reset() {
444
445         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting processors...\n");
446
447         /*
448          * Reset means:
449          * Bringing all buffers & connections into a know state
450          *   - Clear all capture buffers
451          *   - Put nb_periods*period_size of null frames into the playback buffers
452          *  => implemented by a reset() call, implementation dependant on the type
453          */
454        
455 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
456         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
457                 it != m_ReceiveProcessors.end();
458                 ++it ) {
459                 if(!(*it)->reset()) {
460                         debugFatal("could not reset stream processor (%p)",*it);
461                         return false;
462                 }
463         }
464
465 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
466         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
467                 it != m_TransmitProcessors.end();
468                 ++it ) {
469                 if(!(*it)->reset()) {
470                         debugFatal("could not reset stream processor (%p)",*it);
471                         return false;
472                 }
473         }
474         return true;
475 }
476
477 bool StreamProcessorManager::transfer() {
478
479         debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
480
481         // a static cast could make sure that there is no performance
482         // penalty for the virtual functions (to be checked)
483
484 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
485         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
486                 it != m_ReceiveProcessors.end();
487                 ++it ) {
488                 if(!(*it)->transfer()) {
489                         debugFatal("could not transfer() stream processor (%p)",*it);
490                         return false;
491                 }
492         }
493
494 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
495         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
496                 it != m_TransmitProcessors.end();
497                 ++it ) {
498                 if(!(*it)->transfer()) {
499                         debugFatal("could not transfer() stream processor (%p)",*it);
500                         return false;
501                 }
502         }
503
504         return true;
505 }
506
507 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
508
509         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
510
511         // a static cast could make sure that there is no performance
512         // penalty for the virtual functions (to be checked)
513
514 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
515         if (t==StreamProcessor::E_Receive) {
516                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
517                         it != m_ReceiveProcessors.end();
518                         ++it ) {
519                         if(!(*it)->transfer()) {
520                                 debugFatal("could not transfer() stream processor (%p)",*it);
521                                 return false;
522                         }
523                 }
524         } else {
525 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
526                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
527                         it != m_TransmitProcessors.end();
528                         ++it ) {
529                         if(!(*it)->transfer()) {
530                                 debugFatal("could not transfer() stream processor (%p)",*it);
531                                 return false;
532                         }
533                 }
534         }
535
536         return true;
537 }
538
539 void StreamProcessorManager::dumpInfo() {
540         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
541
542         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
543         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
544                 it != m_ReceiveProcessors.end();
545                 ++it ) {
546                 (*it)->dumpInfo();
547         }
548
549         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
550         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
551                 it != m_TransmitProcessors.end();
552                 ++it ) {
553                 (*it)->dumpInfo();
554         }
555
556         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
557         m_isoManager->dumpInfo();
558
559 }
560
561 void StreamProcessorManager::setVerboseLevel(int l) {
562         setDebugLevel(l);
563        
564         if (m_isoManager) m_isoManager->setVerboseLevel(l);
565
566         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
567         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
568                 it != m_ReceiveProcessors.end();
569                 ++it ) {
570                 (*it)->setVerboseLevel(l);
571         }
572
573         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
574         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
575                 it != m_TransmitProcessors.end();
576                 ++it ) {
577                 (*it)->setVerboseLevel(l);
578         }
579 }
580
581
582 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
583         int count=0;
584
585         if (direction == Port::E_Capture) {
586                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
587                         it != m_ReceiveProcessors.end();
588                         ++it ) {
589                         count += (*it)->getPortCount(type);
590                 }
591         } else {
592                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
593                         it != m_TransmitProcessors.end();
594                         ++it ) {
595                         count += (*it)->getPortCount(type);
596                 }
597         }
598         return count;
599 }
600
601 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
602         int count=0;
603
604         if (direction == Port::E_Capture) {
605                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
606                         it != m_ReceiveProcessors.end();
607                         ++it ) {
608                         count += (*it)->getPortCount();
609                 }
610         } else {
611                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
612                         it != m_TransmitProcessors.end();
613                         ++it ) {
614                         count += (*it)->getPortCount();
615                 }
616         }
617         return count;
618 }
619
620 // TODO: implement a port map here, instead of the loop
621
622 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
623         int count=0;
624         int prevcount=0;
625
626         if (direction == Port::E_Capture) {
627                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
628                         it != m_ReceiveProcessors.end();
629                         ++it ) {
630                         count += (*it)->getPortCount();
631                         if (count > idx) {
632                                 return (*it)->getPortAtIdx(idx-prevcount);
633                         }
634                         prevcount=count;
635                 }
636         } else {
637                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
638                         it != m_TransmitProcessors.end();
639                         ++it ) {
640                         count += (*it)->getPortCount();
641                         if (count > idx) {
642                                 return (*it)->getPortAtIdx(idx-prevcount);
643                         }
644                         prevcount=count;
645                 }
646         }
647         return NULL;
648 }
649
650
651 } // end of namespace
Note: See TracBrowser for help on using the browser.