root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 1001, 44.8 kB (checked in by ppalmers, 13 years ago)

Improve streaming startup for better initial timestamps and locking.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_nb_buffers( 0 )
45     , m_period( 0 )
46     , m_audio_datatype( eADT_Float )
47     , m_nominal_framerate ( 0 )
48     , m_xruns(0)
49     , m_shutdown_needed(false)
50     , m_nbperiods(0)
51 {
52     addOption(Util::OptionContainer::Option("slaveMode",false));
53 }
54
55 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
56     : m_is_slave( false )
57     , m_SyncSource(NULL)
58     , m_xrun_happened( false )
59     , m_nb_buffers(nb_buffers)
60     , m_period(period)
61     , m_audio_datatype( eADT_Float )
62     , m_nominal_framerate ( framerate )
63     , m_xruns(0)
64     , m_shutdown_needed(false)
65     , m_nbperiods(0)
66 {
67     addOption(Util::OptionContainer::Option("slaveMode",false));
68 }
69
70 StreamProcessorManager::~StreamProcessorManager() {
71 }
72
73 void
74 StreamProcessorManager::handleBusReset()
75 {
76     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) Handle bus reset...\n", this);
77
78     // FIXME: we request shutdown for now.
79     m_shutdown_needed=true;
80
81     // note that all receive streams are gone once a device is unplugged
82
83     // synchronize with the wait lock
84     m_WaitLock.Lock();
85
86     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) got wait lock...\n", this);
87     // cause all SP's to bail out
88     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
89           it != m_ReceiveProcessors.end();
90           ++it )
91     {
92         (*it)->handleBusReset();
93     }
94     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
95           it != m_TransmitProcessors.end();
96           ++it )
97     {
98         (*it)->handleBusReset();
99     }
100
101     m_WaitLock.Unlock();
102 }
103
104 /**
105  * Registers \ref processor with this manager.
106  *
107  * also registers it with the isohandlermanager
108  *
109  * be sure to call isohandlermanager->init() first!
110  * and be sure that the processors are also ->init()'ed
111  *
112  * @param processor
113  * @return true if successfull
114  */
115 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
116 {
117     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
118     assert(processor);
119     if (processor->getType() == StreamProcessor::ePT_Receive) {
120         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
121         m_ReceiveProcessors.push_back(processor);
122         return true;
123     }
124
125     if (processor->getType() == StreamProcessor::ePT_Transmit) {
126         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
127         m_TransmitProcessors.push_back(processor);
128         return true;
129     }
130
131     debugFatal("Unsupported processor type!\n");
132     return false;
133 }
134
135 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
136 {
137     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
138     assert(processor);
139
140     if (processor->getType()==StreamProcessor::ePT_Receive) {
141
142         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
143               it != m_ReceiveProcessors.end();
144               ++it )
145         {
146             if ( *it == processor ) {
147                 if (*it == m_SyncSource) {
148                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
149                     m_SyncSource = NULL;
150                 }
151                 m_ReceiveProcessors.erase(it);
152                 return true;
153             }
154         }
155     }
156
157     if (processor->getType()==StreamProcessor::ePT_Transmit) {
158         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
159               it != m_TransmitProcessors.end();
160               ++it )
161         {
162             if ( *it == processor ) {
163                 if (*it == m_SyncSource) {
164                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
165                     m_SyncSource = NULL;
166                 }
167                 m_TransmitProcessors.erase(it);
168                 return true;
169             }
170         }
171     }
172
173     debugFatal("Processor (%p) not found!\n",processor);
174     return false; //not found
175 }
176
177 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
178     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
179     m_SyncSource=s;
180     return true;
181 }
182
183 bool StreamProcessorManager::prepare() {
184
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
186     m_is_slave=false;
187     if(!getOption("slaveMode", m_is_slave)) {
188         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
189     }
190
191     m_shutdown_needed=false;
192
193     // if no sync source is set, select one here
194     if(m_SyncSource == NULL) {
195        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
196     }
197
198     // FIXME: put into separate method
199     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
200           it != m_ReceiveProcessors.end();
201           ++it )
202     {
203         if(m_SyncSource == NULL) {
204             debugWarning(" => Sync Source is %p.\n", *it);
205             m_SyncSource = *it;
206         }
207     }
208     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
209           it != m_TransmitProcessors.end();
210           ++it )
211     {
212         if(m_SyncSource == NULL) {
213             debugWarning(" => Sync Source is %p.\n", *it);
214             m_SyncSource = *it;
215         }
216     }
217
218     // now do the actual preparation of the SP's
219     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
220     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
221         it != m_ReceiveProcessors.end();
222         ++it ) {
223
224         if(!(*it)->setOption("slaveMode", m_is_slave)) {
225             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
226         }
227
228         if(!(*it)->prepare()) {
229             debugFatal(  " could not prepare (%p)...\n",(*it));
230             return false;
231         }
232     }
233     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
234     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
235         it != m_TransmitProcessors.end();
236         ++it ) {
237         if(!(*it)->setOption("slaveMode", m_is_slave)) {
238             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
239         }
240         if(!(*it)->prepare()) {
241             debugFatal( " could not prepare (%p)...\n",(*it));
242             return false;
243         }
244     }
245
246     // if there are no stream processors registered,
247     // fail
248     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
249         debugFatal("No stream processors registered, can't do anything usefull\n");
250         return false;
251     }
252     return true;
253 }
254
255 bool StreamProcessorManager::startDryRunning() {
256     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
257     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
258     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
259             it != m_TransmitProcessors.end();
260             ++it ) {
261         if (!(*it)->isDryRunning()) {
262             if(!(*it)->scheduleStartDryRunning(-1)) {
263                 debugError("Could not put SP %p into the dry-running state\n", *it);
264                 return false;
265             }
266         } else {
267             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
268         }
269     }
270     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
271             it != m_ReceiveProcessors.end();
272             ++it ) {
273         if (!(*it)->isDryRunning()) {
274             if(!(*it)->scheduleStartDryRunning(-1)) {
275                 debugError("Could not put SP %p into the dry-running state\n", *it);
276                 return false;
277             }
278         } else {
279             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
280         }
281     }
282     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
283     // wait for the syncsource to start running.
284     // that will block the waitForPeriod call until everyone has started (theoretically)
285     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
286     bool all_dry_running = false;
287     while (!all_dry_running && cnt) {
288         all_dry_running = true;
289         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
290                 it != m_ReceiveProcessors.end();
291                 ++it ) {
292             all_dry_running &= (*it)->isDryRunning();
293         }
294         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
295                 it != m_TransmitProcessors.end();
296                 ++it ) {
297             all_dry_running &= (*it)->isDryRunning();
298         }
299
300         SleepRelativeUsec(125);
301         cnt--;
302     }
303     if(cnt==0) {
304         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
305         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
306                 it != m_ReceiveProcessors.end();
307                 ++it ) {
308             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
309                 (*it)->getTypeString(), *it, (*it)->getStateString());
310         }
311         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
312                 it != m_TransmitProcessors.end();
313                 ++it ) {
314             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
315                 (*it)->getTypeString(), *it, (*it)->getStateString());
316         }
317         return false;
318     }
319     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
320     return true;
321 }
322
323 bool StreamProcessorManager::syncStartAll() {
324     if(m_SyncSource == NULL) return false;
325     // figure out when to get the SP's running.
326     // the xmit SP's should also know the base timestamp
327     // streams should be aligned here
328
329     // now find out how long we have to delay the wait operation such that
330     // the received frames will all be presented to the SP
331     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
332     int max_of_min_delay = 0;
333     int min_delay = 0;
334     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
335             it != m_ReceiveProcessors.end();
336             ++it ) {
337         min_delay = (*it)->getMaxFrameLatency();
338         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
339     }
340
341     // add some processing margin. This only shifts the time
342     // at which the buffer is transfer()'ed. This makes things somewhat
343     // more robust. It should be noted though that shifting the transfer
344     // time to a later time instant also causes the xmit buffer fill to be
345     // lower on average.
346     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
347     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
348         max_of_min_delay,
349         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
350         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
351         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
352     m_SyncSource->setSyncDelay(max_of_min_delay);
353
354     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
355     //        have aquired lock
356     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
357     //sleep(2); // FIXME: be smarter here
358
359     // make sure that we are dry-running long enough for the
360     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
361     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
362    
363     unsigned int nb_sync_runs = (STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC * getNominalRate());
364     nb_sync_runs /= 1000;
365     nb_sync_runs /= getPeriodSize();
366
367     int64_t time_till_next_period;
368     while(nb_sync_runs--) { // or while not sync-ed?
369         // check if we were woken up too soon
370         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
371         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
372         if(time_till_next_period > 0) {
373             // wait for the period
374             SleepRelativeUsec(time_till_next_period);
375         }
376     }
377
378     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
379     // FIXME: in the SPM it would be nice to have system time instead of
380     //        1394 time
381
382     // we now should have decent sync info on the sync source
383     // determine a point in time where the system should start
384     // figure out where we are now
385     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
386     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
387         time_of_first_sample,
388         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
389         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
390         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
391
392     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
393     // this is the time window we have to setup all SP's such that they
394     // can start wet-running correctly.
395     time_of_first_sample = addTicks(time_of_first_sample,
396                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
397
398     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
399         time_of_first_sample,
400         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
401         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
402         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
403
404     // we should start wet-running the transmit SP's some cycles in advance
405     // such that we know it is wet-running when it should output its first sample
406     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
407                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
408
409     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
410                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
411     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
412         time_to_start_xmit,
413         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
414         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
415         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
416     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
417         time_to_start_recv,
418         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
419         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
420         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
421
422     // at this point the buffer head timestamp of the transmit buffers can be set
423     // this is the presentation time of the first sample in the buffer
424     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
425           it != m_TransmitProcessors.end();
426           ++it ) {
427         (*it)->setBufferHeadTimestamp(time_of_first_sample);
428     }
429
430     // switch syncsource to running state
431     uint64_t time_to_start_sync;
432     // FIXME: this is most likely not going to work for transmit sync sources
433     // but those are unsupported in this version
434     if(m_SyncSource->getType() == StreamProcessor::ePT_Receive ) {
435         time_to_start_sync = time_to_start_recv;
436     } else {
437         time_to_start_sync = time_to_start_xmit;
438     }
439     if(!m_SyncSource->scheduleStartRunning(time_to_start_sync)) {
440         debugError("m_SyncSource->scheduleStartRunning(%11llu) failed\n", time_to_start_sync);
441         return false;
442     }
443
444     // STEP X: switch all non-syncsource SP's over to the running state
445     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
446           it != m_ReceiveProcessors.end();
447           ++it ) {
448         if(*it != m_SyncSource) {
449             if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
450                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
451                 return false;
452             }
453         }
454     }
455     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
456           it != m_TransmitProcessors.end();
457           ++it ) {
458         if(*it != m_SyncSource) {
459             if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
460                 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
461                 return false;
462             }
463         }
464     }
465     // wait for the syncsource to start running.
466     // that will block the waitForPeriod call until everyone has started (theoretically)
467     // note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
468     // so a 20 times this value should be a good timeout
469     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
470     while (!m_SyncSource->isRunning() && cnt) {
471         SleepRelativeUsec(125);
472         cnt--;
473     }
474     if(cnt==0) {
475         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
476         return false;
477     }
478
479     // the sync source is running, we can now read a decent received timestamp from it
480     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
481
482     // and a (still very rough) approximation of the rate
483     float rate = m_SyncSource->getTicksPerFrame();
484     int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
485     debugOutput( DEBUG_LEVEL_VERBOSE, "  initial time of transfer %010lld, rate %f...\n",
486                 m_time_of_transfer, rate);
487
488     // then use this information to initialize the xmit handlers
489
490     //  we now set the buffer tail timestamp of the transmit buffer
491     //  to the period transfer time instant plus what's nb_buffers - 1
492     //  in ticks. This due to the fact that we (should) have received one period
493     //  worth of ticks at t=m_time_of_transfer
494     //  hence one period of frames should also have been transmitted, which means
495     //  that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
496     //  that allows us to calculate the tail timestamp for the buffer.
497
498     int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
499
500     debugOutput( DEBUG_LEVEL_VERBOSE, "  preset transmit tail TS %010lld, rate %f...\n",
501                 transmit_tail_timestamp, rate);
502
503     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
504         it != m_TransmitProcessors.end();
505         ++it ) {
506         (*it)->setBufferTailTimestamp(transmit_tail_timestamp);
507         (*it)->setTicksPerFrame(rate);
508     }
509
510     // align the received streams to be phase aligned
511     if(!alignReceivedStreams()) {
512         debugError("Could not align streams...\n");
513         return false;
514     }
515
516     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
517     return true;
518 }
519
520 bool
521 StreamProcessorManager::alignReceivedStreams()
522 {
523     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
524     unsigned int nb_sync_runs;
525     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
526     int64_t diff_between_streams[nb_rcv_sp];
527     int64_t diff;
528
529     unsigned int i;
530
531     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
532     periods_per_align_try /= 1000;
533     periods_per_align_try /= getPeriodSize();
534     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
535
536     bool aligned = false;
537     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
538     while (!aligned && cnt--) {
539         nb_sync_runs = periods_per_align_try;
540         while(nb_sync_runs) {
541             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
542             if(!waitForPeriod()) {
543                 debugWarning("xrun while aligning streams...\n");
544                 return false;
545             };
546
547             i = 0;
548             for ( i = 0; i < nb_rcv_sp; i++) {
549                 StreamProcessor *s = m_ReceiveProcessors.at(i);
550                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
551                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
552                     m_SyncSource, s, diff);
553                 if ( nb_sync_runs == periods_per_align_try ) {
554                     diff_between_streams[i] = diff;
555                 } else {
556                     diff_between_streams[i] += diff;
557                 }
558             }
559
560             if(!transferSilence()) {
561                 debugError("Could not transfer silence\n");
562                 return false;
563             }
564             nb_sync_runs--;
565         }
566         // calculate the average offsets
567         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
568         int diff_between_streams_frames[nb_rcv_sp];
569         aligned = true;
570         for ( i = 0; i < nb_rcv_sp; i++) {
571             StreamProcessor *s = m_ReceiveProcessors.at(i);
572
573             diff_between_streams[i] /= periods_per_align_try;
574             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
575             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
576                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
577
578             aligned &= (diff_between_streams_frames[i] == 0);
579
580             // reposition the stream
581             if(!s->shiftStream(diff_between_streams_frames[i])) {
582                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
583                 return false;
584             }
585         }
586         if (!aligned) {
587             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
588         }
589     }
590     if (cnt == 0) {
591         debugError("Align failed\n");
592         return false;
593     }
594     return true;
595 }
596
597 bool StreamProcessorManager::start() {
598     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
599
600     // start all SP's synchonized
601     bool start_result = false;
602     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
603         // put all SP's into dry-running state
604         if (!startDryRunning()) {
605             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
606             start_result = false;
607             continue;
608         }
609
610         start_result = syncStartAll();
611         if(start_result) {
612             break;
613         } else {
614             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
615         }
616     }
617     if (!start_result) {
618         debugFatal("Could not syncStartAll...\n");
619         return false;
620     }
621
622     return true;
623 }
624
625 bool StreamProcessorManager::stop() {
626     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
627
628     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
629     // switch SP's over to the dry-running state
630     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
631           it != m_ReceiveProcessors.end();
632           ++it ) {
633         if((*it)->isRunning()) {
634             if(!(*it)->scheduleStopRunning(-1)) {
635                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
636                 return false;
637             }
638         }
639     }
640     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
641           it != m_TransmitProcessors.end();
642           ++it ) {
643         if((*it)->isRunning()) {
644             if(!(*it)->scheduleStopRunning(-1)) {
645                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
646                 return false;
647             }
648         }
649     }
650     // wait for the SP's to get into the dry-running/stopped state
651     int cnt = 8000;
652     bool ready = false;
653     while (!ready && cnt) {
654         ready = true;
655         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
656             it != m_ReceiveProcessors.end();
657             ++it ) {
658             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
659         }
660         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
661             it != m_TransmitProcessors.end();
662             ++it ) {
663             ready &= ((*it)->isDryRunning() || (*it)->isStopped() || (*it)->isWaitingForStream());
664         }
665         SleepRelativeUsec(125);
666         cnt--;
667     }
668     if(cnt==0) {
669         debugWarning(" Timeout waiting for the SP's to start dry-running\n");
670         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
671             it != m_ReceiveProcessors.end();
672             ++it ) {
673             (*it)->dumpInfo();
674         }
675         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
676             it != m_TransmitProcessors.end();
677             ++it ) {
678             (*it)->dumpInfo();
679         }
680         return false;
681     }
682
683     // switch SP's over to the stopped state
684     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
685           it != m_ReceiveProcessors.end();
686           ++it ) {
687         if(!(*it)->scheduleStopDryRunning(-1)) {
688             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
689             return false;
690         }
691     }
692     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
693           it != m_TransmitProcessors.end();
694           ++it ) {
695         if(!(*it)->scheduleStopDryRunning(-1)) {
696             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
697             return false;
698         }
699     }
700     // wait for the SP's to get into the stopped state
701     cnt = 8000;
702     ready = false;
703     while (!ready && cnt) {
704         ready = true;
705         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
706             it != m_ReceiveProcessors.end();
707             ++it ) {
708             ready &= (*it)->isStopped();
709         }
710         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
711             it != m_TransmitProcessors.end();
712             ++it ) {
713             ready &= (*it)->isStopped();
714         }
715         SleepRelativeUsec(125);
716         cnt--;
717     }
718     if(cnt==0) {
719         debugWarning(" Timeout waiting for the SP's to stop\n");
720         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
721             it != m_ReceiveProcessors.end();
722             ++it ) {
723             (*it)->dumpInfo();
724         }
725         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
726             it != m_TransmitProcessors.end();
727             ++it ) {
728             (*it)->dumpInfo();
729         }
730         return false;
731     }
732     return true;
733 }
734
735 /**
736  * Called upon Xrun events. This brings all StreamProcessors back
737  * into their starting state, and then carries on streaming. This should
738  * have the same effect as restarting the whole thing.
739  *
740  * @return true if successful, false otherwise
741  */
742 bool StreamProcessorManager::handleXrun() {
743
744     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
745
746     dumpInfo();
747
748     /*
749      * Reset means:
750      * 1) Disabling the SP's, so that they don't process any packets
751      *    note: the isomanager does keep on delivering/requesting them
752      * 2) Bringing all buffers & streamprocessors into a know state
753      *    - Clear all capture buffers
754      *    - Put nb_periods*period_size of null frames into the playback buffers
755      * 3) Re-enable the SP's
756      */
757
758     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
759     // start all SP's synchonized
760     bool start_result = false;
761     for (int ntries=0; ntries < STREAMPROCESSORMANAGER_SYNCSTART_TRIES; ntries++) {
762         if(m_shutdown_needed) {
763             debugOutput(DEBUG_LEVEL_VERBOSE, "Shutdown requested...\n");
764             return true;
765         }
766         // put all SP's into dry-running state
767         if (!startDryRunning()) {
768             debugShowBackLog();
769             debugOutput(DEBUG_LEVEL_VERBOSE, "Could not put SP's in dry-running state (try %d)\n", ntries);
770             start_result = false;
771             continue;
772         }
773
774         start_result = syncStartAll();
775         if(start_result) {
776             break;
777         } else {
778             debugOutput(DEBUG_LEVEL_VERBOSE, "Sync start try %d failed...\n", ntries);
779         }
780     }
781     if (!start_result) {
782         debugFatal("Could not syncStartAll...\n");
783         return false;
784     }
785     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
786
787     return true;
788 }
789
790 /**
791  * @brief Waits until the next period of samples is ready
792  *
793  * This function does not return until a full period of samples is (or should be)
794  * ready to be transferred.
795  *
796  * @return true if the period is ready, false if not
797  */
798 bool StreamProcessorManager::waitForPeriod() {
799     if(m_SyncSource == NULL) return false;
800     if(m_shutdown_needed) return false;
801     bool xrun_occurred = false;
802     bool period_not_ready = true;
803
804     // grab the wait lock
805     m_WaitLock.Lock();
806
807     while(period_not_ready) {
808         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
809                            "waiting for period (%d frames in buffer)...\n",
810                            m_SyncSource->getBufferFill());
811         bool result;
812         if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) {
813             result = m_SyncSource->waitForConsumePeriod();
814         } else {
815             result = m_SyncSource->waitForProducePeriod();
816         }
817 //         if(!result) {
818 //             debugError("Error waiting for signal\n");
819 //             return false;
820 //         }
821
822         // HACK: this should be solved more elegantly
823         period_not_ready = false;
824         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
825             it != m_ReceiveProcessors.end();
826             ++it ) {
827             bool this_sp_period_ready = (*it)->canConsumePeriod();
828             if (!this_sp_period_ready) {
829                 period_not_ready = true;
830             }
831         }
832         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
833             it != m_TransmitProcessors.end();
834             ++it ) {
835             bool this_sp_period_ready = (*it)->canProducePeriod();
836             if (!this_sp_period_ready) {
837                 period_not_ready = true;
838             }
839         }
840         // check for underruns on the ISO side,
841         // those should make us bail out of the wait loop
842         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
843             it != m_ReceiveProcessors.end();
844             ++it ) {
845             // a xrun has occurred on the Iso side
846             xrun_occurred |= (*it)->xrunOccurred();
847         }
848         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
849             it != m_TransmitProcessors.end();
850             ++it ) {
851             // a xrun has occurred on the Iso side
852             xrun_occurred |= (*it)->xrunOccurred();
853         }
854         if(xrun_occurred) break;
855         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
856
857         // if we have to shutdown due to some async event (busreset), do so
858         if(m_shutdown_needed) break;
859     }
860
861     if(xrun_occurred) {
862         debugOutput( DEBUG_LEVEL_VERBOSE, "exit due to xrun...\n");
863     }
864
865     // we save the 'ideal' time of the transfer at this point,
866     // because we can have interleaved read - process - write
867     // cycles making that we modify a receiving stream's buffer
868     // before we get to writing.
869     // NOTE: before waitForPeriod() is called again, both the transmit
870     //       and the receive processors should have done their transfer.
871     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
872    
873     #ifdef DEBUG
874     static uint64_t m_time_of_transfer2 = m_time_of_transfer;
875    
876     int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
877     int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
878     // display message if the difference between two successive tick
879     // values is more than 50 ticks. 1 sample at 48k is 512 ticks
880     // so 50 ticks = 10%, which is a rather large jitter value.
881     if(diff-ticks_per_period > 50 || diff-ticks_per_period < -50) {
882         debugOutput(DEBUG_LEVEL_VERBOSE, "rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
883                                             m_time_of_transfer2, m_time_of_transfer, diff, ticks_per_period);
884     }
885     m_time_of_transfer2 = m_time_of_transfer;
886     #endif
887    
888     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
889                         "transfer at %llu ticks...\n",
890                         m_time_of_transfer);
891
892     // this is to notify the client of the delay that we introduced by waiting
893     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
894     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
895                         "delayed for %d usecs...\n",
896                         m_delayed_usecs);
897
898 #ifdef DEBUG
899     int rcv_bf=0, xmt_bf=0;
900     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
901         it != m_ReceiveProcessors.end();
902         ++it ) {
903         rcv_bf = (*it)->getBufferFill();
904     }
905     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
906         it != m_TransmitProcessors.end();
907         ++it ) {
908         xmt_bf = (*it)->getBufferFill();
909     }
910     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
911                         "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
912                         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
913
914     // check if xruns occurred on the Iso side.
915     // also check if xruns will occur should we transfer() now
916     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
917           it != m_ReceiveProcessors.end();
918           ++it ) {
919
920         if ((*it)->xrunOccurred()) {
921             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
922             (*it)->dumpInfo();
923         }
924         if (!((*it)->canClientTransferFrames(m_period))) {
925             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
926             (*it)->dumpInfo();
927         }
928     }
929     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
930           it != m_TransmitProcessors.end();
931           ++it ) {
932         if ((*it)->xrunOccurred()) {
933             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
934         }
935         if (!((*it)->canClientTransferFrames(m_period))) {
936             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
937         }
938     }
939 #endif
940
941     m_nbperiods++;
942
943     m_WaitLock.Unlock();
944     // now we can signal the client that we are (should be) ready
945     return !xrun_occurred;
946 }
947
948 /**
949  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
950  *
951  * Transfers one period of frames from the client side to the Iso side and vice versa.
952  *
953  * @return true if successful, false otherwise (indicates xrun).
954  */
955 bool StreamProcessorManager::transfer() {
956     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
957     bool retval=true;
958     retval &= transfer(StreamProcessor::ePT_Receive);
959     retval &= transfer(StreamProcessor::ePT_Transmit);
960     return retval;
961 }
962
963 /**
964  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
965  *
966  * Transfers one period of frames from the client side to the Iso side or vice versa.
967  *
968  * @param t The processor type to tranfer for (receive or transmit)
969  * @return true if successful, false otherwise (indicates xrun).
970  */
971 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
972     if(m_SyncSource == NULL) return false;
973     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
974         "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
975         t, m_time_of_transfer,
976         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
977         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
978         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
979
980     bool retval = true;
981     // a static cast could make sure that there is no performance
982     // penalty for the virtual functions (to be checked)
983     if (t==StreamProcessor::ePT_Receive) {
984         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
985                 it != m_ReceiveProcessors.end();
986                 ++it ) {
987             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
988                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
989                             m_period, m_time_of_transfer,*it);
990                 retval &= false; // buffer underrun
991             }
992         }
993     } else {
994         // FIXME: in the SPM it would be nice to have system time instead of
995         //        1394 time
996         float rate = m_SyncSource->getTicksPerFrame();
997         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
998
999         // the data we are putting into the buffer is intended to be transmitted
1000         // one ringbuffer size after it has been received
1001         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1002
1003         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1004                 it != m_TransmitProcessors.end();
1005                 ++it ) {
1006             // FIXME: in the SPM it would be nice to have system time instead of
1007             //        1394 time
1008             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1009                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1010                         m_period, transmit_timestamp, *it);
1011                 retval &= false; // buffer underrun
1012             }
1013         }
1014     }
1015     return retval;
1016 }
1017
1018 /**
1019  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
1020  *
1021  * Transfers one period of silence to the Iso side for transmit SP's
1022  * or dump one period of frames for receive SP's
1023  *
1024  * @return true if successful, false otherwise (indicates xrun).
1025  */
1026 bool StreamProcessorManager::transferSilence() {
1027     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
1028     bool retval=true;
1029     retval &= transferSilence(StreamProcessor::ePT_Receive);
1030     retval &= transferSilence(StreamProcessor::ePT_Transmit);
1031     return retval;
1032 }
1033
1034 /**
1035  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
1036  *
1037  * Transfers one period of silence to the Iso side for transmit SP's
1038  * or dump one period of frames for receive SP's
1039  *
1040  * @param t The processor type to tranfer for (receive or transmit)
1041  * @return true if successful, false otherwise (indicates xrun).
1042  */
1043 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
1044     if(m_SyncSource == NULL) return false;
1045     debugOutput( DEBUG_LEVEL_VERY_VERBOSE,
1046         "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
1047         t, m_time_of_transfer,
1048         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
1049         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
1050         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
1051
1052     bool retval = true;
1053     // a static cast could make sure that there is no performance
1054     // penalty for the virtual functions (to be checked)
1055     if (t==StreamProcessor::ePT_Receive) {
1056         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1057                 it != m_ReceiveProcessors.end();
1058                 ++it ) {
1059             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1060                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1061                             m_period, m_time_of_transfer,*it);
1062                 retval &= false; // buffer underrun
1063             }
1064         }
1065     } else {
1066         // FIXME: in the SPM it would be nice to have system time instead of
1067         //        1394 time
1068         float rate = m_SyncSource->getTicksPerFrame();
1069         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1070
1071         // the data we are putting into the buffer is intended to be transmitted
1072         // one ringbuffer size after it has been received
1073         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1074
1075         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1076                 it != m_TransmitProcessors.end();
1077                 ++it ) {
1078             // FIXME: in the SPM it would be nice to have system time instead of
1079             //        1394 time
1080             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1081                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1082                         m_period, transmit_timestamp, *it);
1083                 retval &= false; // buffer underrun
1084             }
1085         }
1086     }
1087     return retval;
1088 }
1089
1090 void StreamProcessorManager::dumpInfo() {
1091     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1092     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1093     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1094     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
1095
1096     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1097     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1098         it != m_ReceiveProcessors.end();
1099         ++it ) {
1100         (*it)->dumpInfo();
1101     }
1102
1103     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1104     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1105         it != m_TransmitProcessors.end();
1106         ++it ) {
1107         (*it)->dumpInfo();
1108     }
1109
1110     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1111
1112 }
1113
1114 void StreamProcessorManager::setVerboseLevel(int l) {
1115     setDebugLevel(l);
1116     m_WaitLock.setVerboseLevel(l);
1117
1118     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1119     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1120         it != m_ReceiveProcessors.end();
1121         ++it ) {
1122         (*it)->setVerboseLevel(l);
1123     }
1124
1125     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1126     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1127         it != m_TransmitProcessors.end();
1128         ++it ) {
1129         (*it)->setVerboseLevel(l);
1130     }
1131 }
1132
1133
1134 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1135     int count=0;
1136
1137     if (direction == Port::E_Capture) {
1138         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1139             it != m_ReceiveProcessors.end();
1140             ++it ) {
1141             count += (*it)->getPortCount(type);
1142         }
1143     } else {
1144         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1145             it != m_TransmitProcessors.end();
1146             ++it ) {
1147             count += (*it)->getPortCount(type);
1148         }
1149     }
1150     return count;
1151 }
1152
1153 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1154     int count=0;
1155
1156     if (direction == Port::E_Capture) {
1157         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1158             it != m_ReceiveProcessors.end();
1159             ++it ) {
1160             count += (*it)->getPortCount();
1161         }
1162     } else {
1163         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1164             it != m_TransmitProcessors.end();
1165             ++it ) {
1166             count += (*it)->getPortCount();
1167         }
1168     }
1169     return count;
1170 }
1171
1172 // TODO: implement a port map here, instead of the loop
1173
1174 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1175     int count=0;
1176     int prevcount=0;
1177
1178     if (direction == Port::E_Capture) {
1179         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1180             it != m_ReceiveProcessors.end();
1181             ++it ) {
1182             count += (*it)->getPortCount();
1183             if (count > idx) {
1184                 return (*it)->getPortAtIdx(idx-prevcount);
1185             }
1186             prevcount=count;
1187         }
1188     } else {
1189         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1190             it != m_TransmitProcessors.end();
1191             ++it ) {
1192             count += (*it)->getPortCount();
1193             if (count > idx) {
1194                 return (*it)->getPortAtIdx(idx-prevcount);
1195             }
1196             prevcount=count;
1197         }
1198     }
1199     return NULL;
1200 }
1201
1202 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1203     m_thread_realtime=rt;
1204     m_thread_priority=priority;
1205     return true;
1206 }
1207
1208
1209 } // end of namespace
Note: See TracBrowser for help on using the browser.