root/branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

Revision 384, 30.7 kB (checked in by pieterpalmers, 16 years ago)

- temporary commit as backup measure
- rewrote synchronisation code
- receive streaming based on SYT works
- transmit streaming synced to the received stream sort of works, but some
  issues still have to be ironed out.

NOTE: all devices except the BeBoB ones are disabled in this code,
because they still have to be ported to the new sync
mechanism.

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35
36 namespace FreebobStreaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
39
40 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
41         : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0),
42         m_isoManager(0), m_nbperiods(0) {
43
44 }
45
46 StreamProcessorManager::~StreamProcessorManager() {
47         if (m_isoManager) delete m_isoManager;
48        
49 }
50
51 /**
52  * Registers \ref processor with this manager.
53  *
54  * also registers it with the isohandlermanager
55  *
56  * be sure to call isohandlermanager->init() first!
57  * and be sure that the processors are also ->init()'ed
58  *
59  * @param processor
60  * @return true if successfull
61  */
62 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
63 {
64         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
65         assert(processor);
66         assert(m_isoManager);
67
68         if (processor->getType()==StreamProcessor::E_Receive) {
69                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
70                
71                 m_ReceiveProcessors.push_back(processor);
72                
73                 processor->setManager(this);
74                                
75                 return true;
76         }
77        
78         if (processor->getType()==StreamProcessor::E_Transmit) {
79                 processor->setVerboseLevel(getDebugLevel()); // inherit debug level
80                
81                 m_TransmitProcessors.push_back(processor);
82                
83                 processor->setManager(this);
84                
85                 return true;
86         }
87
88         debugFatal("Unsupported processor type!\n");
89        
90         return false;
91 }
92
93 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
94 {
95         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
96         assert(processor);
97
98         if (processor->getType()==StreamProcessor::E_Receive) {
99
100                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
101                         it != m_ReceiveProcessors.end();
102                         ++it ) {
103
104                         if ( *it == processor ) {
105                                         m_ReceiveProcessors.erase(it);
106                                        
107                                         processor->clearManager();
108                                        
109                                         if(!m_isoManager->unregisterStream(processor)) {
110                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
111                                                
112                                                 return false;
113                                                
114                                         }
115                                        
116                                         return true;
117                                 }
118                 }
119         }
120
121         if (processor->getType()==StreamProcessor::E_Transmit) {
122                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
123                         it != m_TransmitProcessors.end();
124                         ++it ) {
125
126                         if ( *it == processor ) {
127                                         m_TransmitProcessors.erase(it);
128                                        
129                                         processor->clearManager();
130                                        
131                                         if(!m_isoManager->unregisterStream(processor)) {
132                                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
133                                                
134                                                 return false;
135                                                
136                                         }
137                                        
138                                         return true;
139                                 }
140                 }
141         }
142        
143         debugFatal("Processor (%p) not found!\n",processor);
144
145         return false; //not found
146
147 }
148
149 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
150     m_SyncSource=s;
151     return true;
152 }
153
154 StreamProcessor *StreamProcessorManager::getSyncSource() {
155     return m_SyncSource;
156 }
157
158 bool StreamProcessorManager::init()
159 {
160         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
161
162         // the tread that runs the StreamProcessor
163         // checking the period boundaries
164         m_streamingThread=new FreebobUtil::PosixThread(this,
165            m_thread_realtime, m_thread_priority+5,
166            PTHREAD_CANCEL_DEFERRED);
167            
168         if(!m_streamingThread) {
169                 debugFatal("Could not create streaming thread\n");
170                 return false;
171         }
172        
173         m_isoManager=new IsoHandlerManager();
174        
175         if(!m_isoManager) {
176                 debugFatal("Could not create IsoHandlerManager\n");
177                 return false;
178         }
179        
180         m_isoManager->setVerboseLevel(getDebugLevel());
181        
182         // the tread that keeps the handler's cycle timers up to date
183         // and performs the actual packet transfer
184         // needs high priority
185         m_isoManagerThread=new FreebobUtil::PosixThread(
186               m_isoManager,
187               m_thread_realtime, m_thread_priority+6,
188               PTHREAD_CANCEL_DEFERRED);
189              
190         if(!m_isoManagerThread) {
191                 debugFatal("Could not create iso manager thread\n");
192                 return false;
193         }
194
195         return true;
196 }
197
198 bool StreamProcessorManager::Init()
199 {
200     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing runner...\n");
201
202     // no xrun has occurred (yet)
203     m_xrun_happened=false;
204
205     if(sem_init(&m_period_semaphore, 0, 0)) {
206         debugFatal( "Cannot init period transfer semaphore\n");
207         debugFatal( " Error: %s\n",strerror(errno));
208         return false;
209     }
210
211     return true;
212 }
213
214 bool StreamProcessorManager::prepare() {
215
216         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
217        
218         // if no sync source is set, select one here
219         if(m_SyncSource == NULL) {
220            debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
221         }
222        
223         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
224                 it != m_ReceiveProcessors.end();
225                 ++it ) {
226                         if(m_SyncSource == NULL) {
227                                 debugWarning(" => Sync Source is %p.\n", *it);
228                                 m_SyncSource = *it;
229                         }
230         }
231
232         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
233                 it != m_TransmitProcessors.end();
234                 ++it ) {
235                         if(m_SyncSource == NULL) {
236                                 debugWarning(" => Sync Source is %p.\n", *it);
237                                 m_SyncSource = *it;
238                         }
239         }
240
241         // now do the actual preparation
242         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
243         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
244                 it != m_ReceiveProcessors.end();
245                 ++it ) {
246                         if(!(*it)->setSyncSource(m_SyncSource)) {
247                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
248                                 return false;
249                         }
250                        
251                         if(!(*it)->prepare()) {
252                                 debugFatal(  " could not prepare (%p)...\n",(*it));
253                                 return false;
254                         }
255         }
256
257         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
258         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
259                 it != m_TransmitProcessors.end();
260                 ++it ) {
261                         if(!(*it)->setSyncSource(m_SyncSource)) {
262                                 debugFatal(  " could not set sync source (%p)...\n",(*it));
263                                 return false;
264                         }               
265                         if(!(*it)->prepare()) {
266                                 debugFatal( " could not prepare (%p)...\n",(*it));
267                                 return false;
268                         }
269         }
270
271     // if there are no stream processors registered,
272     // fail
273     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
274         debugFatal("No stream processors registered, can't do anything usefull\n");
275         return false;
276     }
277
278         return true;
279 }
280
281 // FIXME: this can be removed
282 bool StreamProcessorManager::Execute()
283 {
284         // temp measure, polling
285         usleep(1000);
286
287         // FIXME: move this to an IsoHandlerManager sub-thread
288         // and make this private again in IHM
289         m_isoManager->updateCycleTimers();
290        
291         return true;
292 }
293
294 bool StreamProcessorManager::start() {
295         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
296         assert(m_isoManager);
297        
298         debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
299         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
300         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301                 it != m_ReceiveProcessors.end();
302                 ++it ) {
303                         if (!(*it)->prepareForStart()) {
304                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
305                                 return false;
306                         }
307                         if (!m_isoManager->registerStream(*it)) {
308                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
309                                 return false;
310                         }
311                 }
312
313         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
314         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
315                 it != m_TransmitProcessors.end();
316                 ++it ) {
317                         if (!(*it)->prepareForStart()) {
318                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
319                                 return false;
320                         }
321                         if (!m_isoManager->registerStream(*it)) {
322                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
323                                 return false;
324                         }
325                 }
326
327         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
328         if (!m_isoManager->prepare()) {
329                 debugFatal("Could not prepare isoManager\n");
330                 return false;
331         }
332
333         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
334         if (!disableStreamProcessors()) {
335                 debugFatal("Could not disable StreamProcessors...\n");
336                 return false;
337         }
338                
339         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
340         if (!m_isoManager->startHandlers(0)) {
341                 debugFatal("Could not start handlers...\n");
342                 return false;
343         }
344        
345         debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n");
346
347         // note: libraw1394 doesn't like it if you poll() and/or iterate() before
348         //       starting the streams.
349         // start the runner thread
350         // FIXME: maybe this should go into the isomanager itself.
351         m_isoManagerThread->Start();
352        
353         // start the runner thread
354         // FIXME: not used anymore (for updatecycletimers ATM, but that's not good)
355         m_streamingThread->Start();
356
357         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to start running...\n");
358         // we have to wait until all streamprocessors indicate that they are running
359         // i.e. that there is actually some data stream flowing
360         int wait_cycles=2000; // two seconds
361         bool notRunning=true;
362         while (notRunning && wait_cycles) {
363                 wait_cycles--;
364                 notRunning=false;
365                
366                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
367                         it != m_ReceiveProcessors.end();
368                         ++it ) {
369                         if(!(*it)->isRunning()) notRunning=true;
370                 }
371        
372                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
373                         it != m_TransmitProcessors.end();
374                         ++it ) {
375                         if(!(*it)->isRunning()) notRunning=true;
376                 }
377                 usleep(1000);
378         }
379        
380         if(!wait_cycles) { // timout has occurred
381                 debugFatal("One or more streams are not starting up (timeout):\n");
382                            
383                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
384                         it != m_ReceiveProcessors.end();
385                         ++it ) {
386                         if(!(*it)->isRunning()) {
387                                 debugFatal(" receive stream %p not running\n",*it);
388                         } else {       
389                                 debugFatal(" receive stream %p running\n",*it);
390                         }
391                 }
392        
393                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
394                         it != m_TransmitProcessors.end();
395                         ++it ) {
396                         if(!(*it)->isRunning()) {
397                                 debugFatal(" transmit stream %p not running\n",*it);
398                         } else {       
399                                 debugFatal(" transmit stream %p running\n",*it);
400                         }
401                 }
402                 return false;
403         }
404
405         debugOutput( DEBUG_LEVEL_VERBOSE, "StreamProcessors running...\n");
406         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting frame counters...\n");
407        
408         // now we reset the frame counters
409         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
410                 it != m_ReceiveProcessors.end();
411                 ++it ) {
412                
413                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
414                         (*it)->dumpInfo();
415                 }
416
417                 (*it)->reset();
418
419                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
420                         (*it)->dumpInfo();
421                 }
422                
423         }
424        
425         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
426                 it != m_TransmitProcessors.end();
427                 ++it ) {
428                
429                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
430                         (*it)->dumpInfo();
431                 }
432                
433                 (*it)->reset();
434                
435                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
436                         (*it)->dumpInfo();
437                 }
438         }
439        
440         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
441        
442         debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n");
443         if (!m_SyncSource->prepareForEnable()) {
444                 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
445                 return false;
446         }
447        
448         m_SyncSource->enable();
449
450         debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n");
451         if (!enableStreamProcessors()) {
452                 debugFatal("Could not enable StreamProcessors...\n");
453                 return false;
454         }
455        
456         // dump the iso stream information when in verbose mode
457         if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
458                 m_isoManager->dumpInfo();
459         }
460        
461         return true;
462        
463 }
464
465 bool StreamProcessorManager::stop() {
466         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
467         assert(m_isoManager);
468         assert(m_streamingThread);
469
470         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
471         // Most stream processors can just stop without special treatment.  However, some
472         // (like the MOTU) need to do a few things before it's safe to turn off the iso
473         // handling.
474         int wait_cycles=2000; // two seconds ought to be sufficient
475         bool allReady = false;
476         while (!allReady && wait_cycles) {
477                 wait_cycles--;
478                 allReady = true;
479                
480                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
481                         it != m_ReceiveProcessors.end();
482                         ++it ) {
483                         if(!(*it)->prepareForStop()) allReady = false;
484                 }
485        
486                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
487                         it != m_TransmitProcessors.end();
488                         ++it ) {
489                         if(!(*it)->prepareForStop()) allReady = false;
490                 }
491                 usleep(1000);
492         }
493
494
495         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping threads...\n");
496        
497         m_streamingThread->Stop();
498         m_isoManagerThread->Stop();
499        
500         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
501         if(!m_isoManager->stopHandlers()) {
502            debugFatal("Could not stop ISO handlers\n");
503            return false;
504         }
505        
506         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
507     // now unregister all streams from iso manager
508         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
509         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
510                 it != m_ReceiveProcessors.end();
511                 ++it ) {
512                         if (!m_isoManager->unregisterStream(*it)) {
513                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
514                                 return false;
515                         }
516                        
517                 }
518
519         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
520         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
521                 it != m_TransmitProcessors.end();
522                 ++it ) {
523                         if (!m_isoManager->unregisterStream(*it)) {
524                                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
525                                 return false;
526                         }
527                        
528                 }
529        
530         return true;
531        
532 }
533
534 /**
535  * Enables the registered StreamProcessors
536  * @return true if successful, false otherwise
537  */
538 bool StreamProcessorManager::enableStreamProcessors() {
539         // and we enable the streamprocessors
540         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
541                 it != m_ReceiveProcessors.end();
542                 ++it ) {               
543                 (*it)->prepareForEnable();
544                 (*it)->enable();
545         }
546
547         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
548                 it != m_TransmitProcessors.end();
549                 ++it ) {
550                 (*it)->prepareForEnable();
551                 (*it)->enable();
552         }
553         return true;
554 }
555
556 /**
557  * Disables the registered StreamProcessors
558  * @return true if successful, false otherwise
559  */
560 bool StreamProcessorManager::disableStreamProcessors() {
561         // and we disable the streamprocessors
562         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
563                 it != m_ReceiveProcessors.end();
564                 ++it ) {
565                 (*it)->prepareForDisable();
566                 (*it)->disable();
567         }
568
569         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
570                 it != m_TransmitProcessors.end();
571                 ++it ) {
572                 (*it)->prepareForDisable();
573                 (*it)->disable();
574         }
575         return true;
576 }
577
578 /**
579  * Called upon Xrun events. This brings all StreamProcessors back
580  * into their starting state, and then carries on streaming. This should
581  * have the same effect as restarting the whole thing.
582  *
583  * @return true if successful, false otherwise
584  */
585 bool StreamProcessorManager::handleXrun() {
586
587         debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
588
589         /*
590          * Reset means:
591          * 1) Disabling the SP's, so that they don't process any packets
592          *    note: the isomanager does keep on delivering/requesting them
593          * 2) Bringing all buffers & streamprocessors into a know state
594          *    - Clear all capture buffers
595          *    - Put nb_periods*period_size of null frames into the playback buffers
596          * 3) Re-enable the SP's
597          */
598         debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
599         if (!disableStreamProcessors()) {
600                 debugFatal("Could not disable StreamProcessors...\n");
601                 return false;
602         }
603
604         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting Processors...\n");
605        
606         // now we reset the streamprocessors
607         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
608                 it != m_ReceiveProcessors.end();
609                 ++it ) {
610                
611                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
612                         (*it)->dumpInfo();
613                 }
614                
615                 (*it)->reset();
616                
617                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
618                         (*it)->dumpInfo();
619                 }
620         }
621        
622         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
623                 it != m_TransmitProcessors.end();
624                 ++it ) {
625                
626                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
627                         (*it)->dumpInfo();
628                 }
629                
630                 (*it)->reset();
631                
632                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
633                         (*it)->dumpInfo();
634                 }
635         }
636
637         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
638        
639         debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n");
640         if (!m_SyncSource->prepareForEnable()) {
641                 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
642                 return false;
643         }
644        
645         m_SyncSource->enable();
646
647         debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n");
648         if (!enableStreamProcessors()) {
649                 debugFatal("Could not enable StreamProcessors...\n");
650                 return false;
651         }
652
653         debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
654        
655         return true;
656 }
657
658 /**
659  * @brief Waits until the next period of samples is ready
660  *
661  * This function does not return until a full period of samples is (or should be)
662  * ready to be transferred.
663  *
664  * @return true if the period is ready, false if an xrun occurred
665  */
666 bool StreamProcessorManager::waitForPeriod() {
667     int time_till_next_period;
668    
669     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
670
671     assert(m_SyncSource);
672    
673     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
674    
675     while(time_till_next_period > 0) {
676         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
677    
678         // wait for the period
679         usleep(time_till_next_period);
680        
681         // check if it is still true
682         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs();
683     }
684    
685     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
686    
687     // this is to notify the client of the delay
688     // that we introduced
689     m_delayed_usecs=time_till_next_period;
690    
691     // we save the 'ideal' time of the transfer at this point,
692     // because we can have interleaved read - process - write
693     // cycles making that we modify a receiving stream's buffer
694     // before we get to writing.
695     // NOTE: before waitForPeriod() is called again, both the transmit
696     //       and the receive processors should have done their transfer.
697     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
698     debugOutput( DEBUG_LEVEL_VERBOSE, "transfer at %llu ticks...\n",
699         m_time_of_transfer);
700    
701     // check if xruns occurred on the Iso side.
702     // also check if xruns will occur should we transfer() now
703     bool xrun_occurred=false;
704    
705     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
706           it != m_ReceiveProcessors.end();
707           ++it ) {
708         // a xrun has occurred on the Iso side
709         xrun_occurred |= (*it)->xrunOccurred();
710        
711         // if this is true, a xrun will occur
712         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
713        
714 #ifdef DEBUG
715         if ((*it)->xrunOccurred()) {
716             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
717         }
718         if (!((*it)->canClientTransferFrames(m_period))) {
719             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
720         }
721 #endif
722        
723     }
724     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
725           it != m_TransmitProcessors.end();
726           ++it ) {
727         // a xrun has occurred on the Iso side
728         xrun_occurred |= (*it)->xrunOccurred();
729        
730         // if this is true, a xrun will occur
731         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
732        
733 #ifdef DEBUG
734         if ((*it)->xrunOccurred()) {
735             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
736         }
737         if (!((*it)->canClientTransferFrames(m_period))) {
738             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
739         }
740 #endif       
741     }
742    
743     // now we can signal the client that we are (should be) ready
744     return !xrun_occurred;
745 }
746
747 /**
748  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
749  *
750  * Transfers one period of frames from the client side to the Iso side and vice versa.
751  *
752  * @return true if successful, false otherwise (indicates xrun).
753  */
754 bool StreamProcessorManager::transfer() {
755    
756     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
757
758     if (!transfer(StreamProcessor::E_Receive)) return false;
759     if (!transfer(StreamProcessor::E_Transmit)) return false;
760
761     return true;
762 }
763
764 /**
765  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
766  *
767  * Transfers one period of frames from the client side to the Iso side or vice versa.
768  *
769  * @param t The processor type to transfer for (receive or transmit)
770  * @return true if successful, false otherwise (indicates xrun).
771  */
772
773 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
774     int64_t time_of_transfer;
775     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
776    
777     // first we should find out on what time this transfer is
778     // supposed to be happening. this time will become the time
779     // stamp for the transmitted buffer.
780     // NOTE: maybe we should include the transfer delay here, that
781     //       would make it equal for all types of SP's
782     time_of_transfer=m_time_of_transfer;
783    
784     // a static cast could make sure that there is no performance
785     // penalty for the virtual functions (to be checked)
786     if (t==StreamProcessor::E_Receive) {
787         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
788                 it != m_ReceiveProcessors.end();
789                 ++it ) {
790             uint64_t buffer_tail_ts;
791             uint64_t fc;
792             int64_t ts;
793        
794             (*it)->getBufferTailTimestamp(&buffer_tail_ts,&fc);
795             ts =  buffer_tail_ts;
796             ts += (int64_t)((-(int64_t)fc) * m_SyncSource->getTicksPerFrame());
797             // NOTE: ts can be negative due to wraparound, it is the responsability of the
798             //       SP to deal with that.
799            
800             float tmp=m_SyncSource->getTicksPerFrame();
801            
802             debugOutput(DEBUG_LEVEL_VERBOSE, "=> TS=%11lld, BLT=%11llu, FC=%5d, TPF=%f\n",
803                 ts, buffer_tail_ts, fc, tmp
804                 );
805             debugOutput(DEBUG_LEVEL_VERBOSE, "   TPF=%f\n", tmp);
806              
807             #ifdef DEBUG
808             {
809                 uint64_t ts_tail=0;
810                 uint64_t fc_tail=0;
811                
812                 uint64_t ts_head=0;
813                 uint64_t fc_head=0;
814                
815                 int cnt=0;
816                
817                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
818                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
819                
820                 while((fc_head != fc_tail) && (cnt++ < 10)) {
821                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
822                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
823                 }
824                
825                 debugOutput(DEBUG_LEVEL_VERBOSE,"R *  HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n",
826                     ts_tail, fc_tail, ts_head, fc_head, cnt);
827             }
828             #endif
829    
830             if(!(*it)->getFrames(m_period, ts)) {
831                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u) from stream processor (%p)",
832                             m_period,*it);
833                     return false; // buffer underrun
834             }
835            
836             #ifdef DEBUG
837             {
838                 uint64_t ts_tail=0;
839                 uint64_t fc_tail=0;
840                
841                 uint64_t ts_head=0;
842                 uint64_t fc_head=0;
843                
844                 int cnt=0;
845                
846                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
847                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
848            
849                 while((fc_head != fc_tail) && (cnt++ < 10)) {
850                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
851                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
852                 }
853                
854                 debugOutput(DEBUG_LEVEL_VERBOSE,"R > HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n",
855                     ts_tail, fc_tail, ts_head, fc_head, cnt);           
856             }
857             #endif
858    
859         }
860     } else {
861         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
862                 it != m_TransmitProcessors.end();
863                 ++it ) {
864                
865             #ifdef DEBUG
866             {
867                 uint64_t ts_tail=0;
868                 uint64_t fc_tail=0;
869                
870                 uint64_t ts_head=0;
871                 uint64_t fc_head=0;
872                
873                 int cnt=0;
874                
875                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
876                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
877                
878                 while((fc_head != fc_tail) && (cnt++ < 10)) {
879                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
880                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
881                 }
882                
883                 debugOutput(DEBUG_LEVEL_VERBOSE,"T *  HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n",
884                     ts_tail, fc_tail, ts_head, fc_head, cnt);
885             }
886             #endif
887                
888             if(!(*it)->putFrames(m_period,time_of_transfer)) {
889                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)",
890                         m_period, time_of_transfer, *it);
891                 return false; // buffer overrun
892             }
893            
894             #ifdef DEBUG
895             {
896                 uint64_t ts_tail=0;
897                 uint64_t fc_tail=0;
898                
899                 uint64_t ts_head=0;
900                 uint64_t fc_head=0;
901                
902                 int cnt=0;
903                
904                 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
905                 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
906            
907                 while((fc_head != fc_tail) && (cnt++ < 10)) {
908                     (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);
909                     (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);
910                 }
911                
912                 debugOutput(DEBUG_LEVEL_VERBOSE,"T > HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n",
913                     ts_tail, fc_tail, ts_head, fc_head, cnt);           
914             }
915             #endif
916         }
917     }
918
919     return true;
920 }
921
922 void StreamProcessorManager::dumpInfo() {
923         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
924         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
925         debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %d\n", m_nbperiods);
926
927         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
928         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
929                 it != m_ReceiveProcessors.end();
930                 ++it ) {
931                 (*it)->dumpInfo();
932         }
933
934         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
935         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
936                 it != m_TransmitProcessors.end();
937                 ++it ) {
938                 (*it)->dumpInfo();
939         }
940
941         debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
942         m_isoManager->dumpInfo();
943         debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
944
945 }
946
947 void StreamProcessorManager::setVerboseLevel(int l) {
948         setDebugLevel(l);
949
950         if (m_isoManager) m_isoManager->setVerboseLevel(l);
951
952         debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
953         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
954                 it != m_ReceiveProcessors.end();
955                 ++it ) {
956                 (*it)->setVerboseLevel(l);
957         }
958
959         debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
960         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
961                 it != m_TransmitProcessors.end();
962                 ++it ) {
963                 (*it)->setVerboseLevel(l);
964         }
965 }
966
967
968 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
969         int count=0;
970
971         if (direction == Port::E_Capture) {
972                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
973                         it != m_ReceiveProcessors.end();
974                         ++it ) {
975                         count += (*it)->getPortCount(type);
976                 }
977         } else {
978                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
979                         it != m_TransmitProcessors.end();
980                         ++it ) {
981                         count += (*it)->getPortCount(type);
982                 }
983         }
984         return count;
985 }
986
987 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
988         int count=0;
989
990         if (direction == Port::E_Capture) {
991                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
992                         it != m_ReceiveProcessors.end();
993                         ++it ) {
994                         count += (*it)->getPortCount();
995                 }
996         } else {
997                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
998                         it != m_TransmitProcessors.end();
999                         ++it ) {
1000                         count += (*it)->getPortCount();
1001                 }
1002         }
1003         return count;
1004 }
1005
1006 // TODO: implement a port map here, instead of the loop
1007
1008 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1009         int count=0;
1010         int prevcount=0;
1011
1012         if (direction == Port::E_Capture) {
1013                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1014                         it != m_ReceiveProcessors.end();
1015                         ++it ) {
1016                         count += (*it)->getPortCount();
1017                         if (count > idx) {
1018                                 return (*it)->getPortAtIdx(idx-prevcount);
1019                         }
1020                         prevcount=count;
1021                 }
1022         } else {
1023                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1024                         it != m_TransmitProcessors.end();
1025                         ++it ) {
1026                         count += (*it)->getPortCount();
1027                         if (count > idx) {
1028                                 return (*it)->getPortAtIdx(idx-prevcount);
1029                         }
1030                         prevcount=count;
1031                 }
1032         }
1033         return NULL;
1034 }
1035
1036 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1037     m_thread_realtime=rt;
1038     m_thread_priority=priority;
1039     return true;
1040 }
1041
1042
1043 } // end of namespace
Note: See TracBrowser for help on using the browser.