root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 719, 33.7 kB (checked in by ppalmers, 16 years ago)

backup commit

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
#include "StreamProcessorManager.h"
#include "generic/StreamProcessor.h"
#include "generic/Port.h"
#include "util/cycletimer.h"

#include <errno.h>
#include <assert.h>

#include <new>
31
32 #define RUNNING_TIMEOUT_MSEC 4000
33 #define PREPARE_TIMEOUT_MSEC 4000
34 #define ENABLE_TIMEOUT_MSEC 4000
35
// Adds some processing margin by shifting the time at which the
// buffer is transfer()'ed, which makes things somewhat more robust.
// Note, however, that shifting the transfer time to a later instant
// also causes the xmit buffer fill to be lower on average.
41 #define FFADO_SIGNAL_DELAY_TICKS 3072
42
43 namespace Streaming {
44
45 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
46
47 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
48     : m_is_slave( false )
49     , m_SyncSource(NULL)
50     , m_nb_buffers(nb_buffers)
51     , m_period(period)
52     , m_nominal_framerate ( framerate )
53     , m_xruns(0)
54     , m_isoManager(0)
55     , m_nbperiods(0)
56 {
57     addOption(Util::OptionContainer::Option("slaveMode",false));
58 }
59
60 StreamProcessorManager::~StreamProcessorManager() {
61     if (m_isoManager) delete m_isoManager;
62 }
63
64 /**
65  * Registers \ref processor with this manager.
66  *
67  * also registers it with the isohandlermanager
68  *
69  * be sure to call isohandlermanager->init() first!
70  * and be sure that the processors are also ->init()'ed
71  *
72  * @param processor
73  * @return true if successfull
74  */
75 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
76 {
77     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
78     assert(processor);
79     assert(m_isoManager);
80
81     if (processor->getType() == StreamProcessor::ePT_Receive) {
82         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
83
84         m_ReceiveProcessors.push_back(processor);
85         processor->setManager(this);
86         return true;
87     }
88
89     if (processor->getType() == StreamProcessor::ePT_Transmit) {
90         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
91
92         m_TransmitProcessors.push_back(processor);
93         processor->setManager(this);
94         return true;
95     }
96
97     debugFatal("Unsupported processor type!\n");
98     return false;
99 }
100
101 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
102 {
103     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
104     assert(processor);
105
106     if (processor->getType()==StreamProcessor::ePT_Receive) {
107
108         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
109               it != m_ReceiveProcessors.end();
110               ++it )
111         {
112             if ( *it == processor ) {
113                 m_ReceiveProcessors.erase(it);
114                 processor->clearManager();
115                 if(!m_isoManager->unregisterStream(processor)) {
116                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
117                     return false;
118                 }
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 m_TransmitProcessors.erase(it);
131                 processor->clearManager();
132                 if(!m_isoManager->unregisterStream(processor)) {
133                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
134                     return false;
135                 }
136                 return true;
137             }
138         }
139     }
140
141     debugFatal("Processor (%p) not found!\n",processor);
142     return false; //not found
143 }
144
145 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
146     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
147     m_SyncSource=s;
148     return true;
149 }
150
151 bool StreamProcessorManager::init()
152 {
153     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
154     m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
155     if(!m_isoManager) {
156         debugFatal("Could not create IsoHandlerManager\n");
157         return false;
158     }
159     m_isoManager->setVerboseLevel(getDebugLevel());
160     if(!m_isoManager->init()) {
161         debugFatal("Could not initialize IsoHandlerManager\n");
162         return false;
163     }
164
165     m_xrun_happened=false;
166     return true;
167 }
168
169 bool StreamProcessorManager::prepare() {
170
171     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
172
173     m_is_slave=false;
174     if(!getOption("slaveMode", m_is_slave)) {
175         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
176     }
177
178     // if no sync source is set, select one here
179     if(m_SyncSource == NULL) {
180        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
181     }
182
183     // FIXME: put into separate method
184     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
185           it != m_ReceiveProcessors.end();
186           ++it )
187     {
188         if(m_SyncSource == NULL) {
189             debugWarning(" => Sync Source is %p.\n", *it);
190             m_SyncSource = *it;
191         }
192     }
193     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
194           it != m_TransmitProcessors.end();
195           ++it )
196     {
197         if(m_SyncSource == NULL) {
198             debugWarning(" => Sync Source is %p.\n", *it);
199             m_SyncSource = *it;
200         }
201     }
202
203     // now do the actual preparation of the SP's
204     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
205     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
206         it != m_ReceiveProcessors.end();
207         ++it ) {
208
209         if(!(*it)->setOption("slaveMode", m_is_slave)) {
210             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
211         }
212
213         if(!(*it)->prepare()) {
214             debugFatal(  " could not prepare (%p)...\n",(*it));
215             return false;
216         }
217     }
218     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
219     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
220         it != m_TransmitProcessors.end();
221         ++it ) {
222         if(!(*it)->setOption("slaveMode", m_is_slave)) {
223             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
224         }
225         if(!(*it)->prepare()) {
226             debugFatal( " could not prepare (%p)...\n",(*it));
227             return false;
228         }
229     }
230
231     // if there are no stream processors registered,
232     // fail
233     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
234         debugFatal("No stream processors registered, can't do anything usefull\n");
235         return false;
236     }
237     return true;
238 }
239
240 bool StreamProcessorManager::startDryRunning() {
241     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start dry-running...\n");
242     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
243             it != m_ReceiveProcessors.end();
244             ++it ) {
245         if(!(*it)->startDryRunning(-1)) {
246             debugError("Could not put SP %p into the dry-running state\n", *it);
247             return false;
248         }
249     }
250     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
251             it != m_TransmitProcessors.end();
252             ++it ) {
253         if(!(*it)->startDryRunning(-1)) {
254             debugError("Could not put SP %p into the dry-running state\n", *it);
255             return false;
256         }
257     }
258     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
259     return true;
260 }
261
262 bool StreamProcessorManager::syncStartAll() {
263     // figure out when to get the SP's running.
264     // the xmit SP's should also know the base timestamp
265     // streams should be aligned here
266
267     // now find out how long we have to delay the wait operation such that
268     // the received frames will all be presented to the SP
269     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
270     int max_of_min_delay=0;
271     int min_delay=0;
272     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
273             it != m_ReceiveProcessors.end();
274             ++it ) {
275         min_delay=(*it)->getMaxFrameLatency();
276         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay;
277     }
278
279     // add some processing margin. This only shifts the time
280     // at which the buffer is transfer()'ed. This makes things somewhat
281     // more robust. It should be noted though that shifting the transfer
282     // time to a later time instant also causes the xmit buffer fill to be
283     // lower on average.
284     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
285     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks (%03us %04uc %04ut)...\n",
286         max_of_min_delay,
287         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
288         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
289         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
290     m_SyncSource->setSyncDelay(max_of_min_delay);
291
292     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
293     //        have aquired lock
294     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
295     //sleep(2); // FIXME: be smarter here
296
297     // wait for some sort of sync
298     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
299     // in order to obtain that, we wait for the first periods to be received.
300     int nb_sync_runs=20;
301     int64_t time_till_next_period;
302     while(nb_sync_runs--) { // or while not sync-ed?
303         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
304         debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
305         if(time_till_next_period > 0) {
306             // wait for the period
307             usleep(time_till_next_period);
308         }
309     }
310
311     // figure out where we are now
312     uint64_t time_of_transfer = m_SyncSource->getTimeAtPeriod();
313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
314         time_of_transfer,
315         (unsigned int)TICKS_TO_SECS(time_of_transfer),
316         (unsigned int)TICKS_TO_CYCLES(time_of_transfer),
317         (unsigned int)TICKS_TO_OFFSET(time_of_transfer));
318
319     // start wet-running in 200 cycles
320     // this is the timeframe in which the remaining code should be ready
321     time_of_transfer = addTicks(time_of_transfer, 200*TICKS_PER_CYCLE);
322
323     debugOutput( DEBUG_LEVEL_VERBOSE, "  => start at TS=%011llu (%03us %04uc %04ut)...\n",
324         time_of_transfer,
325         (unsigned int)TICKS_TO_SECS(time_of_transfer),
326         (unsigned int)TICKS_TO_CYCLES(time_of_transfer),
327         (unsigned int)TICKS_TO_OFFSET(time_of_transfer));
328     // we now should have decent sync info
329     // the buffers of the receive streams should be (approx) empty
330     // the buffers of the xmit streams should be full
331    
332     // at this point the buffer head timestamp of the transmit buffers can be
333     // set properly since we know the sync source's timestamp of the last
334     // buffer transfer. we also know the rate.
335    
336     debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n");
337     // FIXME: in the SPM it would be nice to have system time instead of
338     //        1394 time
339 //     float rate=m_SyncSource->getTicksPerFrame();
340 //     int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
341 //     // the data at the front of the buffer is intended to be transmitted
342 //     // nb_periods*period_size after the last received period
343 //     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
344
345 //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
346 //             it != m_TransmitProcessors.end();
347 //             ++it ) {
348 //         // FIXME: encapsulate
349 //         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);
350 //         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!!
351 //     }
352    
353     dumpInfo();
354
355     // STEP X: switch SP's over to the running state
356     uint64_t time_to_start = addTicks(time_of_transfer,
357                                       m_SyncSource->getTicksPerFrame() * getPeriodSize());
358     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
359             it != m_ReceiveProcessors.end();
360             ++it ) {
361         if(!(*it)->startRunning(time_to_start)) {
362             debugError("Could not put SP %p into the running state\n", *it);
363             return false;
364         }
365     }
366     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
367             it != m_TransmitProcessors.end();
368             ++it ) {
369         if(!(*it)->startRunning(time_to_start)) {
370             debugError("Could not put SP %p into the running state\n", *it);
371             return false;
372         }
373     }
374     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
375     return true;
376 }
377
378 bool StreamProcessorManager::start() {
379     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
380     assert(m_isoManager);
381
382     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
383     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
384     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
385           it != m_ReceiveProcessors.end();
386           ++it )
387     {
388         if (!m_isoManager->registerStream(*it)) {
389             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
390             return false;
391         }
392     }
393     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
394     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
395           it != m_TransmitProcessors.end();
396           ++it )
397     {
398         if (!m_isoManager->registerStream(*it)) {
399             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
400             return false;
401         }
402     }
403
404     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
405     if (!m_isoManager->prepare()) {
406         debugFatal("Could not prepare isoManager\n");
407         return false;
408     }
409
410     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
411     if (!m_isoManager->startHandlers(-1)) {
412         debugFatal("Could not start handlers...\n");
413         return false;
414     }
415
416     // put all SP's into dry-running state
417     if (!startDryRunning()) {
418         debugFatal("Could not put SP's in dry-running state\n");
419         return false;
420     }
421
422     // start all SP's synchonized
423     if (!syncStartAll()) {
424         debugFatal("Could not syncStartAll...\n");
425         return false;
426     }
427
428     // dump the iso stream information when in verbose mode
429     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
430         m_isoManager->dumpInfo();
431     }
432
433     return true;
434 }
435
436 bool StreamProcessorManager::stop() {
437     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
438     assert(m_isoManager);
439
440     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n");
441     // Most stream processors can just stop without special treatment.  However, some
442     // (like the MOTU) need to do a few things before it's safe to turn off the iso
443     // handling.
444     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
445           it != m_ReceiveProcessors.end();
446           ++it ) {
447         if(!(*it)->stop()) {
448             debugError("Could not stop SP %p", (*it));
449         }
450     }
451     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
452           it != m_TransmitProcessors.end();
453           ++it ) {
454         if(!(*it)->stop()) {
455             debugError("Could not stop SP %p", (*it));
456         }
457     }
458
459     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
460     if(!m_isoManager->stopHandlers()) {
461        debugFatal("Could not stop ISO handlers\n");
462        return false;
463     }
464
465     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
466     // now unregister all streams from iso manager
467     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
468     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
469           it != m_ReceiveProcessors.end();
470           ++it ) {
471         if (!m_isoManager->unregisterStream(*it)) {
472             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
473             return false;
474         }
475     }
476     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
477     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
478           it != m_TransmitProcessors.end();
479           ++it ) {
480         if (!m_isoManager->unregisterStream(*it)) {
481             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
482             return false;
483         }
484     }
485
486     return true;
487 }
488
489 /**
490  * Called upon Xrun events. This brings all StreamProcessors back
491  * into their starting state, and then carries on streaming. This should
492  * have the same effect as restarting the whole thing.
493  *
494  * @return true if successful, false otherwise
495  */
496 bool StreamProcessorManager::handleXrun() {
497
498     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
499
500     dumpInfo();
501
502     /*
503      * Reset means:
504      * 1) Disabling the SP's, so that they don't process any packets
505      *    note: the isomanager does keep on delivering/requesting them
506      * 2) Bringing all buffers & streamprocessors into a know state
507      *    - Clear all capture buffers
508      *    - Put nb_periods*period_size of null frames into the playback buffers
509      * 3) Re-enable the SP's
510      */
511
512     // put all SP's back into dry-running state
513     if (!startDryRunning()) {
514         debugFatal("Could not put SP's in dry-running state\n");
515         return false;
516     }
517
518     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
519     // start all SP's synchonized
520     if (!syncStartAll()) {
521         debugFatal("Could not syncStartAll...\n");
522         return false;
523     }
524
525     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
526
527     return true;
528 }
529
530 /**
531  * @brief Waits until the next period of samples is ready
532  *
533  * This function does not return until a full period of samples is (or should be)
534  * ready to be transferred.
535  *
536  * @return true if the period is ready, false if an xrun occurred
537  */
538 bool StreamProcessorManager::waitForPeriod() {
539     int time_till_next_period;
540     bool xrun_occurred=false;
541
542     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
543
544     assert(m_SyncSource);
545
546     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
547
548     while(time_till_next_period > 0) {
549         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
550
551         // wait for the period
552         usleep(time_till_next_period);
553
554         // check for underruns on the ISO side,
555         // those should make us bail out of the wait loop
556         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
557             it != m_ReceiveProcessors.end();
558             ++it ) {
559             // a xrun has occurred on the Iso side
560             xrun_occurred |= (*it)->xrunOccurred();
561         }
562         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
563             it != m_TransmitProcessors.end();
564             ++it ) {
565             // a xrun has occurred on the Iso side
566             xrun_occurred |= (*it)->xrunOccurred();
567         }
568
569         if(xrun_occurred) break;
570
571         // check if we were waked up too soon
572         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
573     }
574
575     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period);
576
577     // this is to notify the client of the delay
578     // that we introduced
579     m_delayed_usecs = time_till_next_period;
580
581     // we save the 'ideal' time of the transfer at this point,
582     // because we can have interleaved read - process - write
583     // cycles making that we modify a receiving stream's buffer
584     // before we get to writing.
585     // NOTE: before waitForPeriod() is called again, both the transmit
586     //       and the receive processors should have done their transfer.
587     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
588     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
589         m_time_of_transfer);
590
591 #ifdef DEBUG
592     int rcv_bf=0, xmt_bf=0;
593     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
594         it != m_ReceiveProcessors.end();
595         ++it ) {
596         rcv_bf = (*it)->getBufferFill();
597     }
598     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
599         it != m_TransmitProcessors.end();
600         ++it ) {
601         xmt_bf = (*it)->getBufferFill();
602     }
603     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
604         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
605
606 #endif
607
608     xrun_occurred=false;
609
610     // check if xruns occurred on the Iso side.
611     // also check if xruns will occur should we transfer() now
612
613     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
614           it != m_ReceiveProcessors.end();
615           ++it ) {
616         // a xrun has occurred on the Iso side
617         xrun_occurred |= (*it)->xrunOccurred();
618
619         // if this is true, a xrun will occur
620         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
621
622 #ifdef DEBUG
623         if ((*it)->xrunOccurred()) {
624             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);
625             (*it)->dumpInfo();
626         }
627         if (!((*it)->canClientTransferFrames(m_period))) {
628             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);
629             (*it)->dumpInfo();
630         }
631 #endif
632
633     }
634     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
635           it != m_TransmitProcessors.end();
636           ++it ) {
637         // a xrun has occurred on the Iso side
638         xrun_occurred |= (*it)->xrunOccurred();
639
640         // if this is true, a xrun will occur
641         xrun_occurred |= !((*it)->canClientTransferFrames(m_period));
642
643 #ifdef DEBUG
644         if ((*it)->xrunOccurred()) {
645             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);
646         }
647         if (!((*it)->canClientTransferFrames(m_period))) {
648             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it);
649         }
650 #endif
651     }
652
653     m_nbperiods++;
654
655     // now we can signal the client that we are (should be) ready
656     return !xrun_occurred;
657 }
658
659 /**
660  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
661  *
662  * Transfers one period of frames from the client side to the Iso side and vice versa.
663  *
664  * @return true if successful, false otherwise (indicates xrun).
665  */
666 bool StreamProcessorManager::transfer() {
667
668     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
669     bool retval=true;
670     retval &= transfer(StreamProcessor::ePT_Receive);
671     retval &= transfer(StreamProcessor::ePT_Transmit);
672     return retval;
673 }
674
675 /**
676  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
677  *
678  * Transfers one period of frames from the client side to the Iso side or vice versa.
679  *
680  * @param t The processor type to tranfer for (receive or transmit)
681  * @return true if successful, false otherwise (indicates xrun).
682  */
683
684 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
685     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period for type (%d)...\n", t);
686     bool retval = true;
687     // a static cast could make sure that there is no performance
688     // penalty for the virtual functions (to be checked)
689     if (t==StreamProcessor::ePT_Receive) {
690         // determine the time at which we want reception to start
691         float rate=m_SyncSource->getTicksPerFrame();
692         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
693        
694         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
695        
696         if(receive_timestamp<0) {
697             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
698              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
699         }
700         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
701             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
702              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
703         }
704        
705         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
706                 it != m_ReceiveProcessors.end();
707                 ++it ) {
708             if(!(*it)->getFrames(m_period, receive_timestamp)) {
709                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
710                             m_period, m_time_of_transfer,*it);
711                 retval &= false; // buffer underrun
712             }
713         }
714     } else {
715         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
716                 it != m_TransmitProcessors.end();
717                 ++it ) {
718             // FIXME: in the SPM it would be nice to have system time instead of
719             //        1394 time
720             float rate=m_SyncSource->getTicksPerFrame();
721             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
722
723             // the data we are putting into the buffer is intended to be transmitted
724             // one ringbuffer size after it has been received
725             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
726
727             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
728                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
729                         m_period, transmit_timestamp, *it);
730                 retval &= false; // buffer underrun
731             }
732
733         }
734     }
735     return retval;
736 }
737
738 /**
739  * @brief Dry run one period for both receive and transmit StreamProcessors
740  *
741  * Process one period of frames for all streamprocessors, without touching the
742  * client buffers. This only removes an incoming period from the ISO receive buffer and
743  * puts one period of silence into the transmit buffers.
744  *
745  * @return true if successful, false otherwise (indicates xrun).
746  */
747 bool StreamProcessorManager::dryRun() {
748     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
749     bool retval=true;
750     retval &= dryRun(StreamProcessor::ePT_Receive);
751     retval &= dryRun(StreamProcessor::ePT_Transmit);
752     return retval;
753 }
754
755 /**
756  * @brief Dry run one period for either the receive or transmit StreamProcessors
757  *
758  * see dryRun()
759  *
760  * @param t The processor type to dryRun for (receive or transmit)
761  * @return true if successful, false otherwise (indicates xrun).
762  */
763
764 bool StreamProcessorManager::dryRun(enum StreamProcessor::eProcessorType t) {
765     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n");
766     bool retval = true;
767     // a static cast could make sure that there is no performance
768     // penalty for the virtual functions (to be checked)
769     if (t==StreamProcessor::ePT_Receive) {
770         // determine the time at which we want reception to start
771         float rate=m_SyncSource->getTicksPerFrame();
772         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate);
773        
774         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks);
775        
776         if(receive_timestamp<0) {
777             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
778              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
779         }
780         if(receive_timestamp>(128L*TICKS_PER_SECOND)) {
781             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n",
782              receive_timestamp, m_time_of_transfer, one_frame_in_ticks);
783         }
784        
785         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
786                 it != m_ReceiveProcessors.end();
787                 ++it ) {
788
789             if(!(*it)->getFramesDry(m_period, receive_timestamp)) {
790                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",
791                             m_period, m_time_of_transfer,*it);
792                 retval &= false; // buffer underrun
793             }
794
795         }
796     } else {
797         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
798                 it != m_TransmitProcessors.end();
799                 ++it ) {
800             // FIXME: in the SPM it would be nice to have system time instead of
801             //        1394 time
802             float rate=m_SyncSource->getTicksPerFrame();
803             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate);
804
805             // the data we are putting into the buffer is intended to be transmitted
806             // one ringbuffer size after it has been received
807             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
808
809             if(!(*it)->putFramesDry(m_period, transmit_timestamp)) {
810                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n",
811                         m_period, transmit_timestamp, *it);
812                 retval &= false; // buffer underrun
813             }
814
815         }
816     }
817     return retval;
818 }
819
820 void StreamProcessorManager::dumpInfo() {
821     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
822     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
823     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
824
825     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
826     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
827         it != m_ReceiveProcessors.end();
828         ++it ) {
829         (*it)->dumpInfo();
830     }
831
832     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
833     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
834         it != m_TransmitProcessors.end();
835         ++it ) {
836         (*it)->dumpInfo();
837     }
838
839     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
840     m_isoManager->dumpInfo();
841     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
842
843 }
844
845 void StreamProcessorManager::setVerboseLevel(int l) {
846     setDebugLevel(l);
847
848     if (m_isoManager) m_isoManager->setVerboseLevel(l);
849
850     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
851     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
852         it != m_ReceiveProcessors.end();
853         ++it ) {
854         (*it)->setVerboseLevel(l);
855     }
856
857     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
858     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
859         it != m_TransmitProcessors.end();
860         ++it ) {
861         (*it)->setVerboseLevel(l);
862     }
863 }
864
865
866 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
867     int count=0;
868
869     if (direction == Port::E_Capture) {
870         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
871             it != m_ReceiveProcessors.end();
872             ++it ) {
873             count += (*it)->getPortCount(type);
874         }
875     } else {
876         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
877             it != m_TransmitProcessors.end();
878             ++it ) {
879             count += (*it)->getPortCount(type);
880         }
881     }
882     return count;
883 }
884
885 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
886     int count=0;
887
888     if (direction == Port::E_Capture) {
889         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
890             it != m_ReceiveProcessors.end();
891             ++it ) {
892             count += (*it)->getPortCount();
893         }
894     } else {
895         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
896             it != m_TransmitProcessors.end();
897             ++it ) {
898             count += (*it)->getPortCount();
899         }
900     }
901     return count;
902 }
903
904 // TODO: implement a port map here, instead of the loop
905
906 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
907     int count=0;
908     int prevcount=0;
909
910     if (direction == Port::E_Capture) {
911         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
912             it != m_ReceiveProcessors.end();
913             ++it ) {
914             count += (*it)->getPortCount();
915             if (count > idx) {
916                 return (*it)->getPortAtIdx(idx-prevcount);
917             }
918             prevcount=count;
919         }
920     } else {
921         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
922             it != m_TransmitProcessors.end();
923             ++it ) {
924             count += (*it)->getPortCount();
925             if (count > idx) {
926                 return (*it)->getPortAtIdx(idx-prevcount);
927             }
928             prevcount=count;
929         }
930     }
931     return NULL;
932 }
933
934 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
935     m_thread_realtime=rt;
936     m_thread_priority=priority;
937     return true;
938 }
939
940
941 } // end of namespace
Note: See TracBrowser for help on using the browser.