root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 712, 46.7 kB (checked in by ppalmers, 15 years ago)

almost there...

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define RUNNING_TIMEOUT_MSEC 4000
33 #define PREPARE_TIMEOUT_MSEC 4000
34 #define ENABLE_TIMEOUT_MSEC 4000
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_nb_buffers(nb_buffers)
44     , m_period(period)
45     , m_xruns(0)
46     , m_isoManager(0)
47     , m_nbperiods(0)
48 {
49     addOption(Util::OptionContainer::Option("slaveMode",false));
50 }
51
52 StreamProcessorManager::~StreamProcessorManager() {
53     if (m_isoManager) delete m_isoManager;
54
55 }
56
57 /**
58  * Registers \ref processor with this manager.
59  *
60  * also registers it with the isohandlermanager
61  *
62  * be sure to call isohandlermanager->init() first!
63  * and be sure that the processors are also ->init()'ed
64  *
65  * @param processor
66  * @return true if successfull
67  */
68 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
69 {
70     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
71     assert(processor);
72     assert(m_isoManager);
73
74     if (processor->getType()==StreamProcessor::E_Receive) {
75         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
76
77         m_ReceiveProcessors.push_back(processor);
78
79         processor->setManager(this);
80
81         return true;
82     }
83
84     if (processor->getType()==StreamProcessor::E_Transmit) {
85         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
86
87         m_TransmitProcessors.push_back(processor);
88
89         processor->setManager(this);
90
91         return true;
92     }
93
94     debugFatal("Unsupported processor type!\n");
95
96     return false;
97 }
98
99 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
100 {
101     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
102     assert(processor);
103
104     if (processor->getType()==StreamProcessor::E_Receive) {
105
106         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
107             it != m_ReceiveProcessors.end();
108             ++it ) {
109
110             if ( *it == processor ) {
111                     m_ReceiveProcessors.erase(it);
112
113                     processor->clearManager();
114
115                     if(!m_isoManager->unregisterStream(processor)) {
116                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
117
118                         return false;
119
120                     }
121
122                     return true;
123                 }
124         }
125     }
126
127     if (processor->getType()==StreamProcessor::E_Transmit) {
128         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
129             it != m_TransmitProcessors.end();
130             ++it ) {
131
132             if ( *it == processor ) {
133                     m_TransmitProcessors.erase(it);
134
135                     processor->clearManager();
136
137                     if(!m_isoManager->unregisterStream(processor)) {
138                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
139
140                         return false;
141
142                     }
143
144                     return true;
145                 }
146         }
147     }
148
149     debugFatal("Processor (%p) not found!\n",processor);
150
151     return false; //not found
152
153 }
154
155 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
156     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
157
158     m_SyncSource=s;
159     return true;
160 }
161
162 StreamProcessor *StreamProcessorManager::getSyncSource() {
163     return m_SyncSource;
164 }
165
166 bool StreamProcessorManager::init()
167 {
168     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
169
170     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
171
172     if(!m_isoManager) {
173         debugFatal("Could not create IsoHandlerManager\n");
174         return false;
175     }
176
177     // propagate the debug level
178     m_isoManager->setVerboseLevel(getDebugLevel());
179
180     if(!m_isoManager->init()) {
181         debugFatal("Could not initialize IsoHandlerManager\n");
182         return false;
183     }
184
185     m_xrun_happened=false;
186
187     return true;
188 }
189
190 bool StreamProcessorManager::prepare() {
191
192     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
193
194     m_is_slave=false;
195     if(!getOption("slaveMode", m_is_slave)) {
196         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
197     }
198
199     // if no sync source is set, select one here
200     if(m_SyncSource == NULL) {
201        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
202     }
203
204     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
205         it != m_ReceiveProcessors.end();
206         ++it ) {
207             if(m_SyncSource == NULL) {
208                 debugWarning(" => Sync Source is %p.\n", *it);
209                 m_SyncSource = *it;
210             }
211     }
212
213     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
214         it != m_TransmitProcessors.end();
215         ++it ) {
216             if(m_SyncSource == NULL) {
217                 debugWarning(" => Sync Source is %p.\n", *it);
218                 m_SyncSource = *it;
219             }
220     }
221
222     // now do the actual preparation
223     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
224     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
225         it != m_ReceiveProcessors.end();
226         ++it ) {
227
228         if(!(*it)->setSyncSource(m_SyncSource)) {
229             debugFatal(  " could not set sync source (%p)...\n",(*it));
230             return false;
231         }
232
233         if(!(*it)->setOption("slaveMode", m_is_slave)) {
234             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
235         }
236
237         if(!(*it)->prepare()) {
238             debugFatal(  " could not prepare (%p)...\n",(*it));
239             return false;
240         }
241     }
242
243     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
244     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
245         it != m_TransmitProcessors.end();
246         ++it ) {
247         if(!(*it)->setSyncSource(m_SyncSource)) {
248             debugFatal(  " could not set sync source (%p)...\n",(*it));
249             return false;
250         }
251         if(!(*it)->setOption("slaveMode", m_is_slave)) {
252             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
253         }
254         if(!(*it)->prepare()) {
255             debugFatal( " could not prepare (%p)...\n",(*it));
256             return false;
257         }
258     }
259
260     // if there are no stream processors registered,
261     // fail
262     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
263         debugFatal("No stream processors registered, can't do anything usefull\n");
264         return false;
265     }
266
267     return true;
268 }
269
270 bool StreamProcessorManager::syncStartAll() {
271
272     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n");
273     // we have to wait until all streamprocessors indicate that they are running
274     // i.e. that there is actually some data stream flowing
275     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds
276     bool notRunning=true;
277     while (notRunning && wait_cycles) {
278         wait_cycles--;
279         notRunning=false;
280
281         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
282                 it != m_ReceiveProcessors.end();
283                 ++it ) {
284             if(!(*it)->isRunning()) notRunning=true;
285         }
286
287         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
288                 it != m_TransmitProcessors.end();
289                 ++it ) {
290             if(!(*it)->isRunning()) notRunning=true;
291         }
292
293         usleep(1000);
294         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning);
295     }
296
297     if(!wait_cycles) { // timout has occurred
298         debugFatal("One or more streams are not starting up (timeout):\n");
299
300         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301                 it != m_ReceiveProcessors.end();
302                 ++it ) {
303             if(!(*it)->isRunning()) {
304                 debugFatal(" receive stream %p not running\n",*it);
305             } else {
306                 debugFatal(" receive stream %p running\n",*it);
307             }
308         }
309
310         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
311                 it != m_TransmitProcessors.end();
312                 ++it ) {
313             if(!(*it)->isRunning()) {
314                 debugFatal(" transmit stream %p not running\n",*it);
315             } else {
316                 debugFatal(" transmit stream %p running\n",*it);
317             }
318         }
319         return false;
320     }
321
322     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
323
324     // now find out how long we have to delay the wait operation such that
325     // the received frames will all be presented to the SP
326     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
327     int max_of_min_delay=0;
328     int min_delay=0;
329     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
330             it != m_ReceiveProcessors.end();
331             ++it ) {
332         min_delay=(*it)->getMaxFrameLatency();
333         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
334     }
335
336     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks (%03us %04uc %04ut)...\n",
337         max_of_min_delay,
338         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
339         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
340         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
341     m_SyncSource->setSyncDelay(max_of_min_delay);
342
343     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device to indicate clock sync lock...\n");
344     //sleep(2); // FIXME: be smarter here
345    
346     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n");
347     // now we reset the frame counters
348     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
349             it != m_ReceiveProcessors.end();
350             ++it ) {
351         // get the receive SP's going at receiving data
352         (*it)->m_data_buffer->setTransparent(false);
353         (*it)->reset();
354     }
355
356     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
357             it != m_TransmitProcessors.end();
358             ++it ) {
359         // make sure the SP retains it's prefilled data
360         (*it)->m_data_buffer->setTransparent(false);
361         (*it)->reset();
362     }
363    
364     dumpInfo();
365     // All buffers are prefilled and set-up, the only thing
366     // that remains a mistery is the timestamp information.
367 //     m_SyncSource->m_data_buffer->setTransparent(false);
368 //     debugShowBackLog();
369
370 //     m_SyncSource->setVerboseLevel(DEBUG_LEVEL_ULTRA_VERBOSE);
371    
372     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
373     // in order to obtain that, we wait for the first periods to be
374     // received.
375     int nb_sync_runs=20;
376     while(nb_sync_runs--) { // or while not sync-ed?
377         waitForPeriod();
378         // drop the frames for all receive SP's
379         dryRun(StreamProcessor::E_Receive);
380        
381         // we don't have to dryrun for the xmit SP's since they
382         // are not sending data yet.
383        
384         // sync the xmit SP's buffer head timestamps
385         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
386                 it != m_TransmitProcessors.end();
387                 ++it ) {
388             // FIXME: encapsulate
389             (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
390         }
391     }
392 //     m_SyncSource->setVerboseLevel(DEBUG_LEVEL_VERBOSE);
393
394     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
395         m_time_of_transfer,
396         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
397         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
398         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
399     // FIXME: xruns can screw up the framecounter accounting. do something more sane here
400     resetXrunCounters();
401     // lock the isohandlermanager such that things freeze
402 //     debugShowBackLog();
403
404     // we now should have decent sync info
405     // the buffers of the receive streams should be (approx) empty
406     // the buffers of the xmit streams should be full
407    
408     // note what should the timestamp of the first sample be?
409    
410     // at this point the buffer head timestamp of the transmit buffers can be
411     // set properly since we know the sync source's timestamp of the last
412     // buffer transfer. we also know the rate.
413    
414     debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n");
415     // FIXME: in the SPM it would be nice to have system time instead of
416     //        1394 time
417 //     float rate=m_SyncSource->getTicksPerFrame();
418 //     int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
419 //     // the data at the front of the buffer is intended to be transmitted
420 //     // nb_periods*period_size after the last received period
421 //     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
422
423     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
424             it != m_TransmitProcessors.end();
425             ++it ) {
426         // FIXME: encapsulate
427         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
428         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!!
429     }
430    
431     dumpInfo();
432    
433     // this is the place were we should be syncing the received streams too
434     // check how much they differ
435    
436    
437     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n");
438
439     // FIXME: this should not be in cycles, but in 'time'
440     // FIXME: remove the timestamp
441     if (!enableStreamProcessors(0)) {
442         debugFatal("Could not enable StreamProcessors...\n");
443         return false;
444     }
445
446     debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n");
447     #define MAX_DRYRUN_CYCLES               40
448     #define MIN_SUCCESSFUL_DRYRUN_CYCLES    4
449     // run some cycles 'dry' such that everything can stabilize
450     int nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES;
451     int nb_succesful_cycles = 0;
452     while(nb_dryrun_cycles_left > 0 &&
453           nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) {
454
455         waitForPeriod();
456
457         if (dryRun()) {
458             nb_succesful_cycles++;
459         } else {
460             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" );
461             resetXrunCounters();
462             // reset the transmit SP's such that there is no issue with accumulating buffers
463             // FIXME: what about receive SP's
464             for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
465                     it != m_TransmitProcessors.end();
466                     ++it ) {
467                 // FIXME: encapsulate
468                 (*it)->reset(); //CHECK!!!
469                 (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
470             }
471            
472             nb_succesful_cycles = 0;
473             // FIXME: xruns can screw up the framecounter accounting. do something more sane here
474         }
475         nb_dryrun_cycles_left--;
476     }
477
478     if(nb_dryrun_cycles_left == 0) {
479         debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without steady-state...\n" );
480         return false;
481     }
482     debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in steady-state...\n" );
483
484     // now we should clear the xrun flags
485     resetXrunCounters();
486
487 /*    debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning streams...\n");
488     // run some cycles 'dry' such that everything can stabilize
489     nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES;
490     nb_succesful_cycles = 0;
491     while(nb_dryrun_cycles_left > 0 &&
492           nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) {
493
494         waitForPeriod();
495
496         // align the received streams
497         int64_t sp_lag;
498         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
499                 it != m_ReceiveProcessors.end();
500                 ++it ) {
501             uint64_t ts_sp=(*it)->getTimeAtPeriod();
502             uint64_t ts_sync=m_SyncSource->getTimeAtPeriod();
503
504             sp_lag = diffTicks(ts_sp, ts_sync);
505             debugOutput( DEBUG_LEVEL_VERBOSE, "  SP(%p) TS=%011llu - TS=%011llu = %04lld\n",
506                 (*it), ts_sp, ts_sync, sp_lag);
507             // sync the other receive SP's to the sync source
508 //             if((*it) != m_SyncSource) {
509 //                 if(!(*it)->m_data_buffer->syncCorrectLag(sp_lag)) {
510 //                         debugOutput(DEBUG_LEVEL_VERBOSE,"could not syncCorrectLag(%11lld) for stream processor (%p)\n",
511 //                                 sp_lag, *it);
512 //                 }
513 //             }
514         }
515
516
517         if (dryRun()) {
518             nb_succesful_cycles++;
519         } else {
520             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" );
521             resetXrunCounters();
522             nb_succesful_cycles = 0;
523             // FIXME: xruns can screw up the framecounter accounting. do something more sane here
524         }
525         nb_dryrun_cycles_left--;
526     }
527
528     if(nb_dryrun_cycles_left == 0) {
529         debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without aligned steady-state...\n" );
530         return false;
531     }
532     debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in aligned steady-state...\n" );*/
533    
534     // now we should clear the xrun flags
535     resetXrunCounters();
536     // and off we go
537     return true;
538 }
539
540 void StreamProcessorManager::resetXrunCounters(){
541     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n");
542     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
543         it != m_ReceiveProcessors.end();
544         ++it )
545     {
546         (*it)->resetXrunCounter();
547     }
548
549     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
550         it != m_TransmitProcessors.end();
551         ++it )
552     {
553         (*it)->resetXrunCounter();
554     }
555 }
556
557 bool StreamProcessorManager::start() {
558     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
559     assert(m_isoManager);
560
561     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
562     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
563     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
564         it != m_ReceiveProcessors.end();
565         ++it ) {
566             if (!(*it)->prepareForStart()) {
567                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it);
568                 return false;
569             }
570             if (!m_isoManager->registerStream(*it)) {
571                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
572                 return false;
573             }
574         }
575
576     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
577     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
578         it != m_TransmitProcessors.end();
579         ++it ) {
580             if (!(*it)->prepareForStart()) {
581                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it);
582                 return false;
583             }
584             if (!m_isoManager->registerStream(*it)) {
585                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
586                 return false;
587             }
588         }
589
590     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
591     if (!m_isoManager->prepare()) {
592         debugFatal("Could not prepare isoManager\n");
593         return false;
594     }
595
596     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
597         if (!disableStreamProcessors()) {
598         debugFatal("Could not disable StreamProcessors...\n");
599         return false;
600     }
601
602     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
603     if (!m_isoManager->startHandlers(-1)) {
604         debugFatal("Could not start handlers...\n");
605         return false;
606     }
607
608     // start all SP's synchonized
609     if (!syncStartAll()) {
610         debugFatal("Could not syncStartAll...\n");
611         return false;
612     }
613
614     // dump the iso stream information when in verbose mode
615     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
616         m_isoManager->dumpInfo();
617     }
618
619     return true;
620
621 }
622
623 bool StreamProcessorManager::stop() {
624     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
625     assert(m_isoManager);
626
627     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
628     // Most stream processors can just stop without special treatment.  However, some
629     // (like the MOTU) need to do a few things before it's safe to turn off the iso
630     // handling.
631     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient
632     bool allReady = false;
633     while (!allReady && wait_cycles) {
634         wait_cycles--;
635         allReady = true;
636
637         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
638             it != m_ReceiveProcessors.end();
639             ++it ) {
640             if(!(*it)->prepareForStop()) allReady = false;
641         }
642
643         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
644             it != m_TransmitProcessors.end();
645             ++it ) {
646             if(!(*it)->prepareForStop()) allReady = false;
647         }
648         usleep(1000);
649     }
650
651     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
652         if (!disableStreamProcessors()) {
653         debugFatal("Could not disable StreamProcessors...\n");
654         return false;
655     }
656
657     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
658     if(!m_isoManager->stopHandlers()) {
659        debugFatal("Could not stop ISO handlers\n");
660        return false;
661     }
662
663     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
664     // now unregister all streams from iso manager
665     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
666     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
667         it != m_ReceiveProcessors.end();
668         ++it ) {
669             if (!m_isoManager->unregisterStream(*it)) {
670                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
671                 return false;
672             }
673
674         }
675
676     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
677     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
678         it != m_TransmitProcessors.end();
679         ++it ) {
680             if (!m_isoManager->unregisterStream(*it)) {
681                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
682                 return false;
683             }
684
685         }
686
687     return true;
688
689 }
690
691 /**
692  * Enables the registered StreamProcessors
693  * @return true if successful, false otherwise
694  */
695 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) {
696     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at);
697
698     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource);
699     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n");
700     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) {
701             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n");
702         return false;
703     }
704
705     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n");
706     m_SyncSource->enable(time_to_enable_at);
707
708     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n");
709
710     // we prepare the streamprocessors for enable
711     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
712             it != m_ReceiveProcessors.end();
713             ++it ) {
714         if(*it != m_SyncSource) {
715             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it);
716             (*it)->prepareForEnable(time_to_enable_at);
717         }
718     }
719
720     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
721             it != m_TransmitProcessors.end();
722             ++it ) {
723         if(*it != m_SyncSource) {
724             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it);
725             (*it)->prepareForEnable(time_to_enable_at);
726         }
727     }
728
729     // then we enable the streamprocessors
730     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
731             it != m_ReceiveProcessors.end();
732             ++it ) {
733         if(*it != m_SyncSource) {
734             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it);
735             (*it)->enable(time_to_enable_at);
736         }
737     }
738
739     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
740             it != m_TransmitProcessors.end();
741             ++it ) {
742         if(*it != m_SyncSource) {
743             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it);
744             (*it)->enable(time_to_enable_at);
745         }
746     }
747
748     // now we wait for the SP's to get enabled
749 //     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n");
750 //     // we have to wait until all streamprocessors indicate that they are running
751 //     // i.e. that there is actually some data stream flowing
752 //     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
753 //     bool notEnabled=true;
754 //     while (notEnabled && wait_cycles) {
755 //         wait_cycles--;
756 //         notEnabled=false;
757 //
758 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
759 //                 it != m_ReceiveProcessors.end();
760 //                 ++it ) {
761 //             if(!(*it)->isEnabled()) notEnabled=true;
762 //         }
763 //
764 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
765 //                 it != m_TransmitProcessors.end();
766 //                 ++it ) {
767 //             if(!(*it)->isEnabled()) notEnabled=true;
768 //         }
769 //         usleep(1000); // one cycle
770 //     }
771 //
772 //     if(!wait_cycles) { // timout has occurred
773 //         debugFatal("One or more streams couldn't be enabled (timeout):\n");
774 //
775 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
776 //                 it != m_ReceiveProcessors.end();
777 //                 ++it ) {
778 //             if(!(*it)->isEnabled()) {
779 //                     debugFatal(" receive stream %p not enabled\n",*it);
780 //             } else {
781 //                     debugFatal(" receive stream %p enabled\n",*it);
782 //             }
783 //         }
784 //
785 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
786 //                 it != m_TransmitProcessors.end();
787 //                 ++it ) {
788 //             if(!(*it)->isEnabled()) {
789 //                     debugFatal(" transmit stream %p not enabled\n",*it);
790 //             } else {
791 //                     debugFatal(" transmit stream %p enabled\n",*it);
792 //             }
793 //         }
794 //         return false;
795 //     }
796
797     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n");
798
799     return true;
800 }
801
802 /**
803  * Disables the registered StreamProcessors
804  * @return true if successful, false otherwise
805  */
806 bool StreamProcessorManager::disableStreamProcessors() {
807     // we prepare the streamprocessors for disable
808     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
809             it != m_ReceiveProcessors.end();
810             ++it ) {
811         (*it)->prepareForDisable();
812     }
813
814     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
815             it != m_TransmitProcessors.end();
816             ++it ) {
817         (*it)->prepareForDisable();
818     }
819
820     // then we disable the streamprocessors
821     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
822             it != m_ReceiveProcessors.end();
823             ++it ) {
824         (*it)->disable();
825         (*it)->m_data_buffer->setTransparent(true);
826     }
827
828     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
829             it != m_TransmitProcessors.end();
830             ++it ) {
831         (*it)->disable();
832         (*it)->m_data_buffer->setTransparent(true);
833     }
834
835     // now we wait for the SP's to get disabled
836     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n");
837     // we have to wait until all streamprocessors indicate that they are running
838     // i.e. that there is actually some data stream flowing
839     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds
840     bool enabled=true;
841     while (enabled && wait_cycles) {
842         wait_cycles--;
843         enabled=false;
844
845         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
846                 it != m_ReceiveProcessors.end();
847                 ++it ) {
848             if((*it)->isEnabled()) enabled=true;
849         }
850
851         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
852                 it != m_TransmitProcessors.end();
853                 ++it ) {
854             if((*it)->isEnabled()) enabled=true;
855         }
856         usleep(1000); // one cycle
857     }
858
859     if(!wait_cycles) { // timout has occurred
860         debugFatal("One or more streams couldn't be disabled (timeout):\n");
861
862         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
863                 it != m_ReceiveProcessors.end();
864                 ++it ) {
865             if(!(*it)->isEnabled()) {
866                     debugFatal(" receive stream %p not enabled\n",*it);
867             } else {
868                     debugFatal(" receive stream %p enabled\n",*it);
869             }
870         }
871
872         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
873                 it != m_TransmitProcessors.end();
874                 ++it ) {
875             if(!(*it)->isEnabled()) {
876                     debugFatal(" transmit stream %p not enabled\n",*it);
877             } else {
878                     debugFatal(" transmit stream %p enabled\n",*it);
879             }
880         }
881         return false;
882     }
883
884     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n");
885
886     return true;
887 }
888
889 /**
890  * Called upon Xrun events. This brings all StreamProcessors back
891  * into their starting state, and then carries on streaming. This should
892  * have the same effect as restarting the whole thing.
893  *
894  * @return true if successful, false otherwise
895  */
896 bool StreamProcessorManager::handleXrun() {
897
898     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
899
900     dumpInfo();
901
902     /*
903      * Reset means:
904      * 1) Disabling the SP's, so that they don't process any packets
905      *    note: the isomanager does keep on delivering/requesting them
906      * 2) Bringing all buffers & streamprocessors into a know state
907      *    - Clear all capture buffers
908      *    - Put nb_periods*period_size of null frames into the playback buffers
909      * 3) Re-enable the SP's
910      */
911     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");
912         if (!disableStreamProcessors()) {
913         debugFatal("Could not disable StreamProcessors...\n");
914         return false;
915     }
916
917     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
918     // start all SP's synchonized
919     if (!syncStartAll()) {
920         debugFatal("Could not syncStartAll...\n");
921         return false;
922     }
923
924     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
925
926     return true;
927 }
928
929 /**
930  * @brief Waits until the next period of samples is ready
931  *
932  * This function does not return until a full period of samples is (or should be)
933  * ready to be transferred.
934  *
935  * @return true if the period is ready, false if an xrun occurred
936  */
937 bool StreamProcessorManager::waitForPeriod() {
938     int time_till_next_period;
939     bool xrun_occurred=false;
940
941     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
942
943     assert(m_SyncSource);
944
945     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
946
947     while(time_till_next_period > 0) {
948         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
949
950         // wait for the period
951         usleep(time_till_next_period);
952
953         // check for underruns on the ISO side,
954         // those should make us bail out of the wait loop
955         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
956             it != m_ReceiveProcessors.end();
957             ++it ) {
958             // a xrun has occurred on the Iso side
959             xrun_occurred |= (*it)->xrunOccurred();
960         }
961         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
962             it != m_TransmitProcessors.end();
963             ++it ) {
964             // a xrun has occurred on the Iso side
965             xrun_occurred |= (*it)->xrunOccurred();
966         }
967
968         if(xrun_occurred) break;
969
970         // check if we were waked up too soon
971         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
972     }
973
974     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
975
976     // this is to notify the client of the delay
977     // that we introduced
978     m_delayed_usecs=time_till_next_period;
979
980     // we save the 'ideal' time of the transfer at this point,
981     // because we can have interleaved read - process - write
982     // cycles making that we modify a receiving stream's buffer
983     // before we get to writing.
984     // NOTE: before waitForPeriod() is called again, both the transmit
985     //       and the receive processors should have done their transfer.
986     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
987     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
988         m_time_of_transfer);
989
990 #ifdef DEBUG
991     int rcv_bf=0, xmt_bf=0;
992     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
993         it != m_ReceiveProcessors.end();
994         ++it ) {
995         rcv_bf = (*it)->getBufferFill();
996     }
997     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
998         it != m_TransmitProcessors.end();
999         ++it ) {
1000         xmt_bf = (*it)->getBufferFill();
1001     }
1002     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
1003         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
1004
1005 #endif
1006
1007     xrun_occurred=false;
1008
1009     // check if xruns occurred on the Iso side.
1010     // also check if xruns will occur should we transfer() now
1011
1012     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1013           it != m_ReceiveProcessors.end();
1014           ++it ) {
1015         // a xrun has occurred on the Iso side
1016         xrun_occurred |= (*it)->xrunOccurred();
1017
1018         // if this is true, a xrun will occur
1019         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
1020
1021 #ifdef DEBUG
1022         if ((*it)->xrunOccurred()) {
1023             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
1024             (*it)->dumpInfo();
1025         }
1026         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
1027             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
1028             (*it)->dumpInfo();
1029         }
1030 #endif
1031
1032     }
1033     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1034           it != m_TransmitProcessors.end();
1035           ++it ) {
1036         // a xrun has occurred on the Iso side
1037         xrun_occurred |= (*it)->xrunOccurred();
1038
1039         // if this is true, a xrun will occur
1040         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();
1041
1042 #ifdef DEBUG
1043         if ((*it)->xrunOccurred()) {
1044             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
1045         }
1046         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {
1047             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
1048         }
1049 #endif
1050     }
1051
1052     m_nbperiods++;
1053
1054     // now we can signal the client that we are (should be) ready
1055     return !xrun_occurred;
1056 }
1057
1058 /**
1059  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
1060  *
1061  * Transfers one period of frames from the client side to the Iso side and vice versa.
1062  *
1063  * @return true if successful, false otherwise (indicates xrun).
1064  */
1065 bool StreamProcessorManager::transfer() {
1066
1067     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n");
1068     bool retval=true;
1069     retval &= dryRun(StreamProcessor::E_Receive);
1070     retval &= dryRun(StreamProcessor::E_Transmit);
1071     return retval;
1072 }
1073
1074 /**
1075  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
1076  *
1077  * Transfers one period of frames from the client side to the Iso side or vice versa.
1078  *
1079  * @param t The processor type to tranfer for (receive or transmit)
1080  * @return true if successful, false otherwise (indicates xrun).
1081  */
1082
1083 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
1084     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
1085     bool retval = true;
1086     // a static cast could make sure that there is no performance
1087     // penalty for the virtual functions (to be checked)
1088     if (t==StreamProcessor::E_Receive) {
1089         // determine the time at which we want reception to start
1090         float rate=m_SyncSource->getTicksPerFrame();
1091         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1092        
1093         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1094        
1095         if(receive_timestamp<0) {
1096             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1097              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1098         }
1099         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1100             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1101              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1102         }
1103        
1104         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1105                 it != m_ReceiveProcessors.end();
1106                 ++it ) {
1107
1108             if(!(*it)->getFrames(m_period, receive_timestamp)) {
1109                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1110                             m_period, m_time_of_transfer,*it);
1111                 retval &= false; // buffer underrun
1112             }
1113         }
1114     } else {
1115         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1116                 it != m_TransmitProcessors.end();
1117                 ++it ) {
1118             // FIXME: in the SPM it would be nice to have system time instead of
1119             //        1394 time
1120             float rate=m_SyncSource->getTicksPerFrame();
1121             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1122
1123             // the data we are putting into the buffer is intended to be transmitted
1124             // one ringbuffer size after it has been received
1125             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1126
1127             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1128                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1129                         m_period, transmit_timestamp, *it);
1130                 retval &= false; // buffer underrun
1131             }
1132
1133         }
1134     }
1135     return retval;
1136 }
1137
1138 /**
1139  * @brief Dry run one period for both receive and transmit StreamProcessors
1140  *
1141  * Process one period of frames for all streamprocessors, without touching the
1142  * client buffers. This only removes an incoming period from the ISO receive buffer and
1143  * puts one period of silence into the transmit buffers.
1144  *
1145  * @return true if successful, false otherwise (indicates xrun).
1146  */
1147 bool StreamProcessorManager::dryRun() {
1148     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1149     bool retval=true;
1150     retval &= dryRun(StreamProcessor::E_Receive);
1151     retval &= dryRun(StreamProcessor::E_Transmit);
1152     return retval;
1153 }
1154
1155 /**
1156  * @brief Dry run one period for either the receive or transmit StreamProcessors
1157  *
1158  * see dryRun()
1159  *
1160  * @param t The processor type to dryRun for (receive or transmit)
1161  * @return true if successful, false otherwise (indicates xrun).
1162  */
1163
1164 bool StreamProcessorManager::dryRun(enum StreamProcessor::EProcessorType t) {
1165     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
1166     bool retval = true;
1167     // a static cast could make sure that there is no performance
1168     // penalty for the virtual functions (to be checked)
1169     if (t==StreamProcessor::E_Receive) {
1170         // determine the time at which we want reception to start
1171         float rate=m_SyncSource->getTicksPerFrame();
1172         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
1173        
1174         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
1175        
1176         if(receive_timestamp<0) {
1177             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1178              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1179         }
1180         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
1181             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
1182              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
1183         }
1184        
1185         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1186                 it != m_ReceiveProcessors.end();
1187                 ++it ) {
1188
1189             if(!(*it)->getFramesDry(m_period, receive_timestamp)) {
1190                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
1191                             m_period, m_time_of_transfer,*it);
1192                 retval &= false; // buffer underrun
1193             }
1194
1195         }
1196     } else {
1197         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1198                 it != m_TransmitProcessors.end();
1199                 ++it ) {
1200             // FIXME: in the SPM it would be nice to have system time instead of
1201             //        1394 time
1202             float rate=m_SyncSource->getTicksPerFrame();
1203             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
1204
1205             // the data we are putting into the buffer is intended to be transmitted
1206             // one ringbuffer size after it has been received
1207             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1208
1209             if(!(*it)->putFramesDry(m_period, transmit_timestamp)) {
1210                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
1211                         m_period, transmit_timestamp, *it);
1212                 retval &= false; // buffer underrun
1213             }
1214
1215         }
1216     }
1217     return retval;
1218 }
1219
1220 void StreamProcessorManager::dumpInfo() {
1221     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1222     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1223     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1224
1225     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1226     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1227         it != m_ReceiveProcessors.end();
1228         ++it ) {
1229         (*it)->dumpInfo();
1230     }
1231
1232     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1233     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1234         it != m_TransmitProcessors.end();
1235         ++it ) {
1236         (*it)->dumpInfo();
1237     }
1238
1239     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1240     m_isoManager->dumpInfo();
1241     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1242
1243 }
1244
1245 void StreamProcessorManager::setVerboseLevel(int l) {
1246     setDebugLevel(l);
1247
1248     if (m_isoManager) m_isoManager->setVerboseLevel(l);
1249
1250     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1251     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1252         it != m_ReceiveProcessors.end();
1253         ++it ) {
1254         (*it)->setVerboseLevel(l);
1255     }
1256
1257     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1258     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1259         it != m_TransmitProcessors.end();
1260         ++it ) {
1261         (*it)->setVerboseLevel(l);
1262     }
1263 }
1264
1265
1266 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1267     int count=0;
1268
1269     if (direction == Port::E_Capture) {
1270         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1271             it != m_ReceiveProcessors.end();
1272             ++it ) {
1273             count += (*it)->getPortCount(type);
1274         }
1275     } else {
1276         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1277             it != m_TransmitProcessors.end();
1278             ++it ) {
1279             count += (*it)->getPortCount(type);
1280         }
1281     }
1282     return count;
1283 }
1284
1285 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1286     int count=0;
1287
1288     if (direction == Port::E_Capture) {
1289         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1290             it != m_ReceiveProcessors.end();
1291             ++it ) {
1292             count += (*it)->getPortCount();
1293         }
1294     } else {
1295         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1296             it != m_TransmitProcessors.end();
1297             ++it ) {
1298             count += (*it)->getPortCount();
1299         }
1300     }
1301     return count;
1302 }
1303
1304 // TODO: implement a port map here, instead of the loop
1305
1306 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1307     int count=0;
1308     int prevcount=0;
1309
1310     if (direction == Port::E_Capture) {
1311         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1312             it != m_ReceiveProcessors.end();
1313             ++it ) {
1314             count += (*it)->getPortCount();
1315             if (count > idx) {
1316                 return (*it)->getPortAtIdx(idx-prevcount);
1317             }
1318             prevcount=count;
1319         }
1320     } else {
1321         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1322             it != m_TransmitProcessors.end();
1323             ++it ) {
1324             count += (*it)->getPortCount();
1325             if (count > idx) {
1326                 return (*it)->getPortAtIdx(idx-prevcount);
1327             }
1328             prevcount=count;
1329         }
1330     }
1331     return NULL;
1332 }
1333
1334 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1335     m_thread_realtime=rt;
1336     m_thread_priority=priority;
1337     return true;
1338 }
1339
1340
1341 } // end of namespace
Note: See TracBrowser for help on using the browser.