root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 707, 43.7 kB (checked in by ppalmers, 15 years ago)

- code cleanup
- make transmit handler AMDTP compliant (don't send too much in advance)

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 5
33
34 #define RUNNING_TIMEOUT_MSEC 4000
35 #define PREPARE_TIMEOUT_MSEC 4000
36 #define ENABLE_TIMEOUT_MSEC 4000
37
38 //#define ENABLE_DELAY_CYCLES 100
39 #define ENABLE_DELAY_CYCLES 1000
40
41 namespace Streaming {
42
43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
44
45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
46     : m_is_slave( false )
47     , m_SyncSource(NULL)
48     , m_nb_buffers(nb_buffers)
49     , m_period(period)
50     , m_xruns(0)
51     , m_isoManager(0)
52     , m_nbperiods(0)
53 {
54     addOption(Util::OptionContainer::Option("slaveMode",false));
55 }
56
57 StreamProcessorManager::~StreamProcessorManager() {
58     if (m_isoManager) delete m_isoManager;
59
60 }
61
62 /**
63  * Registers \ref processor with this manager.
64  *
65  * also registers it with the isohandlermanager
66  *
67  * be sure to call isohandlermanager->init() first!
68  * and be sure that the processors are also ->init()'ed
69  *
70  * @param processor
71  * @return true if successfull
72  */
73 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
74 {
75     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
76     assert(processor);
77     assert(m_isoManager);
78
79     if (processor->getType()==StreamProcessor::E_Receive) {
80         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
81
82         m_ReceiveProcessors.push_back(processor);
83
84         processor->setManager(this);
85
86         return true;
87     }
88
89     if (processor->getType()==StreamProcessor::E_Transmit) {
90         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
91
92         m_TransmitProcessors.push_back(processor);
93
94         processor->setManager(this);
95
96         return true;
97     }
98
99     debugFatal("Unsupported processor type!\n");
100
101     return false;
102 }
103
104 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
105 {
106     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
107     assert(processor);
108
109     if (processor->getType()==StreamProcessor::E_Receive) {
110
111         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
112             it != m_ReceiveProcessors.end();
113             ++it ) {
114
115             if ( *it == processor ) {
116                     m_ReceiveProcessors.erase(it);
117
118                     processor->clearManager();
119
120                     if(!m_isoManager->unregisterStream(processor)) {
121                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
122
123                         return false;
124
125                     }
126
127                     return true;
128                 }
129         }
130     }
131
132     if (processor->getType()==StreamProcessor::E_Transmit) {
133         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
134             it != m_TransmitProcessors.end();
135             ++it ) {
136
137             if ( *it == processor ) {
138                     m_TransmitProcessors.erase(it);
139
140                     processor->clearManager();
141
142                     if(!m_isoManager->unregisterStream(processor)) {
143                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
144
145                         return false;
146
147                     }
148
149                     return true;
150                 }
151         }
152     }
153
154     debugFatal("Processor (%p) not found!\n",processor);
155
156     return false; //not found
157
158 }
159
160 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
161     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
162
163     m_SyncSource=s;
164     return true;
165 }
166
167 StreamProcessor *StreamProcessorManager::getSyncSource() {
168     return m_SyncSource;
169 }
170
171 bool StreamProcessorManager::init()
172 {
173     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
174
175     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
176
177     if(!m_isoManager) {
178         debugFatal("Could not create IsoHandlerManager\n");
179         return false;
180     }
181
182     // propagate the debug level
183     m_isoManager->setVerboseLevel(getDebugLevel());
184
185     if(!m_isoManager->init()) {
186         debugFatal("Could not initialize IsoHandlerManager\n");
187         return false;
188     }
189
190     m_xrun_happened=false;
191
192     return true;
193 }
194
195 bool StreamProcessorManager::prepare() {
196
197     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
198
199     m_is_slave=false;
200     if(!getOption("slaveMode", m_is_slave)) {
201         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
202     }
203
204     // if no sync source is set, select one here
205     if(m_SyncSource == NULL) {
206        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
207     }
208
209     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
210         it != m_ReceiveProcessors.end();
211         ++it ) {
212             if(m_SyncSource == NULL) {
213                 debugWarning(" => Sync Source is %p.\n", *it);
214                 m_SyncSource = *it;
215             }
216     }
217
218     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
219         it != m_TransmitProcessors.end();
220         ++it ) {
221             if(m_SyncSource == NULL) {
222                 debugWarning(" => Sync Source is %p.\n", *it);
223                 m_SyncSource = *it;
224             }
225     }
226
227     // now do the actual preparation
228     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
229     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
230         it != m_ReceiveProcessors.end();
231         ++it ) {
232
233         if(!(*it)->setSyncSource(m_SyncSource)) {
234             debugFatal(  " could not set sync source (%p)...\n",(*it));
235             return false;
236         }
237
238         if(!(*it)->setOption("slaveMode", m_is_slave)) {
239             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
240         }
241
242         if(!(*it)->prepare()) {
243             debugFatal(  " could not prepare (%p)...\n",(*it));
244             return false;
245         }
246     }
247
248     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
249     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
250         it != m_TransmitProcessors.end();
251         ++it ) {
252         if(!(*it)->setSyncSource(m_SyncSource)) {
253             debugFatal(  " could not set sync source (%p)...\n",(*it));
254             return false;
255         }
256         if(!(*it)->setOption("slaveMode", m_is_slave)) {
257             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
258         }
259         if(!(*it)->prepare()) {
260             debugFatal( " could not prepare (%p)...\n",(*it));
261             return false;
262         }
263     }
264
265     // if there are no stream processors registered,
266     // fail
267     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
268         debugFatal("No stream processors registered, can't do anything usefull\n");
269         return false;
270     }
271
272     return true;
273 }
274
275 bool StreamProcessorManager::syncStartAll() {
276
277     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
278     // we have to wait until all streamprocessors indicate that they are running
279     // i.e. that there is actually some data stream flowing
280     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
281     bool notRunning=true;
282     while (notRunning && wait_cycles) {
283         wait_cycles--;
284         notRunning=false;
285
286         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
287                 it != m_ReceiveProcessors.end();
288                 ++it ) {
289             if(!(*it)->isRunning()) notRunning=true;
290         }
291
292         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
293                 it != m_TransmitProcessors.end();
294                 ++it ) {
295             if(!(*it)->isRunning()) notRunning=true;
296         }
297
298         usleep(1000);
299         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning);
300     }
301
302     if(!wait_cycles) { // timout has occurred
303         debugFatal("One or more streams are not starting up (timeout):\n");
304
305         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
306                 it != m_ReceiveProcessors.end();
307                 ++it ) {
308             if(!(*it)->isRunning()) {
309                 debugFatal(" receive stream %p not running\n",*it);
310             } else {
311                 debugFatal(" receive stream %p running\n",*it);
312             }
313         }
314
315         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
316                 it != m_TransmitProcessors.end();
317                 ++it ) {
318             if(!(*it)->isRunning()) {
319                 debugFatal(" transmit stream %p not running\n",*it);
320             } else {
321                 debugFatal(" transmit stream %p running\n",*it);
322             }
323         }
324         return false;
325     }
326
327     // we want to make sure that everything is running well,
328     // so wait for a while
329 //     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
330
331     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
332
333     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
334
335     int max_of_min_delay=0;
336     int min_delay=0;
337     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
338             it != m_ReceiveProcessors.end();
339             ++it ) {
340         min_delay=(*it)->getMinimalSyncDelay();
341         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
342     }
343
344     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
345             it != m_TransmitProcessors.end();
346             ++it ) {
347         min_delay=(*it)->getMinimalSyncDelay();
348         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
349     }
350
351     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
352     m_SyncSource->setSyncDelay(max_of_min_delay);
353
354
355     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
356     // now we reset the frame counters
357     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
358             it != m_ReceiveProcessors.end();
359             ++it ) {
360         // get the receive SP's going at receiving data
361         (*it)->m_data_buffer->setTransparent(false);
362         (*it)->reset();
363     }
364
365     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
366             it != m_TransmitProcessors.end();
367             ++it ) {
368         // make sure the SP retains it's prefilled data
369         (*it)->m_data_buffer->setTransparent(false);
370         (*it)->reset();
371     }
372    
373     dumpInfo();
374     // All buffers are prefilled and set-up, the only thing
375     // that remains a mistery is the timestamp information.
376 //     m_SyncSource->m_data_buffer->setTransparent(false);
377 //     debugShowBackLog();
378    
379     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
380     // in order to obtain that, we wait for the first periods to be
381     // received.
382     int nb_sync_runs=10;
383     while(nb_sync_runs--) { // or while not sync-ed?
384         waitForPeriod();
385
386         // drop the frames for all receive SP's
387         dryRun(StreamProcessor::E_Receive);
388        
389         // we don't have to dryrun for the xmit SP's since they
390         // are not sending data yet.
391     }
392
393     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
394         m_time_of_transfer,
395         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
396         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
397         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
398     // FIXME: xruns can screw up the framecounter accounting. do something more sane here
399     resetXrunCounters();
400     // lock the isohandlermanager such that things freeze
401 //     debugShowBackLog();
402
403     // we now should have decent sync info
404     // the buffers of the receive streams should be (approx) empty
405     // the buffers of the xmit streams should be full
406    
407     // note what should the timestamp of the first sample be?
408    
409     // at this point the buffer head timestamp of the transmit buffers can be
410     // set properly since we know the sync source's timestamp of the last
411     // buffer transfer. we also know the rate.
412    
413     debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n");
414     // FIXME: in the SPM it would be nice to have system time instead of
415     //        1394 time
416 //     float rate=m_SyncSource->getTicksPerFrame();
417 //     int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
418 //     // the data at the front of the buffer is intended to be transmitted
419 //     // nb_periods*period_size after the last received period
420 //     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
421
422     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
423             it != m_TransmitProcessors.end();
424             ++it ) {
425         // FIXME: encapsulate
426         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
427         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!!
428     }
429    
430     dumpInfo();
431    
432     // this is the place were we should be syncing the received streams too
433     // check how much they differ
434    
435    
436     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
437
438     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks
439
440     // FIXME: this should not be in cycles, but in 'time'
441     // FIXME: remove the timestamp
442     unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES;
443     if (enable_at > 8000) enable_at -= 8000;
444
445     if (!enableStreamProcessors(enable_at)) {
446         debugFatal("Could not enable StreamProcessors...\n");
447         return false;
448     }
449
450     debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n");
451     // run some cycles 'dry' such that everything can stabilize
452     int nb_dryrun_cycles=10;
453     bool no_xrun;
454     while(nb_dryrun_cycles--) {
455         waitForPeriod();
456         m_TransmitProcessors.at(0)->m_PacketStat.dumpInfo();
457         no_xrun = dryRun(); // dry run both receive and xmit
458
459         if (!no_xrun) {
460             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" );
461             resetXrunCounters();
462             // FIXME: xruns can screw up the framecounter accounting. do something more sane here
463         }
464     }
465
466     // now we should clear the xrun flags
467     resetXrunCounters();
468
469     // and off we go
470     return true;
471 }
472
473 void StreamProcessorManager::resetXrunCounters(){
474     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n");
475     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
476         it != m_ReceiveProcessors.end();
477         ++it )
478     {
479         (*it)->resetXrunCounter();
480     }
481
482     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
483         it != m_TransmitProcessors.end();
484         ++it )
485     {
486         (*it)->resetXrunCounter();
487     }
488 }
489
490 bool StreamProcessorManager::start() {
491     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
492     assert(m_isoManager);
493
494     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
495     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
496     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
497         it != m_ReceiveProcessors.end();
498         ++it ) {
499             if (!(*it)->prepareForStart()) {
500                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
501                 return false;
502             }
503             if (!m_isoManager->registerStream(*it)) {
504                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
505                 return false;
506             }
507         }
508
509     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
510     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
511         it != m_TransmitProcessors.end();
512         ++it ) {
513             if (!(*it)->prepareForStart()) {
514                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
515                 return false;
516             }
517             if (!m_isoManager->registerStream(*it)) {
518                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
519                 return false;
520             }
521         }
522
523     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
524     if (!m_isoManager->prepare()) {
525         debugFatal("Could not prepare isoManager\n");
526         return false;
527     }
528
529     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
530         if (!disableStreamProcessors()) {
531         debugFatal("Could not disable StreamProcessors...\n");
532         return false;
533     }
534
535     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
536     if (!m_isoManager->startHandlers(-1)) {
537         debugFatal("Could not start handlers...\n");
538         return false;
539     }
540
541     // start all SP's synchonized
542     if (!syncStartAll()) {
543         debugFatal("Could not syncStartAll...\n");
544         return false;
545     }
546
547     // dump the iso stream information when in verbose mode
548     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
549         m_isoManager->dumpInfo();
550     }
551
552     return true;
553
554 }
555
556 bool StreamProcessorManager::stop() {
557     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
558     assert(m_isoManager);
559
560     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
561     // Most stream processors can just stop without special treatment.  However, some
562     // (like the MOTU) need to do a few things before it's safe to turn off the iso
563     // handling.
564     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
565     bool allReady = false;
566     while (!allReady && wait_cycles) {
567         wait_cycles--;
568         allReady = true;
569
570         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
571             it != m_ReceiveProcessors.end();
572             ++it ) {
573             if(!(*it)->prepareForStop()) allReady = false;
574         }
575
576         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
577             it != m_TransmitProcessors.end();
578             ++it ) {
579             if(!(*it)->prepareForStop()) allReady = false;
580         }
581         usleep(1000);
582     }
583
584     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
585         if (!disableStreamProcessors()) {
586         debugFatal("Could not disable StreamProcessors...\n");
587         return false;
588     }
589
590     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
591     if(!m_isoManager->stopHandlers()) {
592        debugFatal("Could not stop ISO handlers\n");
593        return false;
594     }
595
596     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
597     // now unregister all streams from iso manager
598     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
599     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
600         it != m_ReceiveProcessors.end();
601         ++it ) {
602             if (!m_isoManager->unregisterStream(*it)) {
603                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
604                 return false;
605             }
606
607         }
608
609     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
610     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
611         it != m_TransmitProcessors.end();
612         ++it ) {
613             if (!m_isoManager->unregisterStream(*it)) {
614                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
615                 return false;
616             }
617
618         }
619
620     return true;
621
622 }
623
624 /**
625  * Enables the registered StreamProcessors
626  * @return true if successful, false otherwise
627  */
628 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
629     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
630
631     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
632     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
633     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
634             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
635         return false;
636     }
637
638     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
639     m_SyncSource->enable(time_to_enable_at);
640
641     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
642
643     // we prepare the streamprocessors for enable
644     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
645             it != m_ReceiveProcessors.end();
646             ++it ) {
647         if(*it != m_SyncSource) {
648             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
649             (*it)->prepareForEnable(time_to_enable_at);
650         }
651     }
652
653     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
654             it != m_TransmitProcessors.end();
655             ++it ) {
656         if(*it != m_SyncSource) {
657             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
658             (*it)->prepareForEnable(time_to_enable_at);
659         }
660     }
661
662     // then we enable the streamprocessors
663     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
664             it != m_ReceiveProcessors.end();
665             ++it ) {
666         if(*it != m_SyncSource) {
667             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
668             (*it)->enable(time_to_enable_at);
669         }
670     }
671
672     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
673             it != m_TransmitProcessors.end();
674             ++it ) {
675         if(*it != m_SyncSource) {
676             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
677             (*it)->enable(time_to_enable_at);
678         }
679     }
680
681     // now we wait for the SP's to get enabled
682 //     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
683 //     // we have to wait until all streamprocessors indicate that they are running
684 //     // i.e. that there is actually some data stream flowing
685 //     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
686 //     bool notEnabled=true;
687 //     while (notEnabled && wait_cycles) {
688 //         wait_cycles--;
689 //         notEnabled=false;
690 //
691 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
692 //                 it != m_ReceiveProcessors.end();
693 //                 ++it ) {
694 //             if(!(*it)->isEnabled()) notEnabled=true;
695 //         }
696 //
697 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
698 //                 it != m_TransmitProcessors.end();
699 //                 ++it ) {
700 //             if(!(*it)->isEnabled()) notEnabled=true;
701 //         }
702 //         usleep(1000); // one cycle
703 //     }
704 //
705 //     if(!wait_cycles) { // timout has occurred
706 //         debugFatal("One or more streams couldn't be enabled (timeout):\n");
707 //
708 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
709 //                 it != m_ReceiveProcessors.end();
710 //                 ++it ) {
711 //             if(!(*it)->isEnabled()) {
712 //                     debugFatal(" receive stream %p not enabled\n",*it);
713 //             } else {
714 //                     debugFatal(" receive stream %p enabled\n",*it);
715 //             }
716 //         }
717 //
718 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
719 //                 it != m_TransmitProcessors.end();
720 //                 ++it ) {
721 //             if(!(*it)->isEnabled()) {
722 //                     debugFatal(" transmit stream %p not enabled\n",*it);
723 //             } else {
724 //                     debugFatal(" transmit stream %p enabled\n",*it);
725 //             }
726 //         }
727 //         return false;
728 //     }
729
730     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
731
732     return true;
733 }
734
735 /**
736  * Disables the registered StreamProcessors
737  * @return true if successful, false otherwise
738  */
739 bool StreamProcessorManager::disableStreamProcessors() {
740     // we prepare the streamprocessors for disable
741     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
742             it != m_ReceiveProcessors.end();
743             ++it ) {
744         (*it)->prepareForDisable();
745     }
746
747     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
748             it != m_TransmitProcessors.end();
749             ++it ) {
750         (*it)->prepareForDisable();
751     }
752
753     // then we disable the streamprocessors
754     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
755             it != m_ReceiveProcessors.end();
756             ++it ) {
757         (*it)->disable();
758         (*it)->m_data_buffer->setTransparent(true);
759     }
760
761     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
762             it != m_TransmitProcessors.end();
763             ++it ) {
764         (*it)->disable();
765         (*it)->m_data_buffer->setTransparent(true);
766     }
767
768     // now we wait for the SP's to get disabled
769     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
770     // we have to wait until all streamprocessors indicate that they are running
771     // i.e. that there is actually some data stream flowing
772     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
773     bool enabled=true;
774     while (enabled && wait_cycles) {
775         wait_cycles--;
776         enabled=false;
777
778         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
779                 it != m_ReceiveProcessors.end();
780                 ++it ) {
781             if((*it)->isEnabled()) enabled=true;
782         }
783
784         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
785                 it != m_TransmitProcessors.end();
786                 ++it ) {
787             if((*it)->isEnabled()) enabled=true;
788         }
789         usleep(1000); // one cycle
790     }
791
792     if(!wait_cycles) { // timout has occurred
793         debugFatal("One or more streams couldn't be disabled (timeout):\n");
794
795         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
796                 it != m_ReceiveProcessors.end();
797                 ++it ) {
798             if(!(*it)->isEnabled()) {
799                     debugFatal(" receive stream %p not enabled\n",*it);
800             } else {
801                     debugFatal(" receive stream %p enabled\n",*it);
802             }
803         }
804
805         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
806                 it != m_TransmitProcessors.end();
807                 ++it ) {
808             if(!(*it)->isEnabled()) {
809                     debugFatal(" transmit stream %p not enabled\n",*it);
810             } else {
811                     debugFatal(" transmit stream %p enabled\n",*it);
812             }
813         }
814         return false;
815     }
816
817     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
818
819     return true;
820 }
821
822 /**
823  * Called upon Xrun events. This brings all StreamProcessors back
824  * into their starting state, and then carries on streaming. This should
825  * have the same effect as restarting the whole thing.
826  *
827  * @return true if successful, false otherwise
828  */
829 bool StreamProcessorManager::handleXrun() {
830
831     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
832
833     dumpInfo();
834
835     /*
836      * Reset means:
837      * 1) Disabling the SP's, so that they don't process any packets
838      *    note: the isomanager does keep on delivering/requesting them
839      * 2) Bringing all buffers & streamprocessors into a know state
840      *    - Clear all capture buffers
841      *    - Put nb_periods*period_size of null frames into the playback buffers
842      * 3) Re-enable the SP's
843      */
844     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
845         if (!disableStreamProcessors()) {
846         debugFatal("Could not disable StreamProcessors...\n");
847         return false;
848     }
849
850     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
851     // start all SP's synchonized
852     if (!syncStartAll()) {
853         debugFatal("Could not syncStartAll...\n");
854         return false;
855     }
856
857     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
858
859     return true;
860 }
861
862 /**
863  * @brief Waits until the next period of samples is ready
864  *
865  * This function does not return until a full period of samples is (or should be)
866  * ready to be transferred.
867  *
868  * @return true if the period is ready, false if an xrun occurred
869  */
870 bool StreamProcessorManager::waitForPeriod() {
871     int time_till_next_period;
872     bool xrun_occurred=false;
873
874     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
875
876     assert(m_SyncSource);
877
878     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
879
880     while(time_till_next_period > 0) {
881         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
882
883         // wait for the period
884         usleep(time_till_next_period);
885
886         // check for underruns on the ISO side,
887         // those should make us bail out of the wait loop
888         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
889             it != m_ReceiveProcessors.end();
890             ++it ) {
891             // a xrun has occurred on the Iso side
892             xrun_occurred |= (*it)->xrunOccurred();
893         }
894         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
895             it != m_TransmitProcessors.end();
896             ++it ) {
897             // a xrun has occurred on the Iso side
898             xrun_occurred |= (*it)->xrunOccurred();
899         }
900
901         if(xrun_occurred) break;
902
903         // check if we were waked up too soon
904         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
905     }
906
907     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
908
909     // this is to notify the client of the delay
910     // that we introduced
911     m_delayed_usecs=time_till_next_period;
912
913     // we save the 'ideal' time of the transfer at this point,
914     // because we can have interleaved read - process - write
915     // cycles making that we modify a receiving stream's buffer
916     // before we get to writing.
917     // NOTE: before waitForPeriod() is called again, both the transmit
918     //       and the receive processors should have done their transfer.
919     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
920     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
921         m_time_of_transfer);
922
923 #ifdef DEBUG
924     int rcv_bf=0, xmt_bf=0;
925     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
926         it != m_ReceiveProcessors.end();
927         ++it ) {
928         rcv_bf = (*it)->getBufferFill();
929     }
930     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
931         it != m_TransmitProcessors.end();
932         ++it ) {
933         xmt_bf = (*it)->getBufferFill();
934     }
935     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
936         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
937
938 #endif
939
940     xrun_occurred=false;
941
942     // check if xruns occurred on the Iso side.
943     // also check if xruns will occur should we transfer() now
944
945     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
946           it != m_ReceiveProcessors.end();
947           ++it ) {
948         // a xrun has occurred on the Iso side
949         xrun_occurred |= (*it)->xrunOccurred();
950
951         // if this is true, a xrun will occur
952         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
953
954 #ifdef DEBUG
955         if ((*it)->xrunOccurred()) {
956             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
957             (*it)->dumpInfo();
958         }
959         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
960             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
961             (*it)->dumpInfo();
962         }
963 #endif
964
965     }
966     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
967           it != m_TransmitProcessors.end();
968           ++it ) {
969         // a xrun has occurred on the Iso side
970         xrun_occurred |= (*it)->xrunOccurred();
971
972         // if this is true, a xrun will occur
973         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
974
975 #ifdef DEBUG
976         if ((*it)->xrunOccurred()) {
977             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
978         }
979         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
980             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
981         }
982 #endif
983     }
984
985     m_nbperiods++;
986
987     // now we can signal the client that we are (should be) ready
988     return !xrun_occurred;
989 }
990
991 /**
992  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
993  *
994  * Transfers one period of frames from the client side to the Iso side and vice versa.
995  *
996  * @return true if successful, false otherwise (indicates xrun).
997  */
998 bool StreamProcessorManager::transfer() {
999
1000     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
1001     bool retval=true;
1002     retval &= dryRun(StreamProcessor::E_Receive);
1003     retval &= dryRun(StreamProcessor::E_Transmit);
1004     return retval;
1005 }
1006
1007 /**
1008  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1009  *
1010  * Transfers one period of frames from the client side to the Iso side or vice versa.
1011  *
1012  * @param t The processor type to tranfer for (receive or transmit)
1013  * @return true if successful, false otherwise (indicates xrun).
1014  */
1015
1016 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
1017     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1018     bool retval = true;
1019     // a static cast could make sure that there is no performance
1020     // penalty for the virtual functions (to be checked)
1021     if (t==StreamProcessor::E_Receive) {
1022         // determine the time at which we want reception to start
1023         float rate=m_SyncSource->getTicksPerFrame();
1024         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1025        
1026         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1027        
1028         if(receive_timestamp<0) {
1029             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1030              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1031         }
1032         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1033             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1034              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1035         }
1036        
1037         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1038                 it != m_ReceiveProcessors.end();
1039                 ++it ) {
1040
1041             if(!(*it)->getFrames(m_period, receive_timestamp)) {
1042                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1043                             m_period, m_time_of_transfer,*it);
1044                 retval &= false; // buffer underrun
1045             }
1046         }
1047     } else {
1048         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1049                 it != m_TransmitProcessors.end();
1050                 ++it ) {
1051             // FIXME: in the SPM it would be nice to have system time instead of
1052             //        1394 time
1053             float rate=m_SyncSource->getTicksPerFrame();
1054             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1055
1056             // the data we are putting into the buffer is intended to be transmitted
1057             // one ringbuffer size after it has been received
1058             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1059
1060             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1061                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1062                         m_period, transmit_timestamp, *it);
1063                 retval &= false; // buffer underrun
1064             }
1065
1066         }
1067     }
1068     return retval;
1069 }
1070
1071 /**
1072  * @brief Dry run one period for both receive and transmit StreamProcessors
1073  *
1074  * Process one period of frames for all streamprocessors, without touching the
1075  * client buffers. This only removes an incoming period from the ISO receive buffer and
1076  * puts one period of silence into the transmit buffers.
1077  *
1078  * @return true if successful, false otherwise (indicates xrun).
1079  */
1080 bool StreamProcessorManager::dryRun() {
1081     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1082     bool retval=true;
1083     retval &= dryRun(StreamProcessor::E_Receive);
1084     retval &= dryRun(StreamProcessor::E_Transmit);
1085     return retval;
1086 }
1087
1088 /**
1089  * @brief Dry run one period for either the receive or transmit StreamProcessors
1090  *
1091  * see dryRun()
1092  *
1093  * @param t The processor type to dryRun for (receive or transmit)
1094  * @return true if successful, false otherwise (indicates xrun).
1095  */
1096
1097 bool StreamProcessorManager::dryRun(enum StreamProcessor::EProcessorType t) {
1098     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1099     bool retval = true;
1100     // a static cast could make sure that there is no performance
1101     // penalty for the virtual functions (to be checked)
1102     if (t==StreamProcessor::E_Receive) {
1103         // determine the time at which we want reception to start
1104         float rate=m_SyncSource->getTicksPerFrame();
1105         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1106        
1107         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1108        
1109         if(receive_timestamp<0) {
1110             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1111              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1112         }
1113         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1114             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1115              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1116         }
1117        
1118         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1119                 it != m_ReceiveProcessors.end();
1120                 ++it ) {
1121
1122             if(!(*it)->getFramesDry(m_period, receive_timestamp)) {
1123                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1124                             m_period, m_time_of_transfer,*it);
1125                 retval &= false; // buffer underrun
1126             }
1127
1128         }
1129     } else {
1130         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1131                 it != m_TransmitProcessors.end();
1132                 ++it ) {
1133             // FIXME: in the SPM it would be nice to have system time instead of
1134             //        1394 time
1135             float rate=m_SyncSource->getTicksPerFrame();
1136             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1137
1138             // the data we are putting into the buffer is intended to be transmitted
1139             // one ringbuffer size after it has been received
1140             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1141
1142             if(!(*it)->putFramesDry(m_period, transmit_timestamp)) {
1143                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1144                         m_period, transmit_timestamp, *it);
1145                 retval &= false; // buffer underrun
1146             }
1147
1148         }
1149     }
1150     return retval;
1151 }
1152
1153 void StreamProcessorManager::dumpInfo() {
1154     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1155     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1156     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1157
1158     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1159     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1160         it != m_ReceiveProcessors.end();
1161         ++it ) {
1162         (*it)->dumpInfo();
1163     }
1164
1165     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1166     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1167         it != m_TransmitProcessors.end();
1168         ++it ) {
1169         (*it)->dumpInfo();
1170     }
1171
1172     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1173     m_isoManager->dumpInfo();
1174     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1175
1176 }
1177
1178 void StreamProcessorManager::setVerboseLevel(int l) {
1179     setDebugLevel(l);
1180
1181     if (m_isoManager) m_isoManager->setVerboseLevel(l);
1182
1183     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1184     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1185         it != m_ReceiveProcessors.end();
1186         ++it ) {
1187         (*it)->setVerboseLevel(l);
1188     }
1189
1190     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1191     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1192         it != m_TransmitProcessors.end();
1193         ++it ) {
1194         (*it)->setVerboseLevel(l);
1195     }
1196 }
1197
1198
1199 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1200     int count=0;
1201
1202     if (direction == Port::E_Capture) {
1203         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1204             it != m_ReceiveProcessors.end();
1205             ++it ) {
1206             count += (*it)->getPortCount(type);
1207         }
1208     } else {
1209         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1210             it != m_TransmitProcessors.end();
1211             ++it ) {
1212             count += (*it)->getPortCount(type);
1213         }
1214     }
1215     return count;
1216 }
1217
1218 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1219     int count=0;
1220
1221     if (direction == Port::E_Capture) {
1222         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1223             it != m_ReceiveProcessors.end();
1224             ++it ) {
1225             count += (*it)->getPortCount();
1226         }
1227     } else {
1228         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1229             it != m_TransmitProcessors.end();
1230             ++it ) {
1231             count += (*it)->getPortCount();
1232         }
1233     }
1234     return count;
1235 }
1236
1237 // TODO: implement a port map here, instead of the loop
1238
1239 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1240     int count=0;
1241     int prevcount=0;
1242
1243     if (direction == Port::E_Capture) {
1244         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1245             it != m_ReceiveProcessors.end();
1246             ++it ) {
1247             count += (*it)->getPortCount();
1248             if (count > idx) {
1249                 return (*it)->getPortAtIdx(idx-prevcount);
1250             }
1251             prevcount=count;
1252         }
1253     } else {
1254         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1255             it != m_TransmitProcessors.end();
1256             ++it ) {
1257             count += (*it)->getPortCount();
1258             if (count > idx) {
1259                 return (*it)->getPortAtIdx(idx-prevcount);
1260             }
1261             prevcount=count;
1262         }
1263     }
1264     return NULL;
1265 }
1266
1267 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1268     m_thread_realtime=rt;
1269     m_thread_priority=priority;
1270     return true;
1271 }
1272
1273
1274 } // end of namespace
Note: See TracBrowser for help on using the browser.