root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 385, 30.9 kB (checked in by pieterpalmers, 17 years ago)

- fixed issues with SYT timestamp processing
- SYT based sync works if syncing to the received stream

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35 #include "libstreaming/cycletimer.h"
36
37 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50
38
39 namespace FreebobStreaming {
40
41 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
42
43 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
44         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
45         m_isoManager(0), m_nbperiods(0) {
46
47 }
48
49 StreamProcessorManager::~StreamProcessorManager() {
50         if (m_isoManager) delete m_isoManager;
51        
52 }
53
54 /**
55  * Registers \ref processor with this manager.
56  *
57  * also registers it with the isohandlermanager
58  *
59  * be sure to call isohandlermanager->init() first!
60  * and be sure that the processors are also ->init()'ed
61  *
62  * @param processor
63  * @return true if successfull
64  */
65 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
66 {
67         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
68         assert(processor);
69         assert(m_isoManager);
70
71         if (processor->getType()==StreamProcessor::E_Receive) {
72                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
73                
74                 m_ReceiveProcessors.push_back(processor);
75                
76                 processor->setManager(this);
77                                
78                 return true;
79         }
80        
81         if (processor->getType()==StreamProcessor::E_Transmit) {
82                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
83                
84                 m_TransmitProcessors.push_back(processor);
85                
86                 processor->setManager(this);
87                
88                 return true;
89         }
90
91         debugFatal("Unsupported processor type!\n");
92        
93         return false;
94 }
95
96 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
97 {
98         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
99         assert(processor);
100
101         if (processor->getType()==StreamProcessor::E_Receive) {
102
103                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
104                         it != m_ReceiveProcessors.end();
105                         ++it ) {
106
107                         if ( *it == processor ) {
108                                         m_ReceiveProcessors.erase(it);
109                                        
110                                         processor->clearManager();
111                                        
112                                         if(!m_isoManager->unregisterStream(processor)) {
113                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
114                                                
115                                                 return false;
116                                                
117                                         }
118                                        
119                                         return true;
120                                 }
121                 }
122         }
123
124         if (processor->getType()==StreamProcessor::E_Transmit) {
125                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126                         it != m_TransmitProcessors.end();
127                         ++it ) {
128
129                         if ( *it == processor ) {
130                                         m_TransmitProcessors.erase(it);
131                                        
132                                         processor->clearManager();
133                                        
134                                         if(!m_isoManager->unregisterStream(processor)) {
135                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
136                                                
137                                                 return false;
138                                                
139                                         }
140                                        
141                                         return true;
142                                 }
143                 }
144         }
145        
146         debugFatal("Processor (%p) not found!\n",processor);
147
148         return false; //not found
149
150 }
151
152 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
153     m_SyncSource=s;
154     return true;
155 }
156
157 StreamProcessor *StreamProcessorManager::getSyncSource() {
158     return m_SyncSource;
159 }
160
161 bool StreamProcessorManager::init()
162 {
163         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
164
165         // the tread that runs the StreamProcessor
166         // checking the period boundaries
167         m_streamingThread=new FreebobUtil::PosixThread(this,
168            m_thread_realtime, m_thread_priority+5,
169            PTHREAD_CANCEL_DEFERRED);
170            
171         if(!m_streamingThread) {
172                 debugFatal("Could not create streaming thread\n");
173                 return false;
174         }
175        
176         m_isoManager=new IsoHandlerManager();
177        
178         if(!m_isoManager) {
179                 debugFatal("Could not create IsoHandlerManager\n");
180                 return false;
181         }
182        
183         m_isoManager->setVerboseLevel(getDebugLevel());
184        
185         // the tread that keeps the handler's cycle timers up to date
186         // and performs the actual packet transfer
187         // needs high priority
188         m_isoManagerThread=new FreebobUtil::PosixThread(
189               m_isoManager,
190               m_thread_realtime, m_thread_priority+6,
191               PTHREAD_CANCEL_DEFERRED);
192              
193         if(!m_isoManagerThread) {
194                 debugFatal("Could not create iso manager thread\n");
195                 return false;
196         }
197
198         return true;
199 }
200
201 bool StreamProcessorManager::Init()
202 {
203     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing runner...\n");
204
205     // no xrun has occurred (yet)
206     m_xrun_happened=false;
207
208     if(sem_init(&m_period_semaphore, 0, 0)) {
209         debugFatal( "Cannot init period transfer semaphore\n");
210         debugFatal( " Error: %s\n",strerror(errno));
211         return false;
212     }
213
214     return true;
215 }
216
217 bool StreamProcessorManager::prepare() {
218
219         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
220        
221         // if no sync source is set, select one here
222         if(m_SyncSource == NULL) {
223            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
224         }
225        
226         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
227                 it != m_ReceiveProcessors.end();
228                 ++it ) {
229                         if(m_SyncSource == NULL) {
230                                 debugWarning(" => Sync Source is %p.\n", *it);
231                                 m_SyncSource = *it;
232                         }
233         }
234
235         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
236                 it != m_TransmitProcessors.end();
237                 ++it ) {
238                         if(m_SyncSource == NULL) {
239                                 debugWarning(" => Sync Source is %p.\n", *it);
240                                 m_SyncSource = *it;
241                         }
242         }
243
244         // now do the actual preparation
245         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
246         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
247                 it != m_ReceiveProcessors.end();
248                 ++it ) {
249                         if(!(*it)->setSyncSource(m_SyncSource)) {
250                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
251                                 return false;
252                         }
253                        
254                         if(!(*it)->prepare()) {
255                                 debugFatal(  " could not prepare (%p)...\n",(*it));
256                                 return false;
257                         }
258         }
259
260         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
261         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
262                 it != m_TransmitProcessors.end();
263                 ++it ) {
264                         if(!(*it)->setSyncSource(m_SyncSource)) {
265                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
266                                 return false;
267                         }               
268                         if(!(*it)->prepare()) {
269                                 debugFatal( " could not prepare (%p)...\n",(*it));
270                                 return false;
271                         }
272         }
273
274     // if there are no stream processors registered,
275     // fail
276     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
277         debugFatal("No stream processors registered, can't do anything usefull\n");
278         return false;
279     }
280
281         return true;
282 }
283
284 // FIXME: this can be removed
285 bool StreamProcessorManager::Execute()
286 {
287         // temp measure, polling
288         usleep(1000);
289
290         // FIXME: move this to an IsoHandlerManager sub-thread
291         // and make this private again in IHM
292         m_isoManager->updateCycleTimers();
293        
294         return true;
295 }
296
297 bool StreamProcessorManager::start() {
298         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
299         assert(m_isoManager);
300        
301         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
302         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
303         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
304                 it != m_ReceiveProcessors.end();
305                 ++it ) {
306                         if (!(*it)->prepareForStart()) {
307                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
308                                 return false;
309                         }
310                         if (!m_isoManager->registerStream(*it)) {
311                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
312                                 return false;
313                         }
314                 }
315
316         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
317         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
318                 it != m_TransmitProcessors.end();
319                 ++it ) {
320                         if (!(*it)->prepareForStart()) {
321                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
322                                 return false;
323                         }
324                         if (!m_isoManager->registerStream(*it)) {
325                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
326                                 return false;
327                         }
328                 }
329
330         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
331         if (!m_isoManager->prepare()) {
332                 debugFatal("Could not prepare isoManager\n");
333                 return false;
334         }
335
336         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
337         if (!disableStreamProcessors()) {
338                 debugFatal("Could not disable StreamProcessors...\n");
339                 return false;
340         }
341                
342         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
343         if (!m_isoManager->startHandlers(0)) {
344                 debugFatal("Could not start handlers...\n");
345                 return false;
346         }
347        
348         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n");
349
350         // note: libraw1394 doesn't like it if you poll() and/or iterate() before
351         //       starting the streams.
352         // start the runner thread
353         // FIXME: maybe this should go into the isomanager itself.
354         m_isoManagerThread->Start();
355        
356         // start the runner thread
357         // FIXME: not used anymore (for updatecycletimers ATM, but that's not good)
358         m_streamingThread->Start();
359
360         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to start running...\n");
361         // we have to wait until all streamprocessors indicate that they are running
362         // i.e. that there is actually some data stream flowing
363         int wait_cycles=2000; // two seconds
364         bool notRunning=true;
365         while (notRunning && wait_cycles) {
366                 wait_cycles--;
367                 notRunning=false;
368                
369                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
370                         it != m_ReceiveProcessors.end();
371                         ++it ) {
372                         if(!(*it)->isRunning()) notRunning=true;
373                 }
374        
375                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
376                         it != m_TransmitProcessors.end();
377                         ++it ) {
378                         if(!(*it)->isRunning()) notRunning=true;
379                 }
380                 usleep(1000);
381         }
382        
383         if(!wait_cycles) { // timout has occurred
384                 debugFatal("One or more streams are not starting up (timeout):\n");
385                            
386                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
387                         it != m_ReceiveProcessors.end();
388                         ++it ) {
389                         if(!(*it)->isRunning()) {
390                                 debugFatal(" receive stream %p not running\n",*it);
391                         } else {       
392                                 debugFatal(" receive stream %p running\n",*it);
393                         }
394                 }
395        
396                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
397                         it != m_TransmitProcessors.end();
398                         ++it ) {
399                         if(!(*it)->isRunning()) {
400                                 debugFatal(" transmit stream %p not running\n",*it);
401                         } else {       
402                                 debugFatal(" transmit stream %p running\n",*it);
403                         }
404                 }
405                 return false;
406         }
407
408         // we want to make sure that everything is running well,
409         // so wait for a while
410         usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
411
412         debugOutput( DEBUG_LEVEL_VERBOSE, "StreamProcessors running...\n");
413         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting counters...\n");
414        
415         // now we reset the frame counters
416         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
417                 it != m_ReceiveProcessors.end();
418                 ++it ) {
419                
420                 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
421                
422                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
423                         (*it)->dumpInfo();
424                 }
425
426                 (*it)->reset();
427                
428                 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
429
430                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
431                         (*it)->dumpInfo();
432                 }
433                
434         }
435        
436         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
437                 it != m_TransmitProcessors.end();
438                 ++it ) {
439                
440                 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");
441                
442                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
443                         (*it)->dumpInfo();
444                 }
445                
446                 (*it)->reset();
447                
448                 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");
449                
450                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
451                         (*it)->dumpInfo();
452                 }
453         }
454        
455         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
456        
457         debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n");
458         if (!m_SyncSource->prepareForEnable()) {
459                 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
460                 return false;
461         }
462        
463         m_SyncSource->enable();
464
465         debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n");
466         if (!enableStreamProcessors()) {
467                 debugFatal("Could not enable StreamProcessors...\n");
468                 return false;
469         }
470        
471         // dump the iso stream information when in verbose mode
472         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
473                 m_isoManager->dumpInfo();
474         }
475        
476         return true;
477        
478 }
479
480 bool StreamProcessorManager::stop() {
481         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
482         assert(m_isoManager);
483         assert(m_streamingThread);
484
485         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
486         // Most stream processors can just stop without special treatment.  However, some
487         // (like the MOTU) need to do a few things before it's safe to turn off the iso
488         // handling.
489         int wait_cycles=2000; // two seconds ought to be sufficient
490         bool allReady = false;
491         while (!allReady && wait_cycles) {
492                 wait_cycles--;
493                 allReady = true;
494                
495                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
496                         it != m_ReceiveProcessors.end();
497                         ++it ) {
498                         if(!(*it)->prepareForStop()) allReady = false;
499                 }
500        
501                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
502                         it != m_TransmitProcessors.end();
503                         ++it ) {
504                         if(!(*it)->prepareForStop()) allReady = false;
505                 }
506                 usleep(1000);
507         }
508
509
510         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping threads...\n");
511        
512         m_streamingThread->Stop();
513         m_isoManagerThread->Stop();
514        
515         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
516         if(!m_isoManager->stopHandlers()) {
517            debugFatal("Could not stop ISO handlers\n");
518            return false;
519         }
520        
521         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
522     // now unregister all streams from iso manager
523         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
524         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
525                 it != m_ReceiveProcessors.end();
526                 ++it ) {
527                         if (!m_isoManager->unregisterStream(*it)) {
528                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
529                                 return false;
530                         }
531                        
532                 }
533
534         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
535         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
536                 it != m_TransmitProcessors.end();
537                 ++it ) {
538                         if (!m_isoManager->unregisterStream(*it)) {
539                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
540                                 return false;
541                         }
542                        
543                 }
544        
545         return true;
546        
547 }
548
549 /**
550  * Enables the registered StreamProcessors
551  * @return true if successful, false otherwise
552  */
553 bool StreamProcessorManager::enableStreamProcessors() {
554         // and we enable the streamprocessors
555         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
556                 it != m_ReceiveProcessors.end();
557                 ++it ) {               
558                 (*it)->prepareForEnable();
559                 (*it)->enable();
560         }
561
562         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
563                 it != m_TransmitProcessors.end();
564                 ++it ) {
565                 (*it)->prepareForEnable();
566                 (*it)->enable();
567         }
568         return true;
569 }
570
571 /**
572  * Disables the registered StreamProcessors
573  * @return true if successful, false otherwise
574  */
575 bool StreamProcessorManager::disableStreamProcessors() {
576         // and we disable the streamprocessors
577         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
578                 it != m_ReceiveProcessors.end();
579                 ++it ) {
580                 (*it)->prepareForDisable();
581                 (*it)->disable();
582         }
583
584         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
585                 it != m_TransmitProcessors.end();
586                 ++it ) {
587                 (*it)->prepareForDisable();
588                 (*it)->disable();
589         }
590         return true;
591 }
592
593 /**
594  * Called upon Xrun events. This brings all StreamProcessors back
595  * into their starting state, and then carries on streaming. This should
596  * have the same effect as restarting the whole thing.
597  *
598  * @return true if successful, false otherwise
599  */
600 bool StreamProcessorManager::handleXrun() {
601
602         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
603
604         /*
605          * Reset means:
606          * 1) Disabling the SP's, so that they don't process any packets
607          *    note: the isomanager does keep on delivering/requesting them
608          * 2) Bringing all buffers & streamprocessors into a know state
609          *    - Clear all capture buffers
610          *    - Put nb_periods*period_size of null frames into the playback buffers
611          * 3) Re-enable the SP's
612          */
613         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
614         if (!disableStreamProcessors()) {
615                 debugFatal("Could not disable StreamProcessors...\n");
616                 return false;
617         }
618
619         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting Processors...\n");
620        
621         // now we reset the streamprocessors
622         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
623                 it != m_ReceiveProcessors.end();
624                 ++it ) {
625                
626                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
627                         (*it)->dumpInfo();
628                 }
629                
630                 (*it)->reset();
631                
632                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
633                         (*it)->dumpInfo();
634                 }
635         }
636        
637         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
638                 it != m_TransmitProcessors.end();
639                 ++it ) {
640                
641                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
642                         (*it)->dumpInfo();
643                 }
644                
645                 (*it)->reset();
646                
647                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
648                         (*it)->dumpInfo();
649                 }
650         }
651
652         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
653        
654         debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n");
655         if (!m_SyncSource->prepareForEnable()) {
656                 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
657                 return false;
658         }
659        
660         m_SyncSource->enable();
661
662         debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n");
663         if (!enableStreamProcessors()) {
664                 debugFatal("Could not enable StreamProcessors...\n");
665                 return false;
666         }
667
668         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
669        
670         return true;
671 }
672
673 /**
674  * @brief Waits until the next period of samples is ready
675  *
676  * This function does not return until a full period of samples is (or should be)
677  * ready to be transferred.
678  *
679  * @return true if the period is ready, false if an xrun occurred
680  */
681 bool StreamProcessorManager::waitForPeriod() {
682     int time_till_next_period;
683     bool xrun_occurred=false;
684        
685     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
686
687     assert(m_SyncSource);
688    
689     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
690    
691     while(time_till_next_period > 0) {
692         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
693    
694         // wait for the period
695         usleep(time_till_next_period);
696        
697         // check for underruns on the ISO side,
698         // those should make us bail out of the wait loop
699         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
700             it != m_ReceiveProcessors.end();
701             ++it ) {
702             // a xrun has occurred on the Iso side
703             xrun_occurred |= (*it)->xrunOccurred();
704         }
705         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
706             it != m_TransmitProcessors.end();
707             ++it ) {
708             // a xrun has occurred on the Iso side
709             xrun_occurred |= (*it)->xrunOccurred();
710         }
711
712         // check if we were waked up too soon
713         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
714     }
715    
716     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", -time_till_next_period);
717    
718     // this is to notify the client of the delay
719     // that we introduced
720     m_delayed_usecs=time_till_next_period;
721    
722     // we save the 'ideal' time of the transfer at this point,
723     // because we can have interleaved read - process - write
724     // cycles making that we modify a receiving stream's buffer
725     // before we get to writing.
726     // NOTE: before waitForPeriod() is called again, both the transmit
727     //       and the receive processors should have done their transfer.
728     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
729     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
730         m_time_of_transfer);
731    
732     xrun_occurred=false;
733    
734     // check if xruns occurred on the Iso side.
735     // also check if xruns will occur should we transfer() now
736    
737     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
738           it != m_ReceiveProcessors.end();
739           ++it ) {
740         // a xrun has occurred on the Iso side
741         xrun_occurred |= (*it)->xrunOccurred();
742        
743         // if this is true, a xrun will occur
744         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
745        
746 #ifdef DEBUG
747         if ((*it)->xrunOccurred()) {
748             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
749         }
750         if (!((*it)->canClientTransferFrames(m_period))) {
751             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
752         }
753 #endif
754        
755     }
756     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
757           it != m_TransmitProcessors.end();
758           ++it ) {
759         // a xrun has occurred on the Iso side
760         xrun_occurred |= (*it)->xrunOccurred();
761        
762         // if this is true, a xrun will occur
763         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
764        
765 #ifdef DEBUG
766         if ((*it)->xrunOccurred()) {
767             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
768         }
769         if (!((*it)->canClientTransferFrames(m_period))) {
770             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
771         }
772 #endif       
773     }
774    
775     m_nbperiods++;
776    
777     // now we can signal the client that we are (should be) ready
778     return !xrun_occurred;
779 }
780
781 /**
782  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
783  *
784  * Transfers one period of frames from the client side to the Iso side and vice versa.
785  *
786  * @return true if successful, false otherwise (indicates xrun).
787  */
788 bool StreamProcessorManager::transfer() {
789    
790     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
791
792     if (!transfer(StreamProcessor::E_Receive)) return false;
793     if (!transfer(StreamProcessor::E_Transmit)) return false;
794
795     return true;
796 }
797
798 /**
799  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
800  *
801  * Transfers one period of frames from the client side to the Iso side or vice versa.
802  *
803  * @param t The processor type to tranfer for (receive or transmit)
804  * @return true if successful, false otherwise (indicates xrun).
805  */
806
807 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
808     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
809    
810     // a static cast could make sure that there is no performance
811     // penalty for the virtual functions (to be checked)
812     if (t==StreamProcessor::E_Receive) {
813         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
814                 it != m_ReceiveProcessors.end();
815                 ++it ) {
816            
817             //#ifdef DEBUG
818             #if 0
819             {
820                 uint64_t ts_tail=0;
821                 uint64_t fc_tail=0;
822                
823                 uint64_t ts_head=0;
824                 uint64_t fc_head=0;
825                
826                 int cnt=0;
827                
828                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
829                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
830                
831                 while((fc_head != fc_tail) && (cnt++ < 10)) {
832                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
833                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
834                 }
835                
836                 debugOutput(DEBUG_LEVEL_VERBOSE,"R => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
837                     ts_head, fc_head, ts_tail, fc_tail, cnt);
838             }
839             #endif
840    
841             if(!(*it)->getFrames(m_period, (int64_t)m_time_of_transfer)) {
842                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)",
843                             m_period, m_time_of_transfer,*it);
844                     return false; // buffer underrun
845             }
846            
847             //#ifdef DEBUG
848             #if 0
849             {
850                 uint64_t ts_tail=0;
851                 uint64_t fc_tail=0;
852                
853                 uint64_t ts_head=0;
854                 uint64_t fc_head=0;
855                
856                 int cnt=0;
857                
858                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
859                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
860            
861                 while((fc_head != fc_tail) && (cnt++ < 10)) {
862                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
863                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
864                 }
865                
866                 debugOutput(DEBUG_LEVEL_VERBOSE,"R  > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
867                     ts_head, fc_head, ts_tail, fc_tail, cnt);
868             }
869             #endif
870    
871         }
872     } else {
873         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
874                 it != m_TransmitProcessors.end();
875                 ++it ) {
876                
877             //#ifdef DEBUG
878             #if 0
879             {
880                 uint64_t ts_tail=0;
881                 uint64_t fc_tail=0;
882                
883                 uint64_t ts_head=0;
884                 uint64_t fc_head=0;
885                
886                 int cnt=0;
887                
888                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
889                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
890                
891                 while((fc_head != fc_tail) && (cnt++ < 10)) {
892                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
893                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
894                 }
895                
896                 debugOutput(DEBUG_LEVEL_VERBOSE,"T => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
897                     ts_head, fc_head, ts_tail, fc_tail, cnt);
898             }
899             #endif
900                
901             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) {
902                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
903                         m_period, m_time_of_transfer, *it);
904                 return false; // buffer overrun
905             }
906            
907             //#ifdef DEBUG
908             #if 0
909             {
910                 uint64_t ts_tail=0;
911                 uint64_t fc_tail=0;
912                
913                 uint64_t ts_head=0;
914                 uint64_t fc_head=0;
915                
916                 int cnt=0;
917                
918                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
919                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
920            
921                 while((fc_head != fc_tail) && (cnt++ < 10)) {
922                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
923                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
924                 }
925                
926                 debugOutput(DEBUG_LEVEL_VERBOSE,"T  > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",
927                     ts_head, fc_head, ts_tail, fc_tail, cnt);
928             }
929             #endif
930         }
931     }
932
933     return true;
934 }
935
936 void StreamProcessorManager::dumpInfo() {
937         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
938         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
939         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
940
941         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
942         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
943                 it != m_ReceiveProcessors.end();
944                 ++it ) {
945                 (*it)->dumpInfo();
946         }
947
948         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
949         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
950                 it != m_TransmitProcessors.end();
951                 ++it ) {
952                 (*it)->dumpInfo();
953         }
954
955         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
956         m_isoManager->dumpInfo();
957         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
958
959 }
960
961 void StreamProcessorManager::setVerboseLevel(int l) {
962         setDebugLevel(l);
963
964         if (m_isoManager) m_isoManager->setVerboseLevel(l);
965
966         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
967         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
968                 it != m_ReceiveProcessors.end();
969                 ++it ) {
970                 (*it)->setVerboseLevel(l);
971         }
972
973         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
974         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
975                 it != m_TransmitProcessors.end();
976                 ++it ) {
977                 (*it)->setVerboseLevel(l);
978         }
979 }
980
981
982 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
983         int count=0;
984
985         if (direction == Port::E_Capture) {
986                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
987                         it != m_ReceiveProcessors.end();
988                         ++it ) {
989                         count += (*it)->getPortCount(type);
990                 }
991         } else {
992                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
993                         it != m_TransmitProcessors.end();
994                         ++it ) {
995                         count += (*it)->getPortCount(type);
996                 }
997         }
998         return count;
999 }
1000
1001 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1002         int count=0;
1003
1004         if (direction == Port::E_Capture) {
1005                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1006                         it != m_ReceiveProcessors.end();
1007                         ++it ) {
1008                         count += (*it)->getPortCount();
1009                 }
1010         } else {
1011                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1012                         it != m_TransmitProcessors.end();
1013                         ++it ) {
1014                         count += (*it)->getPortCount();
1015                 }
1016         }
1017         return count;
1018 }
1019
1020 // TODO: implement a port map here, instead of the loop
1021
1022 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1023         int count=0;
1024         int prevcount=0;
1025
1026         if (direction == Port::E_Capture) {
1027                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1028                         it != m_ReceiveProcessors.end();
1029                         ++it ) {
1030                         count += (*it)->getPortCount();
1031                         if (count > idx) {
1032                                 return (*it)->getPortAtIdx(idx-prevcount);
1033                         }
1034                         prevcount=count;
1035                 }
1036         } else {
1037                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1038                         it != m_TransmitProcessors.end();
1039                         ++it ) {
1040                         count += (*it)->getPortCount();
1041                         if (count > idx) {
1042                                 return (*it)->getPortAtIdx(idx-prevcount);
1043                         }
1044                         prevcount=count;
1045                 }
1046         }
1047         return NULL;
1048 }
1049
1050 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1051     m_thread_realtime=rt;
1052     m_thread_priority=priority;
1053     return true;
1054 }
1055
1056
1057 } // end of namespace
Note: See TracBrowser for help on using the browser.