root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 705, 43.2 kB (checked in by ppalmers, 15 years ago)

restructure the streaming directory

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 5
33
34 #define RUNNING_TIMEOUT_MSEC 4000
35 #define PREPARE_TIMEOUT_MSEC 4000
36 #define ENABLE_TIMEOUT_MSEC 4000
37
38 //#define ENABLE_DELAY_CYCLES 100
39 #define ENABLE_DELAY_CYCLES 1000
40
41 namespace Streaming {
42
43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
44
45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
46     : m_is_slave( false )
47     , m_SyncSource(NULL)
48     , m_nb_buffers(nb_buffers)
49     , m_period(period)
50     , m_xruns(0)
51     , m_isoManager(0)
52     , m_nbperiods(0)
53 {
54     addOption(Util::OptionContainer::Option("slaveMode",false));
55 }
56
57 StreamProcessorManager::~StreamProcessorManager() {
58     if (m_isoManager) delete m_isoManager;
59
60 }
61
62 /**
63  * Registers \ref processor with this manager.
64  *
65  * also registers it with the isohandlermanager
66  *
67  * be sure to call isohandlermanager->init() first!
68  * and be sure that the processors are also ->init()'ed
69  *
70  * @param processor
71  * @return true if successfull
72  */
73 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
74 {
75     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
76     assert(processor);
77     assert(m_isoManager);
78
79     if (processor->getType()==StreamProcessor::E_Receive) {
80         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
81
82         m_ReceiveProcessors.push_back(processor);
83
84         processor->setManager(this);
85
86         return true;
87     }
88
89     if (processor->getType()==StreamProcessor::E_Transmit) {
90         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
91
92         m_TransmitProcessors.push_back(processor);
93
94         processor->setManager(this);
95
96         return true;
97     }
98
99     debugFatal("Unsupported processor type!\n");
100
101     return false;
102 }
103
104 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
105 {
106     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
107     assert(processor);
108
109     if (processor->getType()==StreamProcessor::E_Receive) {
110
111         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
112             it != m_ReceiveProcessors.end();
113             ++it ) {
114
115             if ( *it == processor ) {
116                     m_ReceiveProcessors.erase(it);
117
118                     processor->clearManager();
119
120                     if(!m_isoManager->unregisterStream(processor)) {
121                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
122
123                         return false;
124
125                     }
126
127                     return true;
128                 }
129         }
130     }
131
132     if (processor->getType()==StreamProcessor::E_Transmit) {
133         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
134             it != m_TransmitProcessors.end();
135             ++it ) {
136
137             if ( *it == processor ) {
138                     m_TransmitProcessors.erase(it);
139
140                     processor->clearManager();
141
142                     if(!m_isoManager->unregisterStream(processor)) {
143                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
144
145                         return false;
146
147                     }
148
149                     return true;
150                 }
151         }
152     }
153
154     debugFatal("Processor (%p) not found!\n",processor);
155
156     return false; //not found
157
158 }
159
160 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
161     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
162
163     m_SyncSource=s;
164     return true;
165 }
166
167 StreamProcessor *StreamProcessorManager::getSyncSource() {
168     return m_SyncSource;
169 }
170
171 bool StreamProcessorManager::init()
172 {
173     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
174
175     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
176
177     if(!m_isoManager) {
178         debugFatal("Could not create IsoHandlerManager\n");
179         return false;
180     }
181
182     // propagate the debug level
183     m_isoManager->setVerboseLevel(getDebugLevel());
184
185     if(!m_isoManager->init()) {
186         debugFatal("Could not initialize IsoHandlerManager\n");
187         return false;
188     }
189
190     m_xrun_happened=false;
191
192     return true;
193 }
194
195 bool StreamProcessorManager::prepare() {
196
197     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
198
199     m_is_slave=false;
200     if(!getOption("slaveMode", m_is_slave)) {
201         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
202     }
203
204     // if no sync source is set, select one here
205     if(m_SyncSource == NULL) {
206        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
207     }
208
209     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
210         it != m_ReceiveProcessors.end();
211         ++it ) {
212             if(m_SyncSource == NULL) {
213                 debugWarning(" => Sync Source is %p.\n", *it);
214                 m_SyncSource = *it;
215             }
216     }
217
218     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
219         it != m_TransmitProcessors.end();
220         ++it ) {
221             if(m_SyncSource == NULL) {
222                 debugWarning(" => Sync Source is %p.\n", *it);
223                 m_SyncSource = *it;
224             }
225     }
226
227     // now do the actual preparation
228     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
229     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
230         it != m_ReceiveProcessors.end();
231         ++it ) {
232
233         if(!(*it)->setSyncSource(m_SyncSource)) {
234             debugFatal(  " could not set sync source (%p)...\n",(*it));
235             return false;
236         }
237
238         if(!(*it)->setOption("slaveMode", m_is_slave)) {
239             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
240         }
241
242         if(!(*it)->prepare()) {
243             debugFatal(  " could not prepare (%p)...\n",(*it));
244             return false;
245         }
246     }
247
248     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
249     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
250         it != m_TransmitProcessors.end();
251         ++it ) {
252         if(!(*it)->setSyncSource(m_SyncSource)) {
253             debugFatal(  " could not set sync source (%p)...\n",(*it));
254             return false;
255         }
256         if(!(*it)->setOption("slaveMode", m_is_slave)) {
257             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
258         }
259         if(!(*it)->prepare()) {
260             debugFatal( " could not prepare (%p)...\n",(*it));
261             return false;
262         }
263     }
264
265     // if there are no stream processors registered,
266     // fail
267     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
268         debugFatal("No stream processors registered, can't do anything usefull\n");
269         return false;
270     }
271
272     return true;
273 }
274
275 bool StreamProcessorManager::syncStartAll() {
276
277     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
278     // we have to wait until all streamprocessors indicate that they are running
279     // i.e. that there is actually some data stream flowing
280     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
281     bool notRunning=true;
282     while (notRunning && wait_cycles) {
283         wait_cycles--;
284         notRunning=false;
285
286         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
287                 it != m_ReceiveProcessors.end();
288                 ++it ) {
289             if(!(*it)->isRunning()) notRunning=true;
290         }
291
292         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
293                 it != m_TransmitProcessors.end();
294                 ++it ) {
295             if(!(*it)->isRunning()) notRunning=true;
296         }
297
298         usleep(1000);
299         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning);
300     }
301
302     if(!wait_cycles) { // timout has occurred
303         debugFatal("One or more streams are not starting up (timeout):\n");
304
305         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
306                 it != m_ReceiveProcessors.end();
307                 ++it ) {
308             if(!(*it)->isRunning()) {
309                 debugFatal(" receive stream %p not running\n",*it);
310             } else {
311                 debugFatal(" receive stream %p running\n",*it);
312             }
313         }
314
315         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
316                 it != m_TransmitProcessors.end();
317                 ++it ) {
318             if(!(*it)->isRunning()) {
319                 debugFatal(" transmit stream %p not running\n",*it);
320             } else {
321                 debugFatal(" transmit stream %p running\n",*it);
322             }
323         }
324         return false;
325     }
326
327     // we want to make sure that everything is running well,
328     // so wait for a while
329 //     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
330
331     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
332
333     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
334
335     int max_of_min_delay=0;
336     int min_delay=0;
337     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
338             it != m_ReceiveProcessors.end();
339             ++it ) {
340         min_delay=(*it)->getMinimalSyncDelay();
341         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
342     }
343
344     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
345             it != m_TransmitProcessors.end();
346             ++it ) {
347         min_delay=(*it)->getMinimalSyncDelay();
348         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
349     }
350
351     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
352     m_SyncSource->setSyncDelay(max_of_min_delay);
353
354
355     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
356     // now we reset the frame counters
357     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
358             it != m_ReceiveProcessors.end();
359             ++it ) {
360         // get the receive SP's going at receiving data
361         (*it)->m_data_buffer->setTransparent(false);
362         (*it)->reset();
363     }
364
365     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
366             it != m_TransmitProcessors.end();
367             ++it ) {
368         // make sure the SP retains it's prefilled data
369         (*it)->m_data_buffer->setTransparent(false);
370         (*it)->reset();
371     }
372    
373     dumpInfo();
374     // All buffers are prefilled and set-up, the only thing
375     // that remains a mistery is the timestamp information.
376 //     m_SyncSource->m_data_buffer->setTransparent(false);
377 //     debugShowBackLog();
378    
379     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
380     // in order to obtain that, we wait for the first periods to be
381     // received.
382     int nb_sync_runs=10;
383     while(nb_sync_runs--) { // or while not sync-ed?
384         waitForPeriod();
385
386         // drop the frames for all receive SP's
387         dryRun(StreamProcessor::E_Receive);
388        
389         // we don't have to dryrun for the xmit SP's since they
390         // are not sending data yet.
391     }
392
393     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu...\n", m_time_of_transfer);
394     // FIXME: xruns can screw up the framecounter accounting. do something more sane here
395     resetXrunCounters();
396     // lock the isohandlermanager such that things freeze
397 //     debugShowBackLog();
398
399     // we now should have decent sync info
400     // the buffers of the receive streams should be (approx) empty
401     // the buffers of the xmit streams should be full
402    
403     // note what should the timestamp of the first sample be?
404    
405     // at this point the buffer head timestamp of the transmit buffers can be
406     // set properly since we know the sync source's timestamp of the last
407     // buffer transfer. we also know the rate.
408    
409     debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n");
410     // FIXME: in the SPM it would be nice to have system time instead of
411     //        1394 time
412 //     float rate=m_SyncSource->getTicksPerFrame();
413 //     int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
414 //     // the data at the front of the buffer is intended to be transmitted
415 //     // nb_periods*period_size after the last received period
416 //     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
417
418     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
419             it != m_TransmitProcessors.end();
420             ++it ) {
421         // FIXME: encapsulate
422         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
423         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!!
424     }
425    
426     dumpInfo();
427    
428     // this is the place were we should be syncing the received streams too
429     // check how much they differ
430    
431    
432     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
433
434     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
435
436     // FIXME: this should not be in cycles, but in 'time'
437     // FIXME: remove the timestamp
438     unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES;
439     if (enable_at > 8000) enable_at -= 8000;
440
441     if (!enableStreamProcessors(enable_at)) {
442         debugFatal("Could not enable StreamProcessors...\n");
443         return false;
444     }
445
446     debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n");
447     // run some cycles 'dry' such that everything can stabilize
448     int nb_dryrun_cycles=10;
449     bool no_xrun;
450     while(nb_dryrun_cycles--) {
451         waitForPeriod();
452         no_xrun = dryRun(); // dry run both receive and xmit
453
454         if (no_xrun) {
455             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" );
456             resetXrunCounters();
457             // FIXME: xruns can screw up the framecounter accounting. do something more sane here
458         }
459     }
460
461     // now we should clear the xrun flags
462     resetXrunCounters();
463
464     // and off we go
465     return true;
466 }
467
468 void StreamProcessorManager::resetXrunCounters(){
469     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n");
470     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
471         it != m_ReceiveProcessors.end();
472         ++it )
473     {
474         (*it)->resetXrunCounter();
475     }
476
477     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
478         it != m_TransmitProcessors.end();
479         ++it )
480     {
481         (*it)->resetXrunCounter();
482     }
483 }
484
485 bool StreamProcessorManager::start() {
486     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
487     assert(m_isoManager);
488
489     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
490     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
491     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
492         it != m_ReceiveProcessors.end();
493         ++it ) {
494             if (!(*it)->prepareForStart()) {
495                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
496                 return false;
497             }
498             if (!m_isoManager->registerStream(*it)) {
499                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
500                 return false;
501             }
502         }
503
504     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
505     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
506         it != m_TransmitProcessors.end();
507         ++it ) {
508             if (!(*it)->prepareForStart()) {
509                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
510                 return false;
511             }
512             if (!m_isoManager->registerStream(*it)) {
513                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
514                 return false;
515             }
516         }
517
518     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
519     if (!m_isoManager->prepare()) {
520         debugFatal("Could not prepare isoManager\n");
521         return false;
522     }
523
524     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
525         if (!disableStreamProcessors()) {
526         debugFatal("Could not disable StreamProcessors...\n");
527         return false;
528     }
529
530     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
531     if (!m_isoManager->startHandlers(-1)) {
532         debugFatal("Could not start handlers...\n");
533         return false;
534     }
535
536     // start all SP's synchonized
537     if (!syncStartAll()) {
538         debugFatal("Could not syncStartAll...\n");
539         return false;
540     }
541
542     // dump the iso stream information when in verbose mode
543     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
544         m_isoManager->dumpInfo();
545     }
546
547     return true;
548
549 }
550
551 bool StreamProcessorManager::stop() {
552     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
553     assert(m_isoManager);
554
555     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
556     // Most stream processors can just stop without special treatment.  However, some
557     // (like the MOTU) need to do a few things before it's safe to turn off the iso
558     // handling.
559     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
560     bool allReady = false;
561     while (!allReady && wait_cycles) {
562         wait_cycles--;
563         allReady = true;
564
565         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
566             it != m_ReceiveProcessors.end();
567             ++it ) {
568             if(!(*it)->prepareForStop()) allReady = false;
569         }
570
571         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
572             it != m_TransmitProcessors.end();
573             ++it ) {
574             if(!(*it)->prepareForStop()) allReady = false;
575         }
576         usleep(1000);
577     }
578
579     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
580         if (!disableStreamProcessors()) {
581         debugFatal("Could not disable StreamProcessors...\n");
582         return false;
583     }
584
585     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
586     if(!m_isoManager->stopHandlers()) {
587        debugFatal("Could not stop ISO handlers\n");
588        return false;
589     }
590
591     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
592     // now unregister all streams from iso manager
593     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
594     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
595         it != m_ReceiveProcessors.end();
596         ++it ) {
597             if (!m_isoManager->unregisterStream(*it)) {
598                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
599                 return false;
600             }
601
602         }
603
604     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
605     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
606         it != m_TransmitProcessors.end();
607         ++it ) {
608             if (!m_isoManager->unregisterStream(*it)) {
609                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
610                 return false;
611             }
612
613         }
614
615     return true;
616
617 }
618
619 /**
620  * Enables the registered StreamProcessors
621  * @return true if successful, false otherwise
622  */
623 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
624     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
625
626     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
627     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
628     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
629             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
630         return false;
631     }
632
633     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
634     m_SyncSource->enable(time_to_enable_at);
635
636     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
637
638     // we prepare the streamprocessors for enable
639     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
640             it != m_ReceiveProcessors.end();
641             ++it ) {
642         if(*it != m_SyncSource) {
643             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
644             (*it)->prepareForEnable(time_to_enable_at);
645         }
646     }
647
648     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
649             it != m_TransmitProcessors.end();
650             ++it ) {
651         if(*it != m_SyncSource) {
652             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
653             (*it)->prepareForEnable(time_to_enable_at);
654         }
655     }
656
657     // then we enable the streamprocessors
658     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
659             it != m_ReceiveProcessors.end();
660             ++it ) {
661         if(*it != m_SyncSource) {
662             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
663             (*it)->enable(time_to_enable_at);
664         }
665     }
666
667     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
668             it != m_TransmitProcessors.end();
669             ++it ) {
670         if(*it != m_SyncSource) {
671             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
672             (*it)->enable(time_to_enable_at);
673         }
674     }
675
676     // now we wait for the SP's to get enabled
677 //     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
678 //     // we have to wait until all streamprocessors indicate that they are running
679 //     // i.e. that there is actually some data stream flowing
680 //     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
681 //     bool notEnabled=true;
682 //     while (notEnabled && wait_cycles) {
683 //         wait_cycles--;
684 //         notEnabled=false;
685 //
686 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
687 //                 it != m_ReceiveProcessors.end();
688 //                 ++it ) {
689 //             if(!(*it)->isEnabled()) notEnabled=true;
690 //         }
691 //
692 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
693 //                 it != m_TransmitProcessors.end();
694 //                 ++it ) {
695 //             if(!(*it)->isEnabled()) notEnabled=true;
696 //         }
697 //         usleep(1000); // one cycle
698 //     }
699 //
700 //     if(!wait_cycles) { // timout has occurred
701 //         debugFatal("One or more streams couldn't be enabled (timeout):\n");
702 //
703 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
704 //                 it != m_ReceiveProcessors.end();
705 //                 ++it ) {
706 //             if(!(*it)->isEnabled()) {
707 //                     debugFatal(" receive stream %p not enabled\n",*it);
708 //             } else {
709 //                     debugFatal(" receive stream %p enabled\n",*it);
710 //             }
711 //         }
712 //
713 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
714 //                 it != m_TransmitProcessors.end();
715 //                 ++it ) {
716 //             if(!(*it)->isEnabled()) {
717 //                     debugFatal(" transmit stream %p not enabled\n",*it);
718 //             } else {
719 //                     debugFatal(" transmit stream %p enabled\n",*it);
720 //             }
721 //         }
722 //         return false;
723 //     }
724
725     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
726
727     return true;
728 }
729
730 /**
731  * Disables the registered StreamProcessors
732  * @return true if successful, false otherwise
733  */
734 bool StreamProcessorManager::disableStreamProcessors() {
735     // we prepare the streamprocessors for disable
736     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
737             it != m_ReceiveProcessors.end();
738             ++it ) {
739         (*it)->prepareForDisable();
740     }
741
742     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
743             it != m_TransmitProcessors.end();
744             ++it ) {
745         (*it)->prepareForDisable();
746     }
747
748     // then we disable the streamprocessors
749     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
750             it != m_ReceiveProcessors.end();
751             ++it ) {
752         (*it)->disable();
753         (*it)->m_data_buffer->setTransparent(true);
754     }
755
756     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
757             it != m_TransmitProcessors.end();
758             ++it ) {
759         (*it)->disable();
760         (*it)->m_data_buffer->setTransparent(true);
761     }
762
763     // now we wait for the SP's to get disabled
764     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
765     // we have to wait until all streamprocessors indicate that they are running
766     // i.e. that there is actually some data stream flowing
767     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
768     bool enabled=true;
769     while (enabled && wait_cycles) {
770         wait_cycles--;
771         enabled=false;
772
773         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
774                 it != m_ReceiveProcessors.end();
775                 ++it ) {
776             if((*it)->isEnabled()) enabled=true;
777         }
778
779         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
780                 it != m_TransmitProcessors.end();
781                 ++it ) {
782             if((*it)->isEnabled()) enabled=true;
783         }
784         usleep(1000); // one cycle
785     }
786
787     if(!wait_cycles) { // timout has occurred
788         debugFatal("One or more streams couldn't be disabled (timeout):\n");
789
790         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
791                 it != m_ReceiveProcessors.end();
792                 ++it ) {
793             if(!(*it)->isEnabled()) {
794                     debugFatal(" receive stream %p not enabled\n",*it);
795             } else {
796                     debugFatal(" receive stream %p enabled\n",*it);
797             }
798         }
799
800         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
801                 it != m_TransmitProcessors.end();
802                 ++it ) {
803             if(!(*it)->isEnabled()) {
804                     debugFatal(" transmit stream %p not enabled\n",*it);
805             } else {
806                     debugFatal(" transmit stream %p enabled\n",*it);
807             }
808         }
809         return false;
810     }
811
812     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
813
814     return true;
815 }
816
817 /**
818  * Called upon Xrun events. This brings all StreamProcessors back
819  * into their starting state, and then carries on streaming. This should
820  * have the same effect as restarting the whole thing.
821  *
822  * @return true if successful, false otherwise
823  */
824 bool StreamProcessorManager::handleXrun() {
825
826     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
827
828     /*
829      * Reset means:
830      * 1) Disabling the SP's, so that they don't process any packets
831      *    note: the isomanager does keep on delivering/requesting them
832      * 2) Bringing all buffers & streamprocessors into a know state
833      *    - Clear all capture buffers
834      *    - Put nb_periods*period_size of null frames into the playback buffers
835      * 3) Re-enable the SP's
836      */
837     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
838         if (!disableStreamProcessors()) {
839         debugFatal("Could not disable StreamProcessors...\n");
840         return false;
841     }
842
843     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
844     // start all SP's synchonized
845     if (!syncStartAll()) {
846         debugFatal("Could not syncStartAll...\n");
847         return false;
848     }
849
850     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
851
852     return true;
853 }
854
855 /**
856  * @brief Waits until the next period of samples is ready
857  *
858  * This function does not return until a full period of samples is (or should be)
859  * ready to be transferred.
860  *
861  * @return true if the period is ready, false if an xrun occurred
862  */
863 bool StreamProcessorManager::waitForPeriod() {
864     int time_till_next_period;
865     bool xrun_occurred=false;
866
867     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
868
869     assert(m_SyncSource);
870
871     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
872
873     while(time_till_next_period > 0) {
874         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
875
876         // wait for the period
877         usleep(time_till_next_period);
878
879         // check for underruns on the ISO side,
880         // those should make us bail out of the wait loop
881         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
882             it != m_ReceiveProcessors.end();
883             ++it ) {
884             // a xrun has occurred on the Iso side
885             xrun_occurred |= (*it)->xrunOccurred();
886         }
887         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
888             it != m_TransmitProcessors.end();
889             ++it ) {
890             // a xrun has occurred on the Iso side
891             xrun_occurred |= (*it)->xrunOccurred();
892         }
893
894         // check if we were waked up too soon
895         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
896     }
897
898     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
899
900     // this is to notify the client of the delay
901     // that we introduced
902     m_delayed_usecs=time_till_next_period;
903
904     // we save the 'ideal' time of the transfer at this point,
905     // because we can have interleaved read - process - write
906     // cycles making that we modify a receiving stream's buffer
907     // before we get to writing.
908     // NOTE: before waitForPeriod() is called again, both the transmit
909     //       and the receive processors should have done their transfer.
910     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
911     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
912         m_time_of_transfer);
913
914 #ifdef DEBUG
915     int rcv_bf=0, xmt_bf=0;
916     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
917         it != m_ReceiveProcessors.end();
918         ++it ) {
919         rcv_bf = (*it)->getBufferFill();
920     }
921     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
922         it != m_TransmitProcessors.end();
923         ++it ) {
924         xmt_bf = (*it)->getBufferFill();
925     }
926     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
927         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
928
929 #endif
930
931     xrun_occurred=false;
932
933     // check if xruns occurred on the Iso side.
934     // also check if xruns will occur should we transfer() now
935
936     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
937           it != m_ReceiveProcessors.end();
938           ++it ) {
939         // a xrun has occurred on the Iso side
940         xrun_occurred |= (*it)->xrunOccurred();
941
942         // if this is true, a xrun will occur
943         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
944
945 #ifdef DEBUG
946         if ((*it)->xrunOccurred()) {
947             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
948             (*it)->dumpInfo();
949         }
950         if (!((*it)->canClientTransferFrames(m_period))) {
951             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
952             (*it)->dumpInfo();
953         }
954 #endif
955
956     }
957     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
958           it != m_TransmitProcessors.end();
959           ++it ) {
960         // a xrun has occurred on the Iso side
961         xrun_occurred |= (*it)->xrunOccurred();
962
963         // if this is true, a xrun will occur
964         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
965
966 #ifdef DEBUG
967         if ((*it)->xrunOccurred()) {
968             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
969         }
970         if (!((*it)->canClientTransferFrames(m_period))) {
971             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
972         }
973 #endif
974     }
975
976     m_nbperiods++;
977
978     // now we can signal the client that we are (should be) ready
979     return !xrun_occurred;
980 }
981
982 /**
983  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
984  *
985  * Transfers one period of frames from the client side to the Iso side and vice versa.
986  *
987  * @return true if successful, false otherwise (indicates xrun).
988  */
989 bool StreamProcessorManager::transfer() {
990
991     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
992
993     if (!transfer(StreamProcessor::E_Receive)) return false;
994     if (!transfer(StreamProcessor::E_Transmit)) return false;
995
996     return true;
997 }
998
999 /**
1000  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1001  *
1002  * Transfers one period of frames from the client side to the Iso side or vice versa.
1003  *
1004  * @param t The processor type to tranfer for (receive or transmit)
1005  * @return true if successful, false otherwise (indicates xrun).
1006  */
1007
1008 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
1009     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1010
1011     // a static cast could make sure that there is no performance
1012     // penalty for the virtual functions (to be checked)
1013     if (t==StreamProcessor::E_Receive) {
1014        
1015         // determine the time at which we want reception to start
1016         float rate=m_SyncSource->getTicksPerFrame();
1017         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1018        
1019         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1020        
1021         if(receive_timestamp<0) {
1022             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1023              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1024         }
1025         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1026             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1027              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1028         }
1029        
1030         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1031                 it != m_ReceiveProcessors.end();
1032                 ++it ) {
1033
1034             if(!(*it)->getFrames(m_period, receive_timestamp)) {
1035                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1036                             m_period, m_time_of_transfer,*it);
1037                     return false; // buffer underrun
1038             }
1039
1040         }
1041     } else {
1042         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1043                 it != m_TransmitProcessors.end();
1044                 ++it ) {
1045             // FIXME: in the SPM it would be nice to have system time instead of
1046             //        1394 time
1047             float rate=m_SyncSource->getTicksPerFrame();
1048             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1049
1050             // the data we are putting into the buffer is intended to be transmitted
1051             // one ringbuffer size after it has been received
1052             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1053
1054             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1055                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1056                         m_period, transmit_timestamp, *it);
1057                 return false; // buffer overrun
1058             }
1059
1060         }
1061     }
1062
1063     return true;
1064 }
1065
1066 /**
1067  * @brief Dry run one period for both receive and transmit StreamProcessors
1068  *
1069  * Process one period of frames for all streamprocessors, without touching the
1070  * client buffers. This only removes an incoming period from the ISO receive buffer and
1071  * puts one period of silence into the transmit buffers.
1072  *
1073  * @return true if successful, false otherwise (indicates xrun).
1074  */
1075 bool StreamProcessorManager::dryRun() {
1076     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1077     if (!dryRun(StreamProcessor::E_Receive)) return false;
1078     if (!dryRun(StreamProcessor::E_Transmit)) return false;
1079     return true;
1080 }
1081
1082 /**
1083  * @brief Dry run one period for either the receive or transmit StreamProcessors
1084  *
1085  * see dryRun()
1086  *
1087  * @param t The processor type to dryRun for (receive or transmit)
1088  * @return true if successful, false otherwise (indicates xrun).
1089  */
1090
1091 bool StreamProcessorManager::dryRun(enum StreamProcessor::EProcessorType t) {
1092     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1093
1094     // a static cast could make sure that there is no performance
1095     // penalty for the virtual functions (to be checked)
1096     if (t==StreamProcessor::E_Receive) {
1097        
1098         // determine the time at which we want reception to start
1099         float rate=m_SyncSource->getTicksPerFrame();
1100         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1101        
1102         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1103        
1104         if(receive_timestamp<0) {
1105             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1106              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1107         }
1108         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1109             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1110              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1111         }
1112        
1113         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1114                 it != m_ReceiveProcessors.end();
1115                 ++it ) {
1116
1117             if(!(*it)->getFramesDry(m_period, receive_timestamp)) {
1118                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1119                             m_period, m_time_of_transfer,*it);
1120                     return false; // buffer underrun
1121             }
1122
1123         }
1124     } else {
1125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1126                 it != m_TransmitProcessors.end();
1127                 ++it ) {
1128             // FIXME: in the SPM it would be nice to have system time instead of
1129             //        1394 time
1130             float rate=m_SyncSource->getTicksPerFrame();
1131             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1132
1133             // the data we are putting into the buffer is intended to be transmitted
1134             // one ringbuffer size after it has been received
1135             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1136
1137             if(!(*it)->putFramesDry(m_period, transmit_timestamp)) {
1138                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1139                         m_period, transmit_timestamp, *it);
1140                 return false; // buffer overrun
1141             }
1142
1143         }
1144     }
1145
1146     return true;
1147 }
1148
1149 void StreamProcessorManager::dumpInfo() {
1150     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1151     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1152     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1153
1154     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1155     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1156         it != m_ReceiveProcessors.end();
1157         ++it ) {
1158         (*it)->dumpInfo();
1159     }
1160
1161     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1162     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1163         it != m_TransmitProcessors.end();
1164         ++it ) {
1165         (*it)->dumpInfo();
1166     }
1167
1168     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1169     m_isoManager->dumpInfo();
1170     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1171
1172 }
1173
1174 void StreamProcessorManager::setVerboseLevel(int l) {
1175     setDebugLevel(l);
1176
1177     if (m_isoManager) m_isoManager->setVerboseLevel(l);
1178
1179     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1180     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1181         it != m_ReceiveProcessors.end();
1182         ++it ) {
1183         (*it)->setVerboseLevel(l);
1184     }
1185
1186     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1187     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1188         it != m_TransmitProcessors.end();
1189         ++it ) {
1190         (*it)->setVerboseLevel(l);
1191     }
1192 }
1193
1194
1195 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1196     int count=0;
1197
1198     if (direction == Port::E_Capture) {
1199         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1200             it != m_ReceiveProcessors.end();
1201             ++it ) {
1202             count += (*it)->getPortCount(type);
1203         }
1204     } else {
1205         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1206             it != m_TransmitProcessors.end();
1207             ++it ) {
1208             count += (*it)->getPortCount(type);
1209         }
1210     }
1211     return count;
1212 }
1213
1214 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1215     int count=0;
1216
1217     if (direction == Port::E_Capture) {
1218         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1219             it != m_ReceiveProcessors.end();
1220             ++it ) {
1221             count += (*it)->getPortCount();
1222         }
1223     } else {
1224         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1225             it != m_TransmitProcessors.end();
1226             ++it ) {
1227             count += (*it)->getPortCount();
1228         }
1229     }
1230     return count;
1231 }
1232
1233 // TODO: implement a port map here, instead of the loop
1234
1235 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1236     int count=0;
1237     int prevcount=0;
1238
1239     if (direction == Port::E_Capture) {
1240         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1241             it != m_ReceiveProcessors.end();
1242             ++it ) {
1243             count += (*it)->getPortCount();
1244             if (count > idx) {
1245                 return (*it)->getPortAtIdx(idx-prevcount);
1246             }
1247             prevcount=count;
1248         }
1249     } else {
1250         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1251             it != m_TransmitProcessors.end();
1252             ++it ) {
1253             count += (*it)->getPortCount();
1254             if (count > idx) {
1255                 return (*it)->getPortAtIdx(idx-prevcount);
1256             }
1257             prevcount=count;
1258         }
1259     }
1260     return NULL;
1261 }
1262
1263 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1264     m_thread_realtime=rt;
1265     m_thread_priority=priority;
1266     return true;
1267 }
1268
1269
1270 } // end of namespace
Note: See TracBrowser for help on using the browser.