root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 709, 43.7 kB (checked in by ppalmers, 16 years ago)

some more streaming system updates.
this works with the saffire up till -n2 -p256

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define RUNNING_TIMEOUT_MSEC 4000
33 #define PREPARE_TIMEOUT_MSEC 4000
34 #define ENABLE_TIMEOUT_MSEC 4000
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_nb_buffers(nb_buffers)
44     , m_period(period)
45     , m_xruns(0)
46     , m_isoManager(0)
47     , m_nbperiods(0)
48 {
49     addOption(Util::OptionContainer::Option("slaveMode",false));
50 }
51
52 StreamProcessorManager::~StreamProcessorManager() {
53     if (m_isoManager) delete m_isoManager;
54
55 }
56
57 /**
58  * Registers \ref processor with this manager.
59  *
60  * also registers it with the isohandlermanager
61  *
62  * be sure to call isohandlermanager->init() first!
63  * and be sure that the processors are also ->init()'ed
64  *
65  * @param processor
66  * @return true if successfull
67  */
68 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
69 {
70     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
71     assert(processor);
72     assert(m_isoManager);
73
74     if (processor->getType()==StreamProcessor::E_Receive) {
75         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
76
77         m_ReceiveProcessors.push_back(processor);
78
79         processor->setManager(this);
80
81         return true;
82     }
83
84     if (processor->getType()==StreamProcessor::E_Transmit) {
85         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
86
87         m_TransmitProcessors.push_back(processor);
88
89         processor->setManager(this);
90
91         return true;
92     }
93
94     debugFatal("Unsupported processor type!\n");
95
96     return false;
97 }
98
99 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
100 {
101     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
102     assert(processor);
103
104     if (processor->getType()==StreamProcessor::E_Receive) {
105
106         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
107             it != m_ReceiveProcessors.end();
108             ++it ) {
109
110             if ( *it == processor ) {
111                     m_ReceiveProcessors.erase(it);
112
113                     processor->clearManager();
114
115                     if(!m_isoManager->unregisterStream(processor)) {
116                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
117
118                         return false;
119
120                     }
121
122                     return true;
123                 }
124         }
125     }
126
127     if (processor->getType()==StreamProcessor::E_Transmit) {
128         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
129             it != m_TransmitProcessors.end();
130             ++it ) {
131
132             if ( *it == processor ) {
133                     m_TransmitProcessors.erase(it);
134
135                     processor->clearManager();
136
137                     if(!m_isoManager->unregisterStream(processor)) {
138                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
139
140                         return false;
141
142                     }
143
144                     return true;
145                 }
146         }
147     }
148
149     debugFatal("Processor (%p) not found!\n",processor);
150
151     return false; //not found
152
153 }
154
155 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
156     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
157
158     m_SyncSource=s;
159     return true;
160 }
161
162 StreamProcessor *StreamProcessorManager::getSyncSource() {
163     return m_SyncSource;
164 }
165
166 bool StreamProcessorManager::init()
167 {
168     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
169
170     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
171
172     if(!m_isoManager) {
173         debugFatal("Could not create IsoHandlerManager\n");
174         return false;
175     }
176
177     // propagate the debug level
178     m_isoManager->setVerboseLevel(getDebugLevel());
179
180     if(!m_isoManager->init()) {
181         debugFatal("Could not initialize IsoHandlerManager\n");
182         return false;
183     }
184
185     m_xrun_happened=false;
186
187     return true;
188 }
189
190 bool StreamProcessorManager::prepare() {
191
192     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
193
194     m_is_slave=false;
195     if(!getOption("slaveMode", m_is_slave)) {
196         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
197     }
198
199     // if no sync source is set, select one here
200     if(m_SyncSource == NULL) {
201        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
202     }
203
204     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
205         it != m_ReceiveProcessors.end();
206         ++it ) {
207             if(m_SyncSource == NULL) {
208                 debugWarning(" => Sync Source is %p.\n", *it);
209                 m_SyncSource = *it;
210             }
211     }
212
213     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
214         it != m_TransmitProcessors.end();
215         ++it ) {
216             if(m_SyncSource == NULL) {
217                 debugWarning(" => Sync Source is %p.\n", *it);
218                 m_SyncSource = *it;
219             }
220     }
221
222     // now do the actual preparation
223     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
224     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
225         it != m_ReceiveProcessors.end();
226         ++it ) {
227
228         if(!(*it)->setSyncSource(m_SyncSource)) {
229             debugFatal(  " could not set sync source (%p)...\n",(*it));
230             return false;
231         }
232
233         if(!(*it)->setOption("slaveMode", m_is_slave)) {
234             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
235         }
236
237         if(!(*it)->prepare()) {
238             debugFatal(  " could not prepare (%p)...\n",(*it));
239             return false;
240         }
241     }
242
243     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
244     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
245         it != m_TransmitProcessors.end();
246         ++it ) {
247         if(!(*it)->setSyncSource(m_SyncSource)) {
248             debugFatal(  " could not set sync source (%p)...\n",(*it));
249             return false;
250         }
251         if(!(*it)->setOption("slaveMode", m_is_slave)) {
252             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
253         }
254         if(!(*it)->prepare()) {
255             debugFatal( " could not prepare (%p)...\n",(*it));
256             return false;
257         }
258     }
259
260     // if there are no stream processors registered,
261     // fail
262     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
263         debugFatal("No stream processors registered, can't do anything usefull\n");
264         return false;
265     }
266
267     return true;
268 }
269
270 bool StreamProcessorManager::syncStartAll() {
271
272     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
273     // we have to wait until all streamprocessors indicate that they are running
274     // i.e. that there is actually some data stream flowing
275     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
276     bool notRunning=true;
277     while (notRunning && wait_cycles) {
278         wait_cycles--;
279         notRunning=false;
280
281         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
282                 it != m_ReceiveProcessors.end();
283                 ++it ) {
284             if(!(*it)->isRunning()) notRunning=true;
285         }
286
287         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
288                 it != m_TransmitProcessors.end();
289                 ++it ) {
290             if(!(*it)->isRunning()) notRunning=true;
291         }
292
293         usleep(1000);
294         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning);
295     }
296
297     if(!wait_cycles) { // timout has occurred
298         debugFatal("One or more streams are not starting up (timeout):\n");
299
300         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301                 it != m_ReceiveProcessors.end();
302                 ++it ) {
303             if(!(*it)->isRunning()) {
304                 debugFatal(" receive stream %p not running\n",*it);
305             } else {
306                 debugFatal(" receive stream %p running\n",*it);
307             }
308         }
309
310         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
311                 it != m_TransmitProcessors.end();
312                 ++it ) {
313             if(!(*it)->isRunning()) {
314                 debugFatal(" transmit stream %p not running\n",*it);
315             } else {
316                 debugFatal(" transmit stream %p running\n",*it);
317             }
318         }
319         return false;
320     }
321
322     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
323
324     // now find out how long we have to delay the wait operation such that
325     // the received frames will all be presented to the SP
326     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
327     int max_of_min_delay=0;
328     int min_delay=0;
329     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
330             it != m_ReceiveProcessors.end();
331             ++it ) {
332         min_delay=(*it)->getMaxFrameLatency();
333         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
334     }
335
336     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks (%03us %04uc %04ut)...\n",
337         max_of_min_delay,
338         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
339         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
340         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
341     m_SyncSource->setSyncDelay(max_of_min_delay);
342
343
344     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
345     // now we reset the frame counters
346     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
347             it != m_ReceiveProcessors.end();
348             ++it ) {
349         // get the receive SP's going at receiving data
350         (*it)->m_data_buffer->setTransparent(false);
351         (*it)->reset();
352     }
353
354     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
355             it != m_TransmitProcessors.end();
356             ++it ) {
357         // make sure the SP retains it's prefilled data
358         (*it)->m_data_buffer->setTransparent(false);
359         (*it)->reset();
360     }
361    
362     dumpInfo();
363     // All buffers are prefilled and set-up, the only thing
364     // that remains a mistery is the timestamp information.
365 //     m_SyncSource->m_data_buffer->setTransparent(false);
366 //     debugShowBackLog();
367    
368     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
369     // in order to obtain that, we wait for the first periods to be
370     // received.
371     int nb_sync_runs=10;
372     while(nb_sync_runs--) { // or while not sync-ed?
373         waitForPeriod();
374
375         // drop the frames for all receive SP's
376         dryRun(StreamProcessor::E_Receive);
377        
378         // we don't have to dryrun for the xmit SP's since they
379         // are not sending data yet.
380     }
381
382     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
383         m_time_of_transfer,
384         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
385         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
386         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
387     // FIXME: xruns can screw up the framecounter accounting. do something more sane here
388     resetXrunCounters();
389     // lock the isohandlermanager such that things freeze
390 //     debugShowBackLog();
391
392     // we now should have decent sync info
393     // the buffers of the receive streams should be (approx) empty
394     // the buffers of the xmit streams should be full
395    
396     // note what should the timestamp of the first sample be?
397    
398     // at this point the buffer head timestamp of the transmit buffers can be
399     // set properly since we know the sync source's timestamp of the last
400     // buffer transfer. we also know the rate.
401    
402     debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n");
403     // FIXME: in the SPM it would be nice to have system time instead of
404     //        1394 time
405 //     float rate=m_SyncSource->getTicksPerFrame();
406 //     int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
407 //     // the data at the front of the buffer is intended to be transmitted
408 //     // nb_periods*period_size after the last received period
409 //     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
410
411     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
412             it != m_TransmitProcessors.end();
413             ++it ) {
414         // FIXME: encapsulate
415         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
416         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!!
417     }
418    
419     dumpInfo();
420    
421     // this is the place were we should be syncing the received streams too
422     // check how much they differ
423    
424    
425     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
426
427     // FIXME: this should not be in cycles, but in 'time'
428     // FIXME: remove the timestamp
429     if (!enableStreamProcessors(0)) {
430         debugFatal("Could not enable StreamProcessors...\n");
431         return false;
432     }
433
434     debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n");
435     #define MAX_DRYRUN_CYCLES               20
436     #define MIN_SUCCESSFUL_DRYRUN_CYCLES    4
437     // run some cycles 'dry' such that everything can stabilize
438     int nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES;
439     int nb_succesful_cycles = 0;
440     while(nb_dryrun_cycles_left > 0 &&
441           nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) {
442
443         waitForPeriod();
444
445         if (dryRun()) {
446             nb_succesful_cycles++;
447         } else {
448             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" );
449             resetXrunCounters();
450             nb_succesful_cycles = 0;
451             // FIXME: xruns can screw up the framecounter accounting. do something more sane here
452         }
453         nb_dryrun_cycles_left--;
454     }
455
456     if(nb_dryrun_cycles_left == 0) {
457         debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without steady-state...\n" );
458         return false;
459     }
460     debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in steady-state...\n" );
461
462     // now we should clear the xrun flags
463     resetXrunCounters();
464
465     // and off we go
466     return true;
467 }
468
469 void StreamProcessorManager::resetXrunCounters(){
470     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n");
471     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
472         it != m_ReceiveProcessors.end();
473         ++it )
474     {
475         (*it)->resetXrunCounter();
476     }
477
478     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
479         it != m_TransmitProcessors.end();
480         ++it )
481     {
482         (*it)->resetXrunCounter();
483     }
484 }
485
486 bool StreamProcessorManager::start() {
487     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
488     assert(m_isoManager);
489
490     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
491     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
492     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
493         it != m_ReceiveProcessors.end();
494         ++it ) {
495             if (!(*it)->prepareForStart()) {
496                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
497                 return false;
498             }
499             if (!m_isoManager->registerStream(*it)) {
500                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
501                 return false;
502             }
503         }
504
505     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
506     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
507         it != m_TransmitProcessors.end();
508         ++it ) {
509             if (!(*it)->prepareForStart()) {
510                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
511                 return false;
512             }
513             if (!m_isoManager->registerStream(*it)) {
514                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
515                 return false;
516             }
517         }
518
519     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
520     if (!m_isoManager->prepare()) {
521         debugFatal("Could not prepare isoManager\n");
522         return false;
523     }
524
525     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
526         if (!disableStreamProcessors()) {
527         debugFatal("Could not disable StreamProcessors...\n");
528         return false;
529     }
530
531     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
532     if (!m_isoManager->startHandlers(-1)) {
533         debugFatal("Could not start handlers...\n");
534         return false;
535     }
536
537     // start all SP's synchonized
538     if (!syncStartAll()) {
539         debugFatal("Could not syncStartAll...\n");
540         return false;
541     }
542
543     // dump the iso stream information when in verbose mode
544     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
545         m_isoManager->dumpInfo();
546     }
547
548     return true;
549
550 }
551
552 bool StreamProcessorManager::stop() {
553     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
554     assert(m_isoManager);
555
556     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
557     // Most stream processors can just stop without special treatment.  However, some
558     // (like the MOTU) need to do a few things before it's safe to turn off the iso
559     // handling.
560     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
561     bool allReady = false;
562     while (!allReady && wait_cycles) {
563         wait_cycles--;
564         allReady = true;
565
566         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
567             it != m_ReceiveProcessors.end();
568             ++it ) {
569             if(!(*it)->prepareForStop()) allReady = false;
570         }
571
572         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
573             it != m_TransmitProcessors.end();
574             ++it ) {
575             if(!(*it)->prepareForStop()) allReady = false;
576         }
577         usleep(1000);
578     }
579
580     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
581         if (!disableStreamProcessors()) {
582         debugFatal("Could not disable StreamProcessors...\n");
583         return false;
584     }
585
586     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
587     if(!m_isoManager->stopHandlers()) {
588        debugFatal("Could not stop ISO handlers\n");
589        return false;
590     }
591
592     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
593     // now unregister all streams from iso manager
594     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
595     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
596         it != m_ReceiveProcessors.end();
597         ++it ) {
598             if (!m_isoManager->unregisterStream(*it)) {
599                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
600                 return false;
601             }
602
603         }
604
605     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
606     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
607         it != m_TransmitProcessors.end();
608         ++it ) {
609             if (!m_isoManager->unregisterStream(*it)) {
610                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
611                 return false;
612             }
613
614         }
615
616     return true;
617
618 }
619
620 /**
621  * Enables the registered StreamProcessors
622  * @return true if successful, false otherwise
623  */
624 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
625     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
626
627     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
628     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
629     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
630             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
631         return false;
632     }
633
634     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
635     m_SyncSource->enable(time_to_enable_at);
636
637     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
638
639     // we prepare the streamprocessors for enable
640     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
641             it != m_ReceiveProcessors.end();
642             ++it ) {
643         if(*it != m_SyncSource) {
644             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
645             (*it)->prepareForEnable(time_to_enable_at);
646         }
647     }
648
649     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
650             it != m_TransmitProcessors.end();
651             ++it ) {
652         if(*it != m_SyncSource) {
653             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
654             (*it)->prepareForEnable(time_to_enable_at);
655         }
656     }
657
658     // then we enable the streamprocessors
659     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
660             it != m_ReceiveProcessors.end();
661             ++it ) {
662         if(*it != m_SyncSource) {
663             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
664             (*it)->enable(time_to_enable_at);
665         }
666     }
667
668     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
669             it != m_TransmitProcessors.end();
670             ++it ) {
671         if(*it != m_SyncSource) {
672             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
673             (*it)->enable(time_to_enable_at);
674         }
675     }
676
677     // now we wait for the SP's to get enabled
678 //     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
679 //     // we have to wait until all streamprocessors indicate that they are running
680 //     // i.e. that there is actually some data stream flowing
681 //     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
682 //     bool notEnabled=true;
683 //     while (notEnabled && wait_cycles) {
684 //         wait_cycles--;
685 //         notEnabled=false;
686 //
687 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
688 //                 it != m_ReceiveProcessors.end();
689 //                 ++it ) {
690 //             if(!(*it)->isEnabled()) notEnabled=true;
691 //         }
692 //
693 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
694 //                 it != m_TransmitProcessors.end();
695 //                 ++it ) {
696 //             if(!(*it)->isEnabled()) notEnabled=true;
697 //         }
698 //         usleep(1000); // one cycle
699 //     }
700 //
701 //     if(!wait_cycles) { // timout has occurred
702 //         debugFatal("One or more streams couldn't be enabled (timeout):\n");
703 //
704 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
705 //                 it != m_ReceiveProcessors.end();
706 //                 ++it ) {
707 //             if(!(*it)->isEnabled()) {
708 //                     debugFatal(" receive stream %p not enabled\n",*it);
709 //             } else {
710 //                     debugFatal(" receive stream %p enabled\n",*it);
711 //             }
712 //         }
713 //
714 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
715 //                 it != m_TransmitProcessors.end();
716 //                 ++it ) {
717 //             if(!(*it)->isEnabled()) {
718 //                     debugFatal(" transmit stream %p not enabled\n",*it);
719 //             } else {
720 //                     debugFatal(" transmit stream %p enabled\n",*it);
721 //             }
722 //         }
723 //         return false;
724 //     }
725
726     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
727
728     return true;
729 }
730
731 /**
732  * Disables the registered StreamProcessors
733  * @return true if successful, false otherwise
734  */
735 bool StreamProcessorManager::disableStreamProcessors() {
736     // we prepare the streamprocessors for disable
737     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
738             it != m_ReceiveProcessors.end();
739             ++it ) {
740         (*it)->prepareForDisable();
741     }
742
743     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
744             it != m_TransmitProcessors.end();
745             ++it ) {
746         (*it)->prepareForDisable();
747     }
748
749     // then we disable the streamprocessors
750     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
751             it != m_ReceiveProcessors.end();
752             ++it ) {
753         (*it)->disable();
754         (*it)->m_data_buffer->setTransparent(true);
755     }
756
757     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
758             it != m_TransmitProcessors.end();
759             ++it ) {
760         (*it)->disable();
761         (*it)->m_data_buffer->setTransparent(true);
762     }
763
764     // now we wait for the SP's to get disabled
765     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
766     // we have to wait until all streamprocessors indicate that they are running
767     // i.e. that there is actually some data stream flowing
768     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
769     bool enabled=true;
770     while (enabled && wait_cycles) {
771         wait_cycles--;
772         enabled=false;
773
774         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
775                 it != m_ReceiveProcessors.end();
776                 ++it ) {
777             if((*it)->isEnabled()) enabled=true;
778         }
779
780         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
781                 it != m_TransmitProcessors.end();
782                 ++it ) {
783             if((*it)->isEnabled()) enabled=true;
784         }
785         usleep(1000); // one cycle
786     }
787
788     if(!wait_cycles) { // timout has occurred
789         debugFatal("One or more streams couldn't be disabled (timeout):\n");
790
791         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
792                 it != m_ReceiveProcessors.end();
793                 ++it ) {
794             if(!(*it)->isEnabled()) {
795                     debugFatal(" receive stream %p not enabled\n",*it);
796             } else {
797                     debugFatal(" receive stream %p enabled\n",*it);
798             }
799         }
800
801         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
802                 it != m_TransmitProcessors.end();
803                 ++it ) {
804             if(!(*it)->isEnabled()) {
805                     debugFatal(" transmit stream %p not enabled\n",*it);
806             } else {
807                     debugFatal(" transmit stream %p enabled\n",*it);
808             }
809         }
810         return false;
811     }
812
813     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
814
815     return true;
816 }
817
818 /**
819  * Called upon Xrun events. This brings all StreamProcessors back
820  * into their starting state, and then carries on streaming. This should
821  * have the same effect as restarting the whole thing.
822  *
823  * @return true if successful, false otherwise
824  */
825 bool StreamProcessorManager::handleXrun() {
826
827     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
828
829     dumpInfo();
830
831     /*
832      * Reset means:
833      * 1) Disabling the SP's, so that they don't process any packets
834      *    note: the isomanager does keep on delivering/requesting them
835      * 2) Bringing all buffers & streamprocessors into a know state
836      *    - Clear all capture buffers
837      *    - Put nb_periods*period_size of null frames into the playback buffers
838      * 3) Re-enable the SP's
839      */
840     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
841         if (!disableStreamProcessors()) {
842         debugFatal("Could not disable StreamProcessors...\n");
843         return false;
844     }
845
846     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
847     // start all SP's synchonized
848     if (!syncStartAll()) {
849         debugFatal("Could not syncStartAll...\n");
850         return false;
851     }
852
853     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
854
855     return true;
856 }
857
858 /**
859  * @brief Waits until the next period of samples is ready
860  *
861  * This function does not return until a full period of samples is (or should be)
862  * ready to be transferred.
863  *
864  * @return true if the period is ready, false if an xrun occurred
865  */
866 bool StreamProcessorManager::waitForPeriod() {
867     int time_till_next_period;
868     bool xrun_occurred=false;
869
870     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
871
872     assert(m_SyncSource);
873
874     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
875
876     while(time_till_next_period > 0) {
877         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
878
879         // wait for the period
880         usleep(time_till_next_period);
881
882         // check for underruns on the ISO side,
883         // those should make us bail out of the wait loop
884         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
885             it != m_ReceiveProcessors.end();
886             ++it ) {
887             // a xrun has occurred on the Iso side
888             xrun_occurred |= (*it)->xrunOccurred();
889         }
890         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
891             it != m_TransmitProcessors.end();
892             ++it ) {
893             // a xrun has occurred on the Iso side
894             xrun_occurred |= (*it)->xrunOccurred();
895         }
896
897         if(xrun_occurred) break;
898
899         // check if we were waked up too soon
900         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
901     }
902
903     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
904
905     // this is to notify the client of the delay
906     // that we introduced
907     m_delayed_usecs=time_till_next_period;
908
909     // we save the 'ideal' time of the transfer at this point,
910     // because we can have interleaved read - process - write
911     // cycles making that we modify a receiving stream's buffer
912     // before we get to writing.
913     // NOTE: before waitForPeriod() is called again, both the transmit
914     //       and the receive processors should have done their transfer.
915     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
916     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
917         m_time_of_transfer);
918
919 #ifdef DEBUG
920     int rcv_bf=0, xmt_bf=0;
921     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
922         it != m_ReceiveProcessors.end();
923         ++it ) {
924         rcv_bf = (*it)->getBufferFill();
925     }
926     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
927         it != m_TransmitProcessors.end();
928         ++it ) {
929         xmt_bf = (*it)->getBufferFill();
930     }
931     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
932         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
933
934 #endif
935
936     xrun_occurred=false;
937
938     // check if xruns occurred on the Iso side.
939     // also check if xruns will occur should we transfer() now
940
941     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
942           it != m_ReceiveProcessors.end();
943           ++it ) {
944         // a xrun has occurred on the Iso side
945         xrun_occurred |= (*it)->xrunOccurred();
946
947         // if this is true, a xrun will occur
948         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
949
950 #ifdef DEBUG
951         if ((*it)->xrunOccurred()) {
952             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
953             (*it)->dumpInfo();
954         }
955         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
956             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
957             (*it)->dumpInfo();
958         }
959 #endif
960
961     }
962     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
963           it != m_TransmitProcessors.end();
964           ++it ) {
965         // a xrun has occurred on the Iso side
966         xrun_occurred |= (*it)->xrunOccurred();
967
968         // if this is true, a xrun will occur
969         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
970
971 #ifdef DEBUG
972         if ((*it)->xrunOccurred()) {
973             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
974         }
975         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
976             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
977         }
978 #endif
979     }
980
981     m_nbperiods++;
982
983     // now we can signal the client that we are (should be) ready
984     return !xrun_occurred;
985 }
986
987 /**
988  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
989  *
990  * Transfers one period of frames from the client side to the Iso side and vice versa.
991  *
992  * @return true if successful, false otherwise (indicates xrun).
993  */
994 bool StreamProcessorManager::transfer() {
995
996     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
997     bool retval=true;
998     retval &= dryRun(StreamProcessor::E_Receive);
999     retval &= dryRun(StreamProcessor::E_Transmit);
1000     return retval;
1001 }
1002
1003 /**
1004  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1005  *
1006  * Transfers one period of frames from the client side to the Iso side or vice versa.
1007  *
1008  * @param t The processor type to tranfer for (receive or transmit)
1009  * @return true if successful, false otherwise (indicates xrun).
1010  */
1011
1012 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
1013     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1014     bool retval = true;
1015     // a static cast could make sure that there is no performance
1016     // penalty for the virtual functions (to be checked)
1017     if (t==StreamProcessor::E_Receive) {
1018         // determine the time at which we want reception to start
1019         float rate=m_SyncSource->getTicksPerFrame();
1020         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1021        
1022         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1023        
1024         if(receive_timestamp<0) {
1025             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1026              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1027         }
1028         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1029             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1030              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1031         }
1032        
1033         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1034                 it != m_ReceiveProcessors.end();
1035                 ++it ) {
1036
1037             if(!(*it)->getFrames(m_period, receive_timestamp)) {
1038                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1039                             m_period, m_time_of_transfer,*it);
1040                 retval &= false; // buffer underrun
1041             }
1042         }
1043     } else {
1044         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1045                 it != m_TransmitProcessors.end();
1046                 ++it ) {
1047             // FIXME: in the SPM it would be nice to have system time instead of
1048             //        1394 time
1049             float rate=m_SyncSource->getTicksPerFrame();
1050             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1051
1052             // the data we are putting into the buffer is intended to be transmitted
1053             // one ringbuffer size after it has been received
1054             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1055
1056             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1057                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1058                         m_period, transmit_timestamp, *it);
1059                 retval &= false; // buffer underrun
1060             }
1061
1062         }
1063     }
1064     return retval;
1065 }
1066
1067 /**
1068  * @brief Dry run one period for both receive and transmit StreamProcessors
1069  *
1070  * Process one period of frames for all streamprocessors, without touching the
1071  * client buffers. This only removes an incoming period from the ISO receive buffer and
1072  * puts one period of silence into the transmit buffers.
1073  *
1074  * @return true if successful, false otherwise (indicates xrun).
1075  */
1076 bool StreamProcessorManager::dryRun() {
1077     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1078     bool retval=true;
1079     retval &= dryRun(StreamProcessor::E_Receive);
1080     retval &= dryRun(StreamProcessor::E_Transmit);
1081     return retval;
1082 }
1083
1084 /**
1085  * @brief Dry run one period for either the receive or transmit StreamProcessors
1086  *
1087  * see dryRun()
1088  *
1089  * @param t The processor type to dryRun for (receive or transmit)
1090  * @return true if successful, false otherwise (indicates xrun).
1091  */
1092
1093 bool StreamProcessorManager::dryRun(enum StreamProcessor::EProcessorType t) {
1094     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1095     bool retval = true;
1096     // a static cast could make sure that there is no performance
1097     // penalty for the virtual functions (to be checked)
1098     if (t==StreamProcessor::E_Receive) {
1099         // determine the time at which we want reception to start
1100         float rate=m_SyncSource->getTicksPerFrame();
1101         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1102        
1103         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1104        
1105         if(receive_timestamp<0) {
1106             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1107              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1108         }
1109         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1110             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1111              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1112         }
1113        
1114         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1115                 it != m_ReceiveProcessors.end();
1116                 ++it ) {
1117
1118             if(!(*it)->getFramesDry(m_period, receive_timestamp)) {
1119                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1120                             m_period, m_time_of_transfer,*it);
1121                 retval &= false; // buffer underrun
1122             }
1123
1124         }
1125     } else {
1126         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1127                 it != m_TransmitProcessors.end();
1128                 ++it ) {
1129             // FIXME: in the SPM it would be nice to have system time instead of
1130             //        1394 time
1131             float rate=m_SyncSource->getTicksPerFrame();
1132             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1133
1134             // the data we are putting into the buffer is intended to be transmitted
1135             // one ringbuffer size after it has been received
1136             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1137
1138             if(!(*it)->putFramesDry(m_period, transmit_timestamp)) {
1139                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1140                         m_period, transmit_timestamp, *it);
1141                 retval &= false; // buffer underrun
1142             }
1143
1144         }
1145     }
1146     return retval;
1147 }
1148
1149 void StreamProcessorManager::dumpInfo() {
1150     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1151     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1152     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1153
1154     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1155     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1156         it != m_ReceiveProcessors.end();
1157         ++it ) {
1158         (*it)->dumpInfo();
1159     }
1160
1161     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1162     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1163         it != m_TransmitProcessors.end();
1164         ++it ) {
1165         (*it)->dumpInfo();
1166     }
1167
1168     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1169     m_isoManager->dumpInfo();
1170     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1171
1172 }
1173
1174 void StreamProcessorManager::setVerboseLevel(int l) {
1175     setDebugLevel(l);
1176
1177     if (m_isoManager) m_isoManager->setVerboseLevel(l);
1178
1179     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1180     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1181         it != m_ReceiveProcessors.end();
1182         ++it ) {
1183         (*it)->setVerboseLevel(l);
1184     }
1185
1186     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1187     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1188         it != m_TransmitProcessors.end();
1189         ++it ) {
1190         (*it)->setVerboseLevel(l);
1191     }
1192 }
1193
1194
1195 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1196     int count=0;
1197
1198     if (direction == Port::E_Capture) {
1199         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1200             it != m_ReceiveProcessors.end();
1201             ++it ) {
1202             count += (*it)->getPortCount(type);
1203         }
1204     } else {
1205         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1206             it != m_TransmitProcessors.end();
1207             ++it ) {
1208             count += (*it)->getPortCount(type);
1209         }
1210     }
1211     return count;
1212 }
1213
1214 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1215     int count=0;
1216
1217     if (direction == Port::E_Capture) {
1218         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1219             it != m_ReceiveProcessors.end();
1220             ++it ) {
1221             count += (*it)->getPortCount();
1222         }
1223     } else {
1224         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1225             it != m_TransmitProcessors.end();
1226             ++it ) {
1227             count += (*it)->getPortCount();
1228         }
1229     }
1230     return count;
1231 }
1232
1233 // TODO: implement a port map here, instead of the loop
1234
1235 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1236     int count=0;
1237     int prevcount=0;
1238
1239     if (direction == Port::E_Capture) {
1240         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1241             it != m_ReceiveProcessors.end();
1242             ++it ) {
1243             count += (*it)->getPortCount();
1244             if (count > idx) {
1245                 return (*it)->getPortAtIdx(idx-prevcount);
1246             }
1247             prevcount=count;
1248         }
1249     } else {
1250         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1251             it != m_TransmitProcessors.end();
1252             ++it ) {
1253             count += (*it)->getPortCount();
1254             if (count > idx) {
1255                 return (*it)->getPortAtIdx(idx-prevcount);
1256             }
1257             prevcount=count;
1258         }
1259     }
1260     return NULL;
1261 }
1262
1263 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1264     m_thread_realtime=rt;
1265     m_thread_priority=priority;
1266     return true;
1267 }
1268
1269
1270 } // end of namespace
Note: See TracBrowser for help on using the browser.