root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 795, 40.5 kB (checked in by ppalmers, 13 years ago)

small bugfixes + debug print changes

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "libieee1394/cycletimer.h"
28
29 #include "libutil/Time.h"
30
31 #include <errno.h>
32 #include <assert.h>
33 #include <math.h>
34
35 #define RUNNING_TIMEOUT_MSEC 4000
36 #define PREPARE_TIMEOUT_MSEC 4000
37 #define ENABLE_TIMEOUT_MSEC 4000
38
39 // allows to add some processing margin. This shifts the time
40 // at which the buffer is transfer()'ed, making things somewhat
41 // more robust. It should be noted though that shifting the transfer
42 // time to a later time instant also causes the xmit buffer fill to be
43 // lower on average.
44 #define FFADO_SIGNAL_DELAY_TICKS (3072*1)
45
46 namespace Streaming {
47
48 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
49
50 StreamProcessorManager::StreamProcessorManager()
51     : m_is_slave( false )
52     , m_SyncSource(NULL)
53     , m_nb_buffers( 0 )
54     , m_period( 0 )
55     , m_nominal_framerate ( 0 )
56     , m_xrun_happened( false )
57     , m_xruns(0)
58     , m_nbperiods(0)
59 {
60     addOption(Util::OptionContainer::Option("slaveMode",false));
61 }
62
63 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
64     : m_is_slave( false )
65     , m_SyncSource(NULL)
66     , m_nb_buffers(nb_buffers)
67     , m_period(period)
68     , m_nominal_framerate ( framerate )
69     , m_xruns(0)
70     , m_xrun_happened( false )
71     , m_nbperiods(0)
72 {
73     addOption(Util::OptionContainer::Option("slaveMode",false));
74 }
75
76 StreamProcessorManager::~StreamProcessorManager() {
77 }
78
79 /**
80  * Registers \ref processor with this manager.
81  *
82  * also registers it with the isohandlermanager
83  *
84  * be sure to call isohandlermanager->init() first!
85  * and be sure that the processors are also ->init()'ed
86  *
87  * @param processor
88  * @return true if successfull
89  */
90 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
91 {
92     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
93     assert(processor);
94     if (processor->getType() == StreamProcessor::ePT_Receive) {
95         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
96         m_ReceiveProcessors.push_back(processor);
97         return true;
98     }
99
100     if (processor->getType() == StreamProcessor::ePT_Transmit) {
101         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
102         m_TransmitProcessors.push_back(processor);
103         return true;
104     }
105
106     debugFatal("Unsupported processor type!\n");
107     return false;
108 }
109
110 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
111 {
112     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
113     assert(processor);
114
115     if (processor->getType()==StreamProcessor::ePT_Receive) {
116
117         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
118               it != m_ReceiveProcessors.end();
119               ++it )
120         {
121             if ( *it == processor ) {
122                 if (*it == m_SyncSource) {
123                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source");
124                     m_SyncSource = NULL;
125                 }
126                 m_ReceiveProcessors.erase(it);
127                 return true;
128             }
129         }
130     }
131
132     if (processor->getType()==StreamProcessor::ePT_Transmit) {
133         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
134               it != m_TransmitProcessors.end();
135               ++it )
136         {
137             if ( *it == processor ) {
138                 if (*it == m_SyncSource) {
139                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source");
140                     m_SyncSource = NULL;
141                 }
142                 m_TransmitProcessors.erase(it);
143                 return true;
144             }
145         }
146     }
147
148     debugFatal("Processor (%p) not found!\n",processor);
149     return false; //not found
150 }
151
152 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
153     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
154     m_SyncSource=s;
155     return true;
156 }
157
158 bool StreamProcessorManager::prepare() {
159
160     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
161
162     m_is_slave=false;
163     if(!getOption("slaveMode", m_is_slave)) {
164         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
165     }
166
167     // if no sync source is set, select one here
168     if(m_SyncSource == NULL) {
169        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
170     }
171
172     // FIXME: put into separate method
173     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
174           it != m_ReceiveProcessors.end();
175           ++it )
176     {
177         if(m_SyncSource == NULL) {
178             debugWarning(" => Sync Source is %p.\n", *it);
179             m_SyncSource = *it;
180         }
181     }
182     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
183           it != m_TransmitProcessors.end();
184           ++it )
185     {
186         if(m_SyncSource == NULL) {
187             debugWarning(" => Sync Source is %p.\n", *it);
188             m_SyncSource = *it;
189         }
190     }
191
192     // now do the actual preparation of the SP's
193     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
194     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
195         it != m_ReceiveProcessors.end();
196         ++it ) {
197
198         if(!(*it)->setOption("slaveMode", m_is_slave)) {
199             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
200         }
201
202         if(!(*it)->prepare()) {
203             debugFatal(  " could not prepare (%p)...\n",(*it));
204             return false;
205         }
206     }
207     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
208     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
209         it != m_TransmitProcessors.end();
210         ++it ) {
211         if(!(*it)->setOption("slaveMode", m_is_slave)) {
212             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
213         }
214         if(!(*it)->prepare()) {
215             debugFatal( " could not prepare (%p)...\n",(*it));
216             return false;
217         }
218     }
219
220     // if there are no stream processors registered,
221     // fail
222     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
223         debugFatal("No stream processors registered, can't do anything usefull\n");
224         return false;
225     }
226     return true;
227 }
228
229 bool StreamProcessorManager::startDryRunning() {
230     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
231     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
232     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
233             it != m_ReceiveProcessors.end();
234             ++it ) {
235         if (!(*it)->isDryRunning()) {
236             if(!(*it)->scheduleStartDryRunning(-1)) {
237                 debugError("Could not put SP %p into the dry-running state\n", *it);
238                 return false;
239             }
240         } else {
241             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
242         }
243     }
244     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
245             it != m_TransmitProcessors.end();
246             ++it ) {
247         if (!(*it)->isDryRunning()) {
248             if(!(*it)->scheduleStartDryRunning(-1)) {
249                 debugError("Could not put SP %p into the dry-running state\n", *it);
250                 return false;
251             }
252         } else {
253             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
254         }
255     }
256     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
257     // wait for the syncsource to start running.
258     // that will block the waitForPeriod call until everyone has started (theoretically)
259     #define CYCLES_FOR_DRYRUN 40000
260     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
261     bool all_dry_running = false;
262     while (!all_dry_running && cnt) {
263         all_dry_running = true;
264         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
265                 it != m_ReceiveProcessors.end();
266                 ++it ) {
267             all_dry_running &= (*it)->isDryRunning();
268         }
269         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
270                 it != m_TransmitProcessors.end();
271                 ++it ) {
272             all_dry_running &= (*it)->isDryRunning();
273         }
274
275         SleepRelativeUsec(125);
276         cnt--;
277     }
278     if(cnt==0) {
279         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
280         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
281                 it != m_ReceiveProcessors.end();
282                 ++it ) {
283             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
284                 (*it)->getTypeString(), *it, (*it)->getStateString());
285         }
286         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
287                 it != m_TransmitProcessors.end();
288                 ++it ) {
289             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
290                 (*it)->getTypeString(), *it, (*it)->getStateString());
291         }
292         return false;
293     }
294     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
295     return true;
296 }
297
298 bool StreamProcessorManager::syncStartAll() {
299     if(m_SyncSource == NULL) return false;
300     // figure out when to get the SP's running.
301     // the xmit SP's should also know the base timestamp
302     // streams should be aligned here
303
304     // now find out how long we have to delay the wait operation such that
305     // the received frames will all be presented to the SP
306     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
307     int max_of_min_delay = 0;
308     int min_delay = 0;
309     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
310             it != m_ReceiveProcessors.end();
311             ++it ) {
312         min_delay = (*it)->getMaxFrameLatency();
313         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
314     }
315
316     // add some processing margin. This only shifts the time
317     // at which the buffer is transfer()'ed. This makes things somewhat
318     // more robust. It should be noted though that shifting the transfer
319     // time to a later time instant also causes the xmit buffer fill to be
320     // lower on average.
321     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
322     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
323         max_of_min_delay,
324         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
325         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
326         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
327     m_SyncSource->setSyncDelay(max_of_min_delay);
328
329     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
330     //        have aquired lock
331     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
332     //sleep(2); // FIXME: be smarter here
333
334     // make sure that we are dry-running long enough for the
335     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
336     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
337     int nb_sync_runs=20;
338     int64_t time_till_next_period;
339     while(nb_sync_runs--) { // or while not sync-ed?
340         // check if we were woken up too soon
341         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
342         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
343         if(time_till_next_period > 0) {
344             // wait for the period
345             SleepRelativeUsec(time_till_next_period);
346         }
347     }
348
349     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
350     // FIXME: in the SPM it would be nice to have system time instead of
351     //        1394 time
352
353     // we now should have decent sync info on the sync source
354     // determine a point in time where the system should start
355     // figure out where we are now
356     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
357     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
358         time_of_first_sample,
359         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
360         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
361         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
362
363     #define CYCLES_FOR_STARTUP 2000
364     // start wet-running in CYCLES_FOR_STARTUP cycles
365     // this is the time window we have to setup all SP's such that they
366     // can start wet-running correctly.
367     time_of_first_sample = addTicks(time_of_first_sample,
368                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
369
370     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
371         time_of_first_sample,
372         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
373         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
374         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
375
376     // we should start wet-running the transmit SP's some cycles in advance
377     // such that we know it is wet-running when it should output its first sample
378     #define PRESTART_CYCLES_FOR_XMIT 20
379     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
380                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
381
382     #define PRESTART_CYCLES_FOR_RECV 0
383     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
384                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
385     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
386         time_to_start_xmit,
387         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
388         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
389         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
390     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
391         time_to_start_recv,
392         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
393         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
394         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
395
396     // at this point the buffer head timestamp of the transmit buffers can be set
397     // this is the presentation time of the first sample in the buffer
398     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
399           it != m_TransmitProcessors.end();
400           ++it ) {
401         (*it)->setBufferHeadTimestamp(time_of_first_sample);
402     }
403
404     // STEP X: switch SP's over to the running state
405     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
406           it != m_ReceiveProcessors.end();
407           ++it ) {
408         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
409             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
410             return false;
411         }
412     }
413     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
414           it != m_TransmitProcessors.end();
415           ++it ) {
416         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
417             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
418             return false;
419         }
420     }
421     // wait for the syncsource to start running.
422     // that will block the waitForPeriod call until everyone has started (theoretically)
423     int cnt = CYCLES_FOR_STARTUP * 20; // by then it should have started
424     while (!m_SyncSource->isRunning() && cnt) {
425         SleepRelativeUsec(125);
426         cnt--;
427     }
428     if(cnt==0) {
429         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
430         return false;
431     }
432
433     // now align the received streams
434     if(!alignReceivedStreams()) {
435         debugError("Could not align streams\n");
436         return false;
437     }
438     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
439     return true;
440 }
441
442 bool
443 StreamProcessorManager::alignReceivedStreams()
444 {
445     if(m_SyncSource == NULL) return false;
446     #define ALIGN_AVERAGE_TIME_MSEC 200
447     #define NB_ALIGN_TRIES 40
448     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
449     unsigned int nb_sync_runs;
450     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
451     int64_t diff_between_streams[nb_rcv_sp];
452     int64_t diff;
453
454     unsigned int i;
455
456     unsigned int periods_per_align_try = (ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
457     periods_per_align_try /= 1000;
458     periods_per_align_try /= getPeriodSize();
459     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
460
461     bool aligned = false;
462     int cnt = NB_ALIGN_TRIES;
463     while (!aligned && cnt--) {
464         nb_sync_runs = periods_per_align_try;
465         while(nb_sync_runs) {
466             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
467             if(!waitForPeriod()) {
468                 debugWarning("xrun while aligning streams...\n");
469                 return false;
470             };
471
472             i = 0;
473             for ( i = 0; i < nb_rcv_sp; i++) {
474                 StreamProcessor *s = m_ReceiveProcessors.at(i);
475                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
476                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
477                     m_SyncSource, s, diff);
478                 if ( nb_sync_runs == periods_per_align_try ) {
479                     diff_between_streams[i] = diff;
480                 } else {
481                     diff_between_streams[i] += diff;
482                 }
483             }
484             if(!transferSilence()) {
485                 debugError("Could not transfer silence\n");
486                 return false;
487             }
488             nb_sync_runs--;
489         }
490         // calculate the average offsets
491         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
492         int diff_between_streams_frames[nb_rcv_sp];
493         aligned = true;
494         for ( i = 0; i < nb_rcv_sp; i++) {
495             StreamProcessor *s = m_ReceiveProcessors.at(i);
496
497             diff_between_streams[i] /= periods_per_align_try;
498             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
499             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
500                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
501
502             aligned &= (diff_between_streams_frames[i] == 0);
503
504             // reposition the stream
505             if(!s->shiftStream(diff_between_streams_frames[i])) {
506                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
507                 return false;
508             }
509         }
510         if (!aligned) {
511             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
512         }
513     }
514     if (cnt == 0) {
515         debugError("Align failed\n");
516         return false;
517     }
518     return true;
519 }
520
521 bool StreamProcessorManager::start() {
522     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
523
524     // put all SP's into dry-running state
525     if (!startDryRunning()) {
526         debugFatal("Could not put SP's in dry-running state\n");
527         return false;
528     }
529
530     // start all SP's synchonized
531     if (!syncStartAll()) {
532         debugFatal("Could not syncStartAll...\n");
533         return false;
534     }
535     return true;
536 }
537
538 bool StreamProcessorManager::stop() {
539     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
540
541     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
542     // switch SP's over to the dry-running state
543     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
544           it != m_ReceiveProcessors.end();
545           ++it ) {
546         if((*it)->isRunning()) {
547             if(!(*it)->scheduleStopRunning(-1)) {
548                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
549                 return false;
550             }
551         }
552     }
553     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
554           it != m_TransmitProcessors.end();
555           ++it ) {
556         if((*it)->isRunning()) {
557             if(!(*it)->scheduleStopRunning(-1)) {
558                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
559                 return false;
560             }
561         }
562     }
563     // wait for the SP's to get into the dry-running state
564     int cnt = 200;
565     bool ready = false;
566     while (!ready && cnt) {
567         ready = true;
568         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
569             it != m_ReceiveProcessors.end();
570             ++it ) {
571             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
572         }
573         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
574             it != m_TransmitProcessors.end();
575             ++it ) {
576             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
577         }
578         SleepRelativeUsec(125);
579         cnt--;
580     }
581     if(cnt==0) {
582         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
583         return false;
584     }
585
586     // switch SP's over to the stopped state
587     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
588           it != m_ReceiveProcessors.end();
589           ++it ) {
590         if(!(*it)->scheduleStopDryRunning(-1)) {
591             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
592             return false;
593         }
594     }
595     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
596           it != m_TransmitProcessors.end();
597           ++it ) {
598         if(!(*it)->scheduleStopDryRunning(-1)) {
599             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
600             return false;
601         }
602     }
603     // wait for the SP's to get into the running state
604     cnt = 200;
605     ready = false;
606     while (!ready && cnt) {
607         ready = true;
608         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
609             it != m_ReceiveProcessors.end();
610             ++it ) {
611             ready &= (*it)->isStopped();
612         }
613         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
614             it != m_TransmitProcessors.end();
615             ++it ) {
616             ready &= (*it)->isStopped();
617         }
618         SleepRelativeUsec(125);
619         cnt--;
620     }
621     if(cnt==0) {
622         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
623         return false;
624     }
625     return true;
626 }
627
628 /**
629  * Called upon Xrun events. This brings all StreamProcessors back
630  * into their starting state, and then carries on streaming. This should
631  * have the same effect as restarting the whole thing.
632  *
633  * @return true if successful, false otherwise
634  */
635 bool StreamProcessorManager::handleXrun() {
636
637     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
638
639     dumpInfo();
640
641     /*
642      * Reset means:
643      * 1) Disabling the SP's, so that they don't process any packets
644      *    note: the isomanager does keep on delivering/requesting them
645      * 2) Bringing all buffers & streamprocessors into a know state
646      *    - Clear all capture buffers
647      *    - Put nb_periods*period_size of null frames into the playback buffers
648      * 3) Re-enable the SP's
649      */
650
651     // put all SP's back into dry-running state
652     if (!startDryRunning()) {
653         debugFatal("Could not put SP's in dry-running state\n");
654         return false;
655     }
656
657     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
658     // start all SP's synchonized
659     if (!syncStartAll()) {
660         debugFatal("Could not syncStartAll...\n");
661         return false;
662     }
663
664     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
665
666     return true;
667 }
668
669 /**
670  * @brief Waits until the next period of samples is ready
671  *
672  * This function does not return until a full period of samples is (or should be)
673  * ready to be transferred.
674  *
675  * @return true if the period is ready, false if an xrun occurred
676  */
677 bool StreamProcessorManager::waitForPeriod() {
678     if(m_SyncSource == NULL) return false;
679     int time_till_next_period;
680     bool xrun_occurred = false;
681
682     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
683
684     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
685
686     while(time_till_next_period > 0) {
687         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
688
689         // wait for the period
690         SleepRelativeUsec(time_till_next_period);
691
692         // check for underruns on the ISO side,
693         // those should make us bail out of the wait loop
694         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
695             it != m_ReceiveProcessors.end();
696             ++it ) {
697             // a xrun has occurred on the Iso side
698             xrun_occurred |= (*it)->xrunOccurred();
699         }
700         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
701             it != m_TransmitProcessors.end();
702             ++it ) {
703             // a xrun has occurred on the Iso side
704             xrun_occurred |= (*it)->xrunOccurred();
705         }
706         if(xrun_occurred) break;
707
708         // check if we were waked up too soon
709         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
710     }
711
712     // we save the 'ideal' time of the transfer at this point,
713     // because we can have interleaved read - process - write
714     // cycles making that we modify a receiving stream's buffer
715     // before we get to writing.
716     // NOTE: before waitForPeriod() is called again, both the transmit
717     //       and the receive processors should have done their transfer.
718     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
719     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
720         m_time_of_transfer);
721
722     // normally we can transfer frames at this time, but in some cases this is not true
723     // e.g. when there are not enough frames in the receive buffer.
724     // however this doesn't have to be a problem, since we can wait some more until we
725     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
726     // to transmit, or if the receive buffer overflows. These conditions are signaled by
727     // the iso threads
728     // check if xruns occurred on the Iso side.
729     // also check if xruns will occur should we transfer() now
730     #ifdef DEBUG
731     int waited = 0;
732     #endif
733     bool ready_for_transfer = false;
734     bool ready;
735     xrun_occurred = false;
736     while (!ready_for_transfer && !xrun_occurred) {
737         // FIXME: can deadlock when the iso handlers die (e.g. unplug the device)
738         ready_for_transfer = true;
739         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
740             it != m_ReceiveProcessors.end();
741             ++it ) {
742             ready = ((*it)->canClientTransferFrames(m_period));
743             ready_for_transfer &= ready;
744             xrun_occurred |= (*it)->xrunOccurred();
745         }
746         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
747             it != m_TransmitProcessors.end();
748             ++it ) {
749             ready = ((*it)->canClientTransferFrames(m_period));
750             //ready_for_transfer &= ready;
751             xrun_occurred |= (*it)->xrunOccurred();
752         }
753         if (!ready_for_transfer) {
754            
755             SleepRelativeUsec(125); // MAGIC: one cycle sleep...
756
757             // in order to avoid this in the future, we increase the sync delay of the sync source SP
758             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
759             m_SyncSource->setSyncDelay(d);
760             d = m_SyncSource->getSyncDelay();
761             debugOutput(DEBUG_LEVEL_VERBOSE, "Increased the Sync delay to: %d ticks (%f frames, %f cy)\n",
762                                              d, ((float)d)/m_SyncSource->getTicksPerFrame(),
763                                              ((float)d)/((float)TICKS_PER_CYCLE));
764
765             #ifdef DEBUG
766             waited++;
767             #endif
768         }
769     } // we are either ready or an xrun occurred
770
771     // in order to avoid a runaway value of the sync delay, we gradually decrease
772     // it. It will be increased by a 'too early' event (cfr some lines higher)
773     // hence we'll be at a good point on average.
774     int d = m_SyncSource->getSyncDelay() - 1;
775     if (d >= 0) m_SyncSource->setSyncDelay(d);
776
777
778     #ifdef DEBUG
779     if(waited > 0) {
780         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
781     }
782     #endif
783
784     // this is to notify the client of the delay that we introduced by waiting
785     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
786     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
787
788 #ifdef DEBUG
789     int rcv_bf=0, xmt_bf=0;
790     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
791         it != m_ReceiveProcessors.end();
792         ++it ) {
793         rcv_bf = (*it)->getBufferFill();
794     }
795     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
796         it != m_TransmitProcessors.end();
797         ++it ) {
798         xmt_bf = (*it)->getBufferFill();
799     }
800     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
801         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
802
803     // check if xruns occurred on the Iso side.
804     // also check if xruns will occur should we transfer() now
805     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
806           it != m_ReceiveProcessors.end();
807           ++it ) {
808
809         if ((*it)->xrunOccurred()) {
810             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
811             (*it)->dumpInfo();
812         }
813         if (!((*it)->canClientTransferFrames(m_period))) {
814             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
815             (*it)->dumpInfo();
816         }
817     }
818     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
819           it != m_TransmitProcessors.end();
820           ++it ) {
821         if ((*it)->xrunOccurred()) {
822             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
823         }
824         if (!((*it)->canClientTransferFrames(m_period))) {
825             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
826         }
827     }
828 #endif
829
830     m_nbperiods++;
831     // now we can signal the client that we are (should be) ready
832     return !xrun_occurred;
833 }
834
835 /**
836  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
837  *
838  * Transfers one period of frames from the client side to the Iso side and vice versa.
839  *
840  * @return true if successful, false otherwise (indicates xrun).
841  */
842 bool StreamProcessorManager::transfer() {
843     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
844     bool retval=true;
845     retval &= transfer(StreamProcessor::ePT_Receive);
846     retval &= transfer(StreamProcessor::ePT_Transmit);
847     return retval;
848 }
849
850 /**
851  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
852  *
853  * Transfers one period of frames from the client side to the Iso side or vice versa.
854  *
855  * @param t The processor type to tranfer for (receive or transmit)
856  * @return true if successful, false otherwise (indicates xrun).
857  */
858 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
859     if(m_SyncSource == NULL) return false;
860     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
861         t, m_time_of_transfer,
862         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
863         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
864         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
865
866     bool retval = true;
867     // a static cast could make sure that there is no performance
868     // penalty for the virtual functions (to be checked)
869     if (t==StreamProcessor::ePT_Receive) {
870         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
871                 it != m_ReceiveProcessors.end();
872                 ++it ) {
873             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
874                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
875                             m_period, m_time_of_transfer,*it);
876                 retval &= false; // buffer underrun
877             }
878         }
879     } else {
880         // FIXME: in the SPM it would be nice to have system time instead of
881         //        1394 time
882         float rate = m_SyncSource->getTicksPerFrame();
883         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
884
885         // the data we are putting into the buffer is intended to be transmitted
886         // one ringbuffer size after it has been received
887         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
888
889         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
890                 it != m_TransmitProcessors.end();
891                 ++it ) {
892             // FIXME: in the SPM it would be nice to have system time instead of
893             //        1394 time
894             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
895                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
896                         m_period, transmit_timestamp, *it);
897                 retval &= false; // buffer underrun
898             }
899         }
900     }
901     return retval;
902 }
903
904 /**
905  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
906  *
907  * Transfers one period of silence to the Iso side for transmit SP's
908  * or dump one period of frames for receive SP's
909  *
910  * @return true if successful, false otherwise (indicates xrun).
911  */
912 bool StreamProcessorManager::transferSilence() {
913     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
914     bool retval=true;
915     retval &= transferSilence(StreamProcessor::ePT_Receive);
916     retval &= transferSilence(StreamProcessor::ePT_Transmit);
917     return retval;
918 }
919
920 /**
921  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
922  *
923  * Transfers one period of silence to the Iso side for transmit SP's
924  * or dump one period of frames for receive SP's
925  *
926  * @param t The processor type to tranfer for (receive or transmit)
927  * @return true if successful, false otherwise (indicates xrun).
928  */
929 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
930     if(m_SyncSource == NULL) return false;
931     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
932         t, m_time_of_transfer,
933         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
934         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
935         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
936
937     bool retval = true;
938     // a static cast could make sure that there is no performance
939     // penalty for the virtual functions (to be checked)
940     if (t==StreamProcessor::ePT_Receive) {
941         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
942                 it != m_ReceiveProcessors.end();
943                 ++it ) {
944             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
945                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
946                             m_period, m_time_of_transfer,*it);
947                 retval &= false; // buffer underrun
948             }
949         }
950     } else {
951         // FIXME: in the SPM it would be nice to have system time instead of
952         //        1394 time
953         float rate = m_SyncSource->getTicksPerFrame();
954         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
955
956         // the data we are putting into the buffer is intended to be transmitted
957         // one ringbuffer size after it has been received
958         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
959
960         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
961                 it != m_TransmitProcessors.end();
962                 ++it ) {
963             // FIXME: in the SPM it would be nice to have system time instead of
964             //        1394 time
965             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
966                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
967                         m_period, transmit_timestamp, *it);
968                 retval &= false; // buffer underrun
969             }
970         }
971     }
972     return retval;
973 }
974
975 void StreamProcessorManager::dumpInfo() {
976     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
977     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
978     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
979
980     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
981     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
982         it != m_ReceiveProcessors.end();
983         ++it ) {
984         (*it)->dumpInfo();
985     }
986
987     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
988     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
989         it != m_TransmitProcessors.end();
990         ++it ) {
991         (*it)->dumpInfo();
992     }
993
994     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
995
996 }
997
998 void StreamProcessorManager::setVerboseLevel(int l) {
999     setDebugLevel(l);
1000
1001     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1002     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1003         it != m_ReceiveProcessors.end();
1004         ++it ) {
1005         (*it)->setVerboseLevel(l);
1006     }
1007
1008     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1009     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1010         it != m_TransmitProcessors.end();
1011         ++it ) {
1012         (*it)->setVerboseLevel(l);
1013     }
1014 }
1015
1016
1017 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1018     int count=0;
1019
1020     if (direction == Port::E_Capture) {
1021         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1022             it != m_ReceiveProcessors.end();
1023             ++it ) {
1024             count += (*it)->getPortCount(type);
1025         }
1026     } else {
1027         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1028             it != m_TransmitProcessors.end();
1029             ++it ) {
1030             count += (*it)->getPortCount(type);
1031         }
1032     }
1033     return count;
1034 }
1035
1036 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1037     int count=0;
1038
1039     if (direction == Port::E_Capture) {
1040         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1041             it != m_ReceiveProcessors.end();
1042             ++it ) {
1043             count += (*it)->getPortCount();
1044         }
1045     } else {
1046         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1047             it != m_TransmitProcessors.end();
1048             ++it ) {
1049             count += (*it)->getPortCount();
1050         }
1051     }
1052     return count;
1053 }
1054
1055 // TODO: implement a port map here, instead of the loop
1056
1057 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1058     int count=0;
1059     int prevcount=0;
1060
1061     if (direction == Port::E_Capture) {
1062         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1063             it != m_ReceiveProcessors.end();
1064             ++it ) {
1065             count += (*it)->getPortCount();
1066             if (count > idx) {
1067                 return (*it)->getPortAtIdx(idx-prevcount);
1068             }
1069             prevcount=count;
1070         }
1071     } else {
1072         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1073             it != m_TransmitProcessors.end();
1074             ++it ) {
1075             count += (*it)->getPortCount();
1076             if (count > idx) {
1077                 return (*it)->getPortAtIdx(idx-prevcount);
1078             }
1079             prevcount=count;
1080         }
1081     }
1082     return NULL;
1083 }
1084
1085 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1086     m_thread_realtime=rt;
1087     m_thread_priority=priority;
1088     return true;
1089 }
1090
1091
1092 } // end of namespace
Note: See TracBrowser for help on using the browser.