root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 708, 43.8 kB (checked in by ppalmers, 16 years ago)

some more startup checks

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define RUNNING_TIMEOUT_MSEC 4000
33 #define PREPARE_TIMEOUT_MSEC 4000
34 #define ENABLE_TIMEOUT_MSEC 4000
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_nb_buffers(nb_buffers)
44     , m_period(period)
45     , m_xruns(0)
46     , m_isoManager(0)
47     , m_nbperiods(0)
48 {
49     addOption(Util::OptionContainer::Option("slaveMode",false));
50 }
51
52 StreamProcessorManager::~StreamProcessorManager() {
53     if (m_isoManager) delete m_isoManager;
54
55 }
56
57 /**
58  * Registers \ref processor with this manager.
59  *
60  * also registers it with the isohandlermanager
61  *
62  * be sure to call isohandlermanager->init() first!
63  * and be sure that the processors are also ->init()'ed
64  *
65  * @param processor
66  * @return true if successfull
67  */
68 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
69 {
70     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
71     assert(processor);
72     assert(m_isoManager);
73
74     if (processor->getType()==StreamProcessor::E_Receive) {
75         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
76
77         m_ReceiveProcessors.push_back(processor);
78
79         processor->setManager(this);
80
81         return true;
82     }
83
84     if (processor->getType()==StreamProcessor::E_Transmit) {
85         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
86
87         m_TransmitProcessors.push_back(processor);
88
89         processor->setManager(this);
90
91         return true;
92     }
93
94     debugFatal("Unsupported processor type!\n");
95
96     return false;
97 }
98
99 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
100 {
101     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
102     assert(processor);
103
104     if (processor->getType()==StreamProcessor::E_Receive) {
105
106         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
107             it != m_ReceiveProcessors.end();
108             ++it ) {
109
110             if ( *it == processor ) {
111                     m_ReceiveProcessors.erase(it);
112
113                     processor->clearManager();
114
115                     if(!m_isoManager->unregisterStream(processor)) {
116                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
117
118                         return false;
119
120                     }
121
122                     return true;
123                 }
124         }
125     }
126
127     if (processor->getType()==StreamProcessor::E_Transmit) {
128         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
129             it != m_TransmitProcessors.end();
130             ++it ) {
131
132             if ( *it == processor ) {
133                     m_TransmitProcessors.erase(it);
134
135                     processor->clearManager();
136
137                     if(!m_isoManager->unregisterStream(processor)) {
138                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
139
140                         return false;
141
142                     }
143
144                     return true;
145                 }
146         }
147     }
148
149     debugFatal("Processor (%p) not found!\n",processor);
150
151     return false; //not found
152
153 }
154
155 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
156     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
157
158     m_SyncSource=s;
159     return true;
160 }
161
162 StreamProcessor *StreamProcessorManager::getSyncSource() {
163     return m_SyncSource;
164 }
165
166 bool StreamProcessorManager::init()
167 {
168     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
169
170     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
171
172     if(!m_isoManager) {
173         debugFatal("Could not create IsoHandlerManager\n");
174         return false;
175     }
176
177     // propagate the debug level
178     m_isoManager->setVerboseLevel(getDebugLevel());
179
180     if(!m_isoManager->init()) {
181         debugFatal("Could not initialize IsoHandlerManager\n");
182         return false;
183     }
184
185     m_xrun_happened=false;
186
187     return true;
188 }
189
190 bool StreamProcessorManager::prepare() {
191
192     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
193
194     m_is_slave=false;
195     if(!getOption("slaveMode", m_is_slave)) {
196         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
197     }
198
199     // if no sync source is set, select one here
200     if(m_SyncSource == NULL) {
201        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
202     }
203
204     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
205         it != m_ReceiveProcessors.end();
206         ++it ) {
207             if(m_SyncSource == NULL) {
208                 debugWarning(" => Sync Source is %p.\n", *it);
209                 m_SyncSource = *it;
210             }
211     }
212
213     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
214         it != m_TransmitProcessors.end();
215         ++it ) {
216             if(m_SyncSource == NULL) {
217                 debugWarning(" => Sync Source is %p.\n", *it);
218                 m_SyncSource = *it;
219             }
220     }
221
222     // now do the actual preparation
223     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
224     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
225         it != m_ReceiveProcessors.end();
226         ++it ) {
227
228         if(!(*it)->setSyncSource(m_SyncSource)) {
229             debugFatal(  " could not set sync source (%p)...\n",(*it));
230             return false;
231         }
232
233         if(!(*it)->setOption("slaveMode", m_is_slave)) {
234             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
235         }
236
237         if(!(*it)->prepare()) {
238             debugFatal(  " could not prepare (%p)...\n",(*it));
239             return false;
240         }
241     }
242
243     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
244     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
245         it != m_TransmitProcessors.end();
246         ++it ) {
247         if(!(*it)->setSyncSource(m_SyncSource)) {
248             debugFatal(  " could not set sync source (%p)...\n",(*it));
249             return false;
250         }
251         if(!(*it)->setOption("slaveMode", m_is_slave)) {
252             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
253         }
254         if(!(*it)->prepare()) {
255             debugFatal( " could not prepare (%p)...\n",(*it));
256             return false;
257         }
258     }
259
260     // if there are no stream processors registered,
261     // fail
262     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
263         debugFatal("No stream processors registered, can't do anything usefull\n");
264         return false;
265     }
266
267     return true;
268 }
269
270 bool StreamProcessorManager::syncStartAll() {
271
272     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
273     // we have to wait until all streamprocessors indicate that they are running
274     // i.e. that there is actually some data stream flowing
275     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
276     bool notRunning=true;
277     while (notRunning && wait_cycles) {
278         wait_cycles--;
279         notRunning=false;
280
281         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
282                 it != m_ReceiveProcessors.end();
283                 ++it ) {
284             if(!(*it)->isRunning()) notRunning=true;
285         }
286
287         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
288                 it != m_TransmitProcessors.end();
289                 ++it ) {
290             if(!(*it)->isRunning()) notRunning=true;
291         }
292
293         usleep(1000);
294         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning);
295     }
296
297     if(!wait_cycles) { // timout has occurred
298         debugFatal("One or more streams are not starting up (timeout):\n");
299
300         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301                 it != m_ReceiveProcessors.end();
302                 ++it ) {
303             if(!(*it)->isRunning()) {
304                 debugFatal(" receive stream %p not running\n",*it);
305             } else {
306                 debugFatal(" receive stream %p running\n",*it);
307             }
308         }
309
310         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
311                 it != m_TransmitProcessors.end();
312                 ++it ) {
313             if(!(*it)->isRunning()) {
314                 debugFatal(" transmit stream %p not running\n",*it);
315             } else {
316                 debugFatal(" transmit stream %p running\n",*it);
317             }
318         }
319         return false;
320     }
321
322     // we want to make sure that everything is running well,
323     // so wait for a while
324 //     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL);
325
326     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
327
328     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
329
330     int max_of_min_delay=0;
331     int min_delay=0;
332     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
333             it != m_ReceiveProcessors.end();
334             ++it ) {
335         min_delay=(*it)->getMinimalSyncDelay();
336         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
337     }
338
339     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
340             it != m_TransmitProcessors.end();
341             ++it ) {
342         min_delay=(*it)->getMinimalSyncDelay();
343         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
344     }
345
346     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay);
347     m_SyncSource->setSyncDelay(max_of_min_delay);
348
349
350     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
351     // now we reset the frame counters
352     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
353             it != m_ReceiveProcessors.end();
354             ++it ) {
355         // get the receive SP's going at receiving data
356         (*it)->m_data_buffer->setTransparent(false);
357         (*it)->reset();
358     }
359
360     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
361             it != m_TransmitProcessors.end();
362             ++it ) {
363         // make sure the SP retains it's prefilled data
364         (*it)->m_data_buffer->setTransparent(false);
365         (*it)->reset();
366     }
367    
368     dumpInfo();
369     // All buffers are prefilled and set-up, the only thing
370     // that remains a mistery is the timestamp information.
371 //     m_SyncSource->m_data_buffer->setTransparent(false);
372 //     debugShowBackLog();
373    
374     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
375     // in order to obtain that, we wait for the first periods to be
376     // received.
377     int nb_sync_runs=10;
378     while(nb_sync_runs--) { // or while not sync-ed?
379         waitForPeriod();
380
381         // drop the frames for all receive SP's
382         dryRun(StreamProcessor::E_Receive);
383        
384         // we don't have to dryrun for the xmit SP's since they
385         // are not sending data yet.
386     }
387
388     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
389         m_time_of_transfer,
390         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
391         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
392         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
393     // FIXME: xruns can screw up the framecounter accounting. do something more sane here
394     resetXrunCounters();
395     // lock the isohandlermanager such that things freeze
396 //     debugShowBackLog();
397
398     // we now should have decent sync info
399     // the buffers of the receive streams should be (approx) empty
400     // the buffers of the xmit streams should be full
401    
402     // note what should the timestamp of the first sample be?
403    
404     // at this point the buffer head timestamp of the transmit buffers can be
405     // set properly since we know the sync source's timestamp of the last
406     // buffer transfer. we also know the rate.
407    
408     debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n");
409     // FIXME: in the SPM it would be nice to have system time instead of
410     //        1394 time
411 //     float rate=m_SyncSource->getTicksPerFrame();
412 //     int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
413 //     // the data at the front of the buffer is intended to be transmitted
414 //     // nb_periods*period_size after the last received period
415 //     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
416
417     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
418             it != m_TransmitProcessors.end();
419             ++it ) {
420         // FIXME: encapsulate
421         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
422         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!!
423     }
424    
425     dumpInfo();
426    
427     // this is the place were we should be syncing the received streams too
428     // check how much they differ
429    
430    
431     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
432
433     // FIXME: this should not be in cycles, but in 'time'
434     // FIXME: remove the timestamp
435     if (!enableStreamProcessors(0)) {
436         debugFatal("Could not enable StreamProcessors...\n");
437         return false;
438     }
439
440     debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n");
441     #define MAX_DRYRUN_CYCLES               20
442     #define MIN_SUCCESSFUL_DRYRUN_CYCLES    4
443     // run some cycles 'dry' such that everything can stabilize
444     int nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES;
445     int nb_succesful_cycles = 0;
446     while(nb_dryrun_cycles_left > 0 &&
447           nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) {
448
449         waitForPeriod();
450
451         if (dryRun()) {
452             nb_succesful_cycles++;
453         } else {
454             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" );
455             resetXrunCounters();
456             nb_succesful_cycles = 0;
457             // FIXME: xruns can screw up the framecounter accounting. do something more sane here
458         }
459         nb_dryrun_cycles_left--;
460     }
461
462     if(nb_dryrun_cycles_left == 0) {
463         debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without steady-state...\n" );
464         return false;
465     }
466     debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in steady-state...\n" );
467
468     // now we should clear the xrun flags
469     resetXrunCounters();
470
471     // and off we go
472     return true;
473 }
474
475 void StreamProcessorManager::resetXrunCounters(){
476     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n");
477     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
478         it != m_ReceiveProcessors.end();
479         ++it )
480     {
481         (*it)->resetXrunCounter();
482     }
483
484     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
485         it != m_TransmitProcessors.end();
486         ++it )
487     {
488         (*it)->resetXrunCounter();
489     }
490 }
491
492 bool StreamProcessorManager::start() {
493     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
494     assert(m_isoManager);
495
496     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
497     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
498     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
499         it != m_ReceiveProcessors.end();
500         ++it ) {
501             if (!(*it)->prepareForStart()) {
502                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
503                 return false;
504             }
505             if (!m_isoManager->registerStream(*it)) {
506                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
507                 return false;
508             }
509         }
510
511     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
512     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
513         it != m_TransmitProcessors.end();
514         ++it ) {
515             if (!(*it)->prepareForStart()) {
516                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
517                 return false;
518             }
519             if (!m_isoManager->registerStream(*it)) {
520                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
521                 return false;
522             }
523         }
524
525     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
526     if (!m_isoManager->prepare()) {
527         debugFatal("Could not prepare isoManager\n");
528         return false;
529     }
530
531     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
532         if (!disableStreamProcessors()) {
533         debugFatal("Could not disable StreamProcessors...\n");
534         return false;
535     }
536
537     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
538     if (!m_isoManager->startHandlers(-1)) {
539         debugFatal("Could not start handlers...\n");
540         return false;
541     }
542
543     // start all SP's synchonized
544     if (!syncStartAll()) {
545         debugFatal("Could not syncStartAll...\n");
546         return false;
547     }
548
549     // dump the iso stream information when in verbose mode
550     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
551         m_isoManager->dumpInfo();
552     }
553
554     return true;
555
556 }
557
558 bool StreamProcessorManager::stop() {
559     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
560     assert(m_isoManager);
561
562     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
563     // Most stream processors can just stop without special treatment.  However, some
564     // (like the MOTU) need to do a few things before it's safe to turn off the iso
565     // handling.
566     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
567     bool allReady = false;
568     while (!allReady && wait_cycles) {
569         wait_cycles--;
570         allReady = true;
571
572         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
573             it != m_ReceiveProcessors.end();
574             ++it ) {
575             if(!(*it)->prepareForStop()) allReady = false;
576         }
577
578         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
579             it != m_TransmitProcessors.end();
580             ++it ) {
581             if(!(*it)->prepareForStop()) allReady = false;
582         }
583         usleep(1000);
584     }
585
586     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
587         if (!disableStreamProcessors()) {
588         debugFatal("Could not disable StreamProcessors...\n");
589         return false;
590     }
591
592     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
593     if(!m_isoManager->stopHandlers()) {
594        debugFatal("Could not stop ISO handlers\n");
595        return false;
596     }
597
598     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
599     // now unregister all streams from iso manager
600     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
601     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
602         it != m_ReceiveProcessors.end();
603         ++it ) {
604             if (!m_isoManager->unregisterStream(*it)) {
605                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
606                 return false;
607             }
608
609         }
610
611     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
612     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
613         it != m_TransmitProcessors.end();
614         ++it ) {
615             if (!m_isoManager->unregisterStream(*it)) {
616                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
617                 return false;
618             }
619
620         }
621
622     return true;
623
624 }
625
626 /**
627  * Enables the registered StreamProcessors
628  * @return true if successful, false otherwise
629  */
630 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
631     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
632
633     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
634     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
635     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
636             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
637         return false;
638     }
639
640     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
641     m_SyncSource->enable(time_to_enable_at);
642
643     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
644
645     // we prepare the streamprocessors for enable
646     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
647             it != m_ReceiveProcessors.end();
648             ++it ) {
649         if(*it != m_SyncSource) {
650             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
651             (*it)->prepareForEnable(time_to_enable_at);
652         }
653     }
654
655     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
656             it != m_TransmitProcessors.end();
657             ++it ) {
658         if(*it != m_SyncSource) {
659             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
660             (*it)->prepareForEnable(time_to_enable_at);
661         }
662     }
663
664     // then we enable the streamprocessors
665     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
666             it != m_ReceiveProcessors.end();
667             ++it ) {
668         if(*it != m_SyncSource) {
669             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
670             (*it)->enable(time_to_enable_at);
671         }
672     }
673
674     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
675             it != m_TransmitProcessors.end();
676             ++it ) {
677         if(*it != m_SyncSource) {
678             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
679             (*it)->enable(time_to_enable_at);
680         }
681     }
682
683     // now we wait for the SP's to get enabled
684 //     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
685 //     // we have to wait until all streamprocessors indicate that they are running
686 //     // i.e. that there is actually some data stream flowing
687 //     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
688 //     bool notEnabled=true;
689 //     while (notEnabled && wait_cycles) {
690 //         wait_cycles--;
691 //         notEnabled=false;
692 //
693 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
694 //                 it != m_ReceiveProcessors.end();
695 //                 ++it ) {
696 //             if(!(*it)->isEnabled()) notEnabled=true;
697 //         }
698 //
699 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
700 //                 it != m_TransmitProcessors.end();
701 //                 ++it ) {
702 //             if(!(*it)->isEnabled()) notEnabled=true;
703 //         }
704 //         usleep(1000); // one cycle
705 //     }
706 //
707 //     if(!wait_cycles) { // timout has occurred
708 //         debugFatal("One or more streams couldn't be enabled (timeout):\n");
709 //
710 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
711 //                 it != m_ReceiveProcessors.end();
712 //                 ++it ) {
713 //             if(!(*it)->isEnabled()) {
714 //                     debugFatal(" receive stream %p not enabled\n",*it);
715 //             } else {
716 //                     debugFatal(" receive stream %p enabled\n",*it);
717 //             }
718 //         }
719 //
720 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
721 //                 it != m_TransmitProcessors.end();
722 //                 ++it ) {
723 //             if(!(*it)->isEnabled()) {
724 //                     debugFatal(" transmit stream %p not enabled\n",*it);
725 //             } else {
726 //                     debugFatal(" transmit stream %p enabled\n",*it);
727 //             }
728 //         }
729 //         return false;
730 //     }
731
732     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
733
734     return true;
735 }
736
737 /**
738  * Disables the registered StreamProcessors
739  * @return true if successful, false otherwise
740  */
741 bool StreamProcessorManager::disableStreamProcessors() {
742     // we prepare the streamprocessors for disable
743     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
744             it != m_ReceiveProcessors.end();
745             ++it ) {
746         (*it)->prepareForDisable();
747     }
748
749     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
750             it != m_TransmitProcessors.end();
751             ++it ) {
752         (*it)->prepareForDisable();
753     }
754
755     // then we disable the streamprocessors
756     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
757             it != m_ReceiveProcessors.end();
758             ++it ) {
759         (*it)->disable();
760         (*it)->m_data_buffer->setTransparent(true);
761     }
762
763     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
764             it != m_TransmitProcessors.end();
765             ++it ) {
766         (*it)->disable();
767         (*it)->m_data_buffer->setTransparent(true);
768     }
769
770     // now we wait for the SP's to get disabled
771     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
772     // we have to wait until all streamprocessors indicate that they are running
773     // i.e. that there is actually some data stream flowing
774     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
775     bool enabled=true;
776     while (enabled && wait_cycles) {
777         wait_cycles--;
778         enabled=false;
779
780         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
781                 it != m_ReceiveProcessors.end();
782                 ++it ) {
783             if((*it)->isEnabled()) enabled=true;
784         }
785
786         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
787                 it != m_TransmitProcessors.end();
788                 ++it ) {
789             if((*it)->isEnabled()) enabled=true;
790         }
791         usleep(1000); // one cycle
792     }
793
794     if(!wait_cycles) { // timout has occurred
795         debugFatal("One or more streams couldn't be disabled (timeout):\n");
796
797         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
798                 it != m_ReceiveProcessors.end();
799                 ++it ) {
800             if(!(*it)->isEnabled()) {
801                     debugFatal(" receive stream %p not enabled\n",*it);
802             } else {
803                     debugFatal(" receive stream %p enabled\n",*it);
804             }
805         }
806
807         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
808                 it != m_TransmitProcessors.end();
809                 ++it ) {
810             if(!(*it)->isEnabled()) {
811                     debugFatal(" transmit stream %p not enabled\n",*it);
812             } else {
813                     debugFatal(" transmit stream %p enabled\n",*it);
814             }
815         }
816         return false;
817     }
818
819     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
820
821     return true;
822 }
823
824 /**
825  * Called upon Xrun events. This brings all StreamProcessors back
826  * into their starting state, and then carries on streaming. This should
827  * have the same effect as restarting the whole thing.
828  *
829  * @return true if successful, false otherwise
830  */
831 bool StreamProcessorManager::handleXrun() {
832
833     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
834
835     dumpInfo();
836
837     /*
838      * Reset means:
839      * 1) Disabling the SP's, so that they don't process any packets
840      *    note: the isomanager does keep on delivering/requesting them
841      * 2) Bringing all buffers & streamprocessors into a know state
842      *    - Clear all capture buffers
843      *    - Put nb_periods*period_size of null frames into the playback buffers
844      * 3) Re-enable the SP's
845      */
846     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
847         if (!disableStreamProcessors()) {
848         debugFatal("Could not disable StreamProcessors...\n");
849         return false;
850     }
851
852     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
853     // start all SP's synchonized
854     if (!syncStartAll()) {
855         debugFatal("Could not syncStartAll...\n");
856         return false;
857     }
858
859     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
860
861     return true;
862 }
863
864 /**
865  * @brief Waits until the next period of samples is ready
866  *
867  * This function does not return until a full period of samples is (or should be)
868  * ready to be transferred.
869  *
870  * @return true if the period is ready, false if an xrun occurred
871  */
872 bool StreamProcessorManager::waitForPeriod() {
873     int time_till_next_period;
874     bool xrun_occurred=false;
875
876     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
877
878     assert(m_SyncSource);
879
880     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
881
882     while(time_till_next_period > 0) {
883         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
884
885         // wait for the period
886         usleep(time_till_next_period);
887
888         // check for underruns on the ISO side,
889         // those should make us bail out of the wait loop
890         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
891             it != m_ReceiveProcessors.end();
892             ++it ) {
893             // a xrun has occurred on the Iso side
894             xrun_occurred |= (*it)->xrunOccurred();
895         }
896         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
897             it != m_TransmitProcessors.end();
898             ++it ) {
899             // a xrun has occurred on the Iso side
900             xrun_occurred |= (*it)->xrunOccurred();
901         }
902
903         if(xrun_occurred) break;
904
905         // check if we were waked up too soon
906         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
907     }
908
909     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
910
911     // this is to notify the client of the delay
912     // that we introduced
913     m_delayed_usecs=time_till_next_period;
914
915     // we save the 'ideal' time of the transfer at this point,
916     // because we can have interleaved read - process - write
917     // cycles making that we modify a receiving stream's buffer
918     // before we get to writing.
919     // NOTE: before waitForPeriod() is called again, both the transmit
920     //       and the receive processors should have done their transfer.
921     m_time_of_transfer=m_SyncSource->getTimeAtPeriod();
922     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
923         m_time_of_transfer);
924
925 #ifdef DEBUG
926     int rcv_bf=0, xmt_bf=0;
927     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
928         it != m_ReceiveProcessors.end();
929         ++it ) {
930         rcv_bf = (*it)->getBufferFill();
931     }
932     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
933         it != m_TransmitProcessors.end();
934         ++it ) {
935         xmt_bf = (*it)->getBufferFill();
936     }
937     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
938         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
939
940 #endif
941
942     xrun_occurred=false;
943
944     // check if xruns occurred on the Iso side.
945     // also check if xruns will occur should we transfer() now
946
947     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
948           it != m_ReceiveProcessors.end();
949           ++it ) {
950         // a xrun has occurred on the Iso side
951         xrun_occurred |= (*it)->xrunOccurred();
952
953         // if this is true, a xrun will occur
954         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
955
956 #ifdef DEBUG
957         if ((*it)->xrunOccurred()) {
958             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
959             (*it)->dumpInfo();
960         }
961         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
962             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
963             (*it)->dumpInfo();
964         }
965 #endif
966
967     }
968     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
969           it != m_TransmitProcessors.end();
970           ++it ) {
971         // a xrun has occurred on the Iso side
972         xrun_occurred |= (*it)->xrunOccurred();
973
974         // if this is true, a xrun will occur
975         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
976
977 #ifdef DEBUG
978         if ((*it)->xrunOccurred()) {
979             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
980         }
981         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
982             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
983         }
984 #endif
985     }
986
987     m_nbperiods++;
988
989     // now we can signal the client that we are (should be) ready
990     return !xrun_occurred;
991 }
992
993 /**
994  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
995  *
996  * Transfers one period of frames from the client side to the Iso side and vice versa.
997  *
998  * @return true if successful, false otherwise (indicates xrun).
999  */
1000 bool StreamProcessorManager::transfer() {
1001
1002     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
1003     bool retval=true;
1004     retval &= dryRun(StreamProcessor::E_Receive);
1005     retval &= dryRun(StreamProcessor::E_Transmit);
1006     return retval;
1007 }
1008
1009 /**
1010  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1011  *
1012  * Transfers one period of frames from the client side to the Iso side or vice versa.
1013  *
1014  * @param t The processor type to tranfer for (receive or transmit)
1015  * @return true if successful, false otherwise (indicates xrun).
1016  */
1017
1018 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
1019     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1020     bool retval = true;
1021     // a static cast could make sure that there is no performance
1022     // penalty for the virtual functions (to be checked)
1023     if (t==StreamProcessor::E_Receive) {
1024         // determine the time at which we want reception to start
1025         float rate=m_SyncSource->getTicksPerFrame();
1026         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1027        
1028         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1029        
1030         if(receive_timestamp<0) {
1031             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1032              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1033         }
1034         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1035             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1036              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1037         }
1038        
1039         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1040                 it != m_ReceiveProcessors.end();
1041                 ++it ) {
1042
1043             if(!(*it)->getFrames(m_period, receive_timestamp)) {
1044                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1045                             m_period, m_time_of_transfer,*it);
1046                 retval &= false; // buffer underrun
1047             }
1048         }
1049     } else {
1050         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1051                 it != m_TransmitProcessors.end();
1052                 ++it ) {
1053             // FIXME: in the SPM it would be nice to have system time instead of
1054             //        1394 time
1055             float rate=m_SyncSource->getTicksPerFrame();
1056             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1057
1058             // the data we are putting into the buffer is intended to be transmitted
1059             // one ringbuffer size after it has been received
1060             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1061
1062             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1063                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1064                         m_period, transmit_timestamp, *it);
1065                 retval &= false; // buffer underrun
1066             }
1067
1068         }
1069     }
1070     return retval;
1071 }
1072
1073 /**
1074  * @brief Dry run one period for both receive and transmit StreamProcessors
1075  *
1076  * Process one period of frames for all streamprocessors, without touching the
1077  * client buffers. This only removes an incoming period from the ISO receive buffer and
1078  * puts one period of silence into the transmit buffers.
1079  *
1080  * @return true if successful, false otherwise (indicates xrun).
1081  */
1082 bool StreamProcessorManager::dryRun() {
1083     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1084     bool retval=true;
1085     retval &= dryRun(StreamProcessor::E_Receive);
1086     retval &= dryRun(StreamProcessor::E_Transmit);
1087     return retval;
1088 }
1089
1090 /**
1091  * @brief Dry run one period for either the receive or transmit StreamProcessors
1092  *
1093  * see dryRun()
1094  *
1095  * @param t The processor type to dryRun for (receive or transmit)
1096  * @return true if successful, false otherwise (indicates xrun).
1097  */
1098
1099 bool StreamProcessorManager::dryRun(enum StreamProcessor::EProcessorType t) {
1100     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1101     bool retval = true;
1102     // a static cast could make sure that there is no performance
1103     // penalty for the virtual functions (to be checked)
1104     if (t==StreamProcessor::E_Receive) {
1105         // determine the time at which we want reception to start
1106         float rate=m_SyncSource->getTicksPerFrame();
1107         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1108        
1109         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1110        
1111         if(receive_timestamp<0) {
1112             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1113              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1114         }
1115         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1116             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1117              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1118         }
1119        
1120         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1121                 it != m_ReceiveProcessors.end();
1122                 ++it ) {
1123
1124             if(!(*it)->getFramesDry(m_period, receive_timestamp)) {
1125                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1126                             m_period, m_time_of_transfer,*it);
1127                 retval &= false; // buffer underrun
1128             }
1129
1130         }
1131     } else {
1132         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1133                 it != m_TransmitProcessors.end();
1134                 ++it ) {
1135             // FIXME: in the SPM it would be nice to have system time instead of
1136             //        1394 time
1137             float rate=m_SyncSource->getTicksPerFrame();
1138             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1139
1140             // the data we are putting into the buffer is intended to be transmitted
1141             // one ringbuffer size after it has been received
1142             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1143
1144             if(!(*it)->putFramesDry(m_period, transmit_timestamp)) {
1145                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1146                         m_period, transmit_timestamp, *it);
1147                 retval &= false; // buffer underrun
1148             }
1149
1150         }
1151     }
1152     return retval;
1153 }
1154
1155 void StreamProcessorManager::dumpInfo() {
1156     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1157     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1158     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1159
1160     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1161     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1162         it != m_ReceiveProcessors.end();
1163         ++it ) {
1164         (*it)->dumpInfo();
1165     }
1166
1167     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1168     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1169         it != m_TransmitProcessors.end();
1170         ++it ) {
1171         (*it)->dumpInfo();
1172     }
1173
1174     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1175     m_isoManager->dumpInfo();
1176     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1177
1178 }
1179
1180 void StreamProcessorManager::setVerboseLevel(int l) {
1181     setDebugLevel(l);
1182
1183     if (m_isoManager) m_isoManager->setVerboseLevel(l);
1184
1185     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1187         it != m_ReceiveProcessors.end();
1188         ++it ) {
1189         (*it)->setVerboseLevel(l);
1190     }
1191
1192     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1193     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1194         it != m_TransmitProcessors.end();
1195         ++it ) {
1196         (*it)->setVerboseLevel(l);
1197     }
1198 }
1199
1200
1201 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1202     int count=0;
1203
1204     if (direction == Port::E_Capture) {
1205         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1206             it != m_ReceiveProcessors.end();
1207             ++it ) {
1208             count += (*it)->getPortCount(type);
1209         }
1210     } else {
1211         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1212             it != m_TransmitProcessors.end();
1213             ++it ) {
1214             count += (*it)->getPortCount(type);
1215         }
1216     }
1217     return count;
1218 }
1219
1220 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1221     int count=0;
1222
1223     if (direction == Port::E_Capture) {
1224         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1225             it != m_ReceiveProcessors.end();
1226             ++it ) {
1227             count += (*it)->getPortCount();
1228         }
1229     } else {
1230         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1231             it != m_TransmitProcessors.end();
1232             ++it ) {
1233             count += (*it)->getPortCount();
1234         }
1235     }
1236     return count;
1237 }
1238
1239 // TODO: implement a port map here, instead of the loop
1240
1241 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1242     int count=0;
1243     int prevcount=0;
1244
1245     if (direction == Port::E_Capture) {
1246         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1247             it != m_ReceiveProcessors.end();
1248             ++it ) {
1249             count += (*it)->getPortCount();
1250             if (count > idx) {
1251                 return (*it)->getPortAtIdx(idx-prevcount);
1252             }
1253             prevcount=count;
1254         }
1255     } else {
1256         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1257             it != m_TransmitProcessors.end();
1258             ++it ) {
1259             count += (*it)->getPortCount();
1260             if (count > idx) {
1261                 return (*it)->getPortAtIdx(idx-prevcount);
1262             }
1263             prevcount=count;
1264         }
1265     }
1266     return NULL;
1267 }
1268
1269 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1270     m_thread_realtime=rt;
1271     m_thread_priority=priority;
1272     return true;
1273 }
1274
1275
1276 } // end of namespace
Note: See TracBrowser for help on using the browser.