root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 790, 40.1 kB (checked in by ppalmers, 13 years ago)

add debugging code

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "libieee1394/cycletimer.h"
28
29 #include "libutil/Time.h"
30
31 #include <errno.h>
32 #include <assert.h>
33 #include <math.h>
34
// Timeout values; milliseconds, going by the _MSEC suffix.
// NOTE(review): none of these three macros is referenced in the part of
// this file that was reviewed - confirm they are still needed.
#define RUNNING_TIMEOUT_MSEC 4000
#define PREPARE_TIMEOUT_MSEC 4000
#define ENABLE_TIMEOUT_MSEC 4000

// Processing margin, in ticks, that syncStartAll() adds to the sync delay.
// It shifts the time at which the buffer is transfer()'ed, which makes
// things somewhat more robust. Note, however, that shifting the transfer
// to a later time instant also causes the xmit buffer fill to be lower
// on average.
#define FFADO_SIGNAL_DELAY_TICKS (3072*1)
45
46 namespace Streaming {
47
// NOTE(review): presumably defines the debug module behind the
// debugOutput()/debugWarning()/debugError()/debugFatal() calls of this class,
// with DEBUG_LEVEL_VERBOSE as its initial level - confirm against the
// debug module header.
IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
49
50 StreamProcessorManager::StreamProcessorManager()
51     : m_is_slave( false )
52     , m_SyncSource(NULL)
53     , m_nb_buffers( 0 )
54     , m_period( 0 )
55     , m_nominal_framerate ( 0 )
56     , m_xrun_happened( false )
57     , m_xruns(0)
58     , m_nbperiods(0)
59 {
60     addOption(Util::OptionContainer::Option("slaveMode",false));
61 }
62
63 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
64     : m_is_slave( false )
65     , m_SyncSource(NULL)
66     , m_nb_buffers(nb_buffers)
67     , m_period(period)
68     , m_nominal_framerate ( framerate )
69     , m_xruns(0)
70     , m_xrun_happened( false )
71     , m_nbperiods(0)
72 {
73     addOption(Util::OptionContainer::Option("slaveMode",false));
74 }
75
76 StreamProcessorManager::~StreamProcessorManager() {
77 }
78
79 /**
80  * Registers \ref processor with this manager.
81  *
82  * also registers it with the isohandlermanager
83  *
84  * be sure to call isohandlermanager->init() first!
85  * and be sure that the processors are also ->init()'ed
86  *
87  * @param processor
88  * @return true if successfull
89  */
90 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
91 {
92     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
93     assert(processor);
94     if (processor->getType() == StreamProcessor::ePT_Receive) {
95         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
96         m_ReceiveProcessors.push_back(processor);
97         return true;
98     }
99
100     if (processor->getType() == StreamProcessor::ePT_Transmit) {
101         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
102         m_TransmitProcessors.push_back(processor);
103         return true;
104     }
105
106     debugFatal("Unsupported processor type!\n");
107     return false;
108 }
109
110 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
111 {
112     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
113     assert(processor);
114
115     if (processor->getType()==StreamProcessor::ePT_Receive) {
116
117         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
118               it != m_ReceiveProcessors.end();
119               ++it )
120         {
121             if ( *it == processor ) {
122                 if (*it == m_SyncSource) {
123                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source");
124                     m_SyncSource = NULL;
125                 }
126                 m_ReceiveProcessors.erase(it);
127                 return true;
128             }
129         }
130     }
131
132     if (processor->getType()==StreamProcessor::ePT_Transmit) {
133         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
134               it != m_TransmitProcessors.end();
135               ++it )
136         {
137             if ( *it == processor ) {
138                 if (*it == m_SyncSource) {
139                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source");
140                     m_SyncSource = NULL;
141                 }
142                 m_TransmitProcessors.erase(it);
143                 return true;
144             }
145         }
146     }
147
148     debugFatal("Processor (%p) not found!\n",processor);
149     return false; //not found
150 }
151
152 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
153     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
154     m_SyncSource=s;
155     return true;
156 }
157
158 bool StreamProcessorManager::prepare() {
159
160     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
161
162     m_is_slave=false;
163     if(!getOption("slaveMode", m_is_slave)) {
164         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
165     }
166
167     // if no sync source is set, select one here
168     if(m_SyncSource == NULL) {
169        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
170     }
171
172     // FIXME: put into separate method
173     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
174           it != m_ReceiveProcessors.end();
175           ++it )
176     {
177         if(m_SyncSource == NULL) {
178             debugWarning(" => Sync Source is %p.\n", *it);
179             m_SyncSource = *it;
180         }
181     }
182     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
183           it != m_TransmitProcessors.end();
184           ++it )
185     {
186         if(m_SyncSource == NULL) {
187             debugWarning(" => Sync Source is %p.\n", *it);
188             m_SyncSource = *it;
189         }
190     }
191
192     // now do the actual preparation of the SP's
193     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
194     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
195         it != m_ReceiveProcessors.end();
196         ++it ) {
197
198         if(!(*it)->setOption("slaveMode", m_is_slave)) {
199             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
200         }
201
202         if(!(*it)->prepare()) {
203             debugFatal(  " could not prepare (%p)...\n",(*it));
204             return false;
205         }
206     }
207     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
208     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
209         it != m_TransmitProcessors.end();
210         ++it ) {
211         if(!(*it)->setOption("slaveMode", m_is_slave)) {
212             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
213         }
214         if(!(*it)->prepare()) {
215             debugFatal( " could not prepare (%p)...\n",(*it));
216             return false;
217         }
218     }
219
220     // if there are no stream processors registered,
221     // fail
222     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
223         debugFatal("No stream processors registered, can't do anything usefull\n");
224         return false;
225     }
226     return true;
227 }
228
229 bool StreamProcessorManager::startDryRunning() {
230     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
231     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
232     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
233             it != m_ReceiveProcessors.end();
234             ++it ) {
235         if (!(*it)->isDryRunning()) {
236             if(!(*it)->scheduleStartDryRunning(-1)) {
237                 debugError("Could not put SP %p into the dry-running state\n", *it);
238                 return false;
239             }
240         } else {
241             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
242         }
243     }
244     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
245             it != m_TransmitProcessors.end();
246             ++it ) {
247         if (!(*it)->isDryRunning()) {
248             if(!(*it)->scheduleStartDryRunning(-1)) {
249                 debugError("Could not put SP %p into the dry-running state\n", *it);
250                 return false;
251             }
252         } else {
253             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
254         }
255     }
256     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
257     // wait for the syncsource to start running.
258     // that will block the waitForPeriod call until everyone has started (theoretically)
259     #define CYCLES_FOR_DRYRUN 40000
260     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
261     bool all_dry_running = false;
262     while (!all_dry_running && cnt) {
263         all_dry_running = true;
264         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
265                 it != m_ReceiveProcessors.end();
266                 ++it ) {
267             all_dry_running &= (*it)->isDryRunning();
268         }
269         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
270                 it != m_TransmitProcessors.end();
271                 ++it ) {
272             all_dry_running &= (*it)->isDryRunning();
273         }
274
275         SleepRelativeUsec(125);
276         cnt--;
277     }
278     if(cnt==0) {
279         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
280         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
281                 it != m_ReceiveProcessors.end();
282                 ++it ) {
283             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
284                 (*it)->getTypeString(), *it, (*it)->getStateString());
285         }
286         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
287                 it != m_TransmitProcessors.end();
288                 ++it ) {
289             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
290                 (*it)->getTypeString(), *it, (*it)->getStateString());
291         }
292         return false;
293     }
294     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
295     return true;
296 }
297
298 bool StreamProcessorManager::syncStartAll() {
299     if(m_SyncSource == NULL) return false;
300     // figure out when to get the SP's running.
301     // the xmit SP's should also know the base timestamp
302     // streams should be aligned here
303
304     // now find out how long we have to delay the wait operation such that
305     // the received frames will all be presented to the SP
306     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
307     int max_of_min_delay = 0;
308     int min_delay = 0;
309     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
310             it != m_ReceiveProcessors.end();
311             ++it ) {
312         min_delay = (*it)->getMaxFrameLatency();
313         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
314     }
315
316     // add some processing margin. This only shifts the time
317     // at which the buffer is transfer()'ed. This makes things somewhat
318     // more robust. It should be noted though that shifting the transfer
319     // time to a later time instant also causes the xmit buffer fill to be
320     // lower on average.
321     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
322     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
323         max_of_min_delay,
324         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
325         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
326         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
327     m_SyncSource->setSyncDelay(max_of_min_delay);
328
329     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
330     //        have aquired lock
331     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
332     //sleep(2); // FIXME: be smarter here
333
334     // make sure that we are dry-running long enough for the
335     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
336     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
337     int nb_sync_runs=20;
338     int64_t time_till_next_period;
339     while(nb_sync_runs--) { // or while not sync-ed?
340         // check if we were woken up too soon
341         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
342         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
343         if(time_till_next_period > 0) {
344             // wait for the period
345             SleepRelativeUsec(time_till_next_period);
346         }
347     }
348
349     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
350     // FIXME: in the SPM it would be nice to have system time instead of
351     //        1394 time
352
353     // we now should have decent sync info on the sync source
354     // determine a point in time where the system should start
355     // figure out where we are now
356     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
357     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
358         time_of_first_sample,
359         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
360         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
361         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
362
363     #define CYCLES_FOR_STARTUP 2000
364     // start wet-running in CYCLES_FOR_STARTUP cycles
365     // this is the time window we have to setup all SP's such that they
366     // can start wet-running correctly.
367     time_of_first_sample = addTicks(time_of_first_sample,
368                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
369
370     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
371         time_of_first_sample,
372         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
373         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
374         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
375
376     // we should start wet-running the transmit SP's some cycles in advance
377     // such that we know it is wet-running when it should output its first sample
378     #define PRESTART_CYCLES_FOR_XMIT 20
379     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
380                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
381
382     #define PRESTART_CYCLES_FOR_RECV 0
383     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
384                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
385     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
386         time_to_start_xmit,
387         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
388         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
389         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
390     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
391         time_to_start_recv,
392         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
393         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
394         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
395
396     // at this point the buffer head timestamp of the transmit buffers can be set
397     // this is the presentation time of the first sample in the buffer
398     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
399           it != m_TransmitProcessors.end();
400           ++it ) {
401         (*it)->setBufferHeadTimestamp(time_of_first_sample);
402     }
403
404     // STEP X: switch SP's over to the running state
405     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
406           it != m_ReceiveProcessors.end();
407           ++it ) {
408         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
409             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
410             return false;
411         }
412     }
413     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
414           it != m_TransmitProcessors.end();
415           ++it ) {
416         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
417             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
418             return false;
419         }
420     }
421     // wait for the syncsource to start running.
422     // that will block the waitForPeriod call until everyone has started (theoretically)
423     int cnt = CYCLES_FOR_STARTUP * 20; // by then it should have started
424     while (!m_SyncSource->isRunning() && cnt) {
425         SleepRelativeUsec(125);
426         cnt--;
427     }
428     if(cnt==0) {
429         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
430         return false;
431     }
432
433     // now align the received streams
434     if(!alignReceivedStreams()) {
435         debugError("Could not align streams\n");
436         return false;
437     }
438     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
439     return true;
440 }
441
442 bool
443 StreamProcessorManager::alignReceivedStreams()
444 {
445     if(m_SyncSource == NULL) return false;
446     #define ALIGN_AVERAGE_TIME_MSEC 200
447     #define NB_ALIGN_TRIES 40
448     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
449     unsigned int nb_sync_runs;
450     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
451     int64_t diff_between_streams[nb_rcv_sp];
452     int64_t diff;
453
454     unsigned int i;
455
456     unsigned int periods_per_align_try = (ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
457     periods_per_align_try /= 1000;
458     periods_per_align_try /= getPeriodSize();
459     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
460
461     bool aligned = false;
462     int cnt = NB_ALIGN_TRIES;
463     while (!aligned && cnt--) {
464         nb_sync_runs = periods_per_align_try;
465         while(nb_sync_runs) {
466             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
467             waitForPeriod();
468
469             i = 0;
470             for ( i = 0; i < nb_rcv_sp; i++) {
471                 StreamProcessor *s = m_ReceiveProcessors.at(i);
472                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
473                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
474                     m_SyncSource, s, diff);
475                 if ( nb_sync_runs == periods_per_align_try ) {
476                     diff_between_streams[i] = diff;
477                 } else {
478                     diff_between_streams[i] += diff;
479                 }
480             }
481             if(!transferSilence()) {
482                 debugError("Could not transfer silence\n");
483                 return false;
484             }
485             nb_sync_runs--;
486         }
487         // calculate the average offsets
488         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
489         int diff_between_streams_frames[nb_rcv_sp];
490         aligned = true;
491         for ( i = 0; i < nb_rcv_sp; i++) {
492             StreamProcessor *s = m_ReceiveProcessors.at(i);
493
494             diff_between_streams[i] /= periods_per_align_try;
495             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
496             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
497                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
498
499             aligned &= (diff_between_streams_frames[i] == 0);
500
501             // reposition the stream
502             if(!s->shiftStream(diff_between_streams_frames[i])) {
503                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
504                 return false;
505             }
506         }
507         if (!aligned) {
508             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
509         }
510     }
511     if (cnt == 0) {
512         debugError("Align failed\n");
513         return false;
514     }
515     return true;
516 }
517
518 bool StreamProcessorManager::start() {
519     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
520
521     // put all SP's into dry-running state
522     if (!startDryRunning()) {
523         debugFatal("Could not put SP's in dry-running state\n");
524         return false;
525     }
526
527     // start all SP's synchonized
528     if (!syncStartAll()) {
529         debugFatal("Could not syncStartAll...\n");
530         return false;
531     }
532     return true;
533 }
534
535 bool StreamProcessorManager::stop() {
536     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
537
538     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
539     // switch SP's over to the dry-running state
540     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
541           it != m_ReceiveProcessors.end();
542           ++it ) {
543         if(!(*it)->scheduleStopRunning(-1)) {
544             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
545             return false;
546         }
547     }
548     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
549           it != m_TransmitProcessors.end();
550           ++it ) {
551         if(!(*it)->scheduleStopRunning(-1)) {
552             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
553             return false;
554         }
555     }
556     // wait for the SP's to get into the dry-running state
557     int cnt = 200;
558     bool ready = false;
559     while (!ready && cnt) {
560         ready = true;
561         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
562             it != m_ReceiveProcessors.end();
563             ++it ) {
564             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
565         }
566         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
567             it != m_TransmitProcessors.end();
568             ++it ) {
569             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
570         }
571         SleepRelativeUsec(125);
572         cnt--;
573     }
574     if(cnt==0) {
575         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
576         return false;
577     }
578
579     // switch SP's over to the stopped state
580     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
581           it != m_ReceiveProcessors.end();
582           ++it ) {
583         if(!(*it)->scheduleStopDryRunning(-1)) {
584             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
585             return false;
586         }
587     }
588     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
589           it != m_TransmitProcessors.end();
590           ++it ) {
591         if(!(*it)->scheduleStopDryRunning(-1)) {
592             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
593             return false;
594         }
595     }
596     // wait for the SP's to get into the running state
597     cnt = 200;
598     ready = false;
599     while (!ready && cnt) {
600         ready = true;
601         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
602             it != m_ReceiveProcessors.end();
603             ++it ) {
604             ready &= (*it)->isStopped();
605         }
606         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
607             it != m_TransmitProcessors.end();
608             ++it ) {
609             ready &= (*it)->isStopped();
610         }
611         SleepRelativeUsec(125);
612         cnt--;
613     }
614     if(cnt==0) {
615         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
616         return false;
617     }
618     return true;
619 }
620
621 /**
622  * Called upon Xrun events. This brings all StreamProcessors back
623  * into their starting state, and then carries on streaming. This should
624  * have the same effect as restarting the whole thing.
625  *
626  * @return true if successful, false otherwise
627  */
628 bool StreamProcessorManager::handleXrun() {
629
630     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
631
632     dumpInfo();
633
634     /*
635      * Reset means:
636      * 1) Disabling the SP's, so that they don't process any packets
637      *    note: the isomanager does keep on delivering/requesting them
638      * 2) Bringing all buffers & streamprocessors into a know state
639      *    - Clear all capture buffers
640      *    - Put nb_periods*period_size of null frames into the playback buffers
641      * 3) Re-enable the SP's
642      */
643
644     // put all SP's back into dry-running state
645     if (!startDryRunning()) {
646         debugFatal("Could not put SP's in dry-running state\n");
647         return false;
648     }
649
650     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
651     // start all SP's synchonized
652     if (!syncStartAll()) {
653         debugFatal("Could not syncStartAll...\n");
654         return false;
655     }
656
657     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
658
659     return true;
660 }
661
662 /**
663  * @brief Waits until the next period of samples is ready
664  *
665  * This function does not return until a full period of samples is (or should be)
666  * ready to be transferred.
667  *
668  * @return true if the period is ready, false if an xrun occurred
669  */
670 bool StreamProcessorManager::waitForPeriod() {
671     if(m_SyncSource == NULL) return false;
672     int time_till_next_period;
673     bool xrun_occurred = false;
674
675     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
676
677     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
678
679     while(time_till_next_period > 0) {
680         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
681
682         // wait for the period
683         SleepRelativeUsec(time_till_next_period);
684
685         // check for underruns on the ISO side,
686         // those should make us bail out of the wait loop
687         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
688             it != m_ReceiveProcessors.end();
689             ++it ) {
690             // a xrun has occurred on the Iso side
691             xrun_occurred |= (*it)->xrunOccurred();
692         }
693         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
694             it != m_TransmitProcessors.end();
695             ++it ) {
696             // a xrun has occurred on the Iso side
697             xrun_occurred |= (*it)->xrunOccurred();
698         }
699         if(xrun_occurred) break;
700
701         // check if we were waked up too soon
702         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
703     }
704
705     // we save the 'ideal' time of the transfer at this point,
706     // because we can have interleaved read - process - write
707     // cycles making that we modify a receiving stream's buffer
708     // before we get to writing.
709     // NOTE: before waitForPeriod() is called again, both the transmit
710     //       and the receive processors should have done their transfer.
711     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
712     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
713         m_time_of_transfer);
714
715     // normally we can transfer frames at this time, but in some cases this is not true
716     // e.g. when there are not enough frames in the receive buffer.
717     // however this doesn't have to be a problem, since we can wait some more until we
718     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
719     // to transmit, or if the receive buffer overflows. These conditions are signaled by
720     // the iso threads
721     // check if xruns occurred on the Iso side.
722     // also check if xruns will occur should we transfer() now
723     #ifdef DEBUG
724     int waited = 0;
725     #endif
726     bool ready_for_transfer = false;
727     bool ready;
728     xrun_occurred = false;
729     while (!ready_for_transfer && !xrun_occurred) {
730         // FIXME: can deadlock when the iso handlers die (e.g. unplug the device)
731         ready_for_transfer = true;
732         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
733             it != m_ReceiveProcessors.end();
734             ++it ) {
735             ready = ((*it)->canClientTransferFrames(m_period));
736             ready_for_transfer &= ready;
737             if (!ready) (*it)->flush();
738             xrun_occurred |= (*it)->xrunOccurred();
739         }
740         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
741             it != m_TransmitProcessors.end();
742             ++it ) {
743             ready = ((*it)->canClientTransferFrames(m_period));
744             ready_for_transfer &= ready;
745             if (!ready) (*it)->flush();
746             xrun_occurred |= (*it)->xrunOccurred();
747         }
748         if (!ready_for_transfer) {
749            
750             SleepRelativeUsec(125); // MAGIC: one cycle sleep...
751
752             // in order to avoid this in the future, we increase the sync delay of the sync source SP
753             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
754             m_SyncSource->setSyncDelay(d);
755             d = m_SyncSource->getSyncDelay();
756             debugOutput(DEBUG_LEVEL_VERBOSE, "Increased the Sync delay to: %d\n", d);
757
758             #ifdef DEBUG
759             waited++;
760             #endif
761         }
762     } // we are either ready or an xrun occurred
763
764     // in order to avoid a runaway value of the sync delay, we gradually decrease
765     // it. It will be increased by a 'too early' event (cfr some lines higher)
766     // hence we'll be at a good point on average.
767     int d = m_SyncSource->getSyncDelay() - 1;
768     if (d >= 0) m_SyncSource->setSyncDelay(d);
769
770
771     #ifdef DEBUG
772     if(waited > 0) {
773         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
774     }
775     #endif
776
777     // this is to notify the client of the delay that we introduced by waiting
778     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
779     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
780
781 #ifdef DEBUG
782     int rcv_bf=0, xmt_bf=0;
783     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
784         it != m_ReceiveProcessors.end();
785         ++it ) {
786         rcv_bf = (*it)->getBufferFill();
787     }
788     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
789         it != m_TransmitProcessors.end();
790         ++it ) {
791         xmt_bf = (*it)->getBufferFill();
792     }
793     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
794         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
795
796     // check if xruns occurred on the Iso side.
797     // also check if xruns will occur should we transfer() now
798     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
799           it != m_ReceiveProcessors.end();
800           ++it ) {
801
802         if ((*it)->xrunOccurred()) {
803             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
804             (*it)->dumpInfo();
805         }
806         if (!((*it)->canClientTransferFrames(m_period))) {
807             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
808             (*it)->dumpInfo();
809         }
810     }
811     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
812           it != m_TransmitProcessors.end();
813           ++it ) {
814         if ((*it)->xrunOccurred()) {
815             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
816         }
817         if (!((*it)->canClientTransferFrames(m_period))) {
818             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
819         }
820     }
821 #endif
822
823     m_nbperiods++;
824     // now we can signal the client that we are (should be) ready
825     return !xrun_occurred;
826 }
827
828 /**
829  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
830  *
831  * Transfers one period of frames from the client side to the Iso side and vice versa.
832  *
833  * @return true if successful, false otherwise (indicates xrun).
834  */
835 bool StreamProcessorManager::transfer() {
836     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
837     bool retval=true;
838     retval &= transfer(StreamProcessor::ePT_Receive);
839     retval &= transfer(StreamProcessor::ePT_Transmit);
840     return retval;
841 }
842
843 /**
844  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
845  *
846  * Transfers one period of frames from the client side to the Iso side or vice versa.
847  *
848  * @param t The processor type to tranfer for (receive or transmit)
849  * @return true if successful, false otherwise (indicates xrun).
850  */
851 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
852     if(m_SyncSource == NULL) return false;
853     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
854         t, m_time_of_transfer,
855         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
856         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
857         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
858
859     bool retval = true;
860     // a static cast could make sure that there is no performance
861     // penalty for the virtual functions (to be checked)
862     if (t==StreamProcessor::ePT_Receive) {
863         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
864                 it != m_ReceiveProcessors.end();
865                 ++it ) {
866             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
867                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
868                             m_period, m_time_of_transfer,*it);
869                 retval &= false; // buffer underrun
870             }
871         }
872     } else {
873         // FIXME: in the SPM it would be nice to have system time instead of
874         //        1394 time
875         float rate = m_SyncSource->getTicksPerFrame();
876         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
877
878         // the data we are putting into the buffer is intended to be transmitted
879         // one ringbuffer size after it has been received
880         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
881
882         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
883                 it != m_TransmitProcessors.end();
884                 ++it ) {
885             // FIXME: in the SPM it would be nice to have system time instead of
886             //        1394 time
887             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
888                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
889                         m_period, transmit_timestamp, *it);
890                 retval &= false; // buffer underrun
891             }
892         }
893     }
894     return retval;
895 }
896
897 /**
898  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
899  *
900  * Transfers one period of silence to the Iso side for transmit SP's
901  * or dump one period of frames for receive SP's
902  *
903  * @return true if successful, false otherwise (indicates xrun).
904  */
905 bool StreamProcessorManager::transferSilence() {
906     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
907     bool retval=true;
908     retval &= transferSilence(StreamProcessor::ePT_Receive);
909     retval &= transferSilence(StreamProcessor::ePT_Transmit);
910     return retval;
911 }
912
913 /**
914  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
915  *
916  * Transfers one period of silence to the Iso side for transmit SP's
917  * or dump one period of frames for receive SP's
918  *
919  * @param t The processor type to tranfer for (receive or transmit)
920  * @return true if successful, false otherwise (indicates xrun).
921  */
922 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
923     if(m_SyncSource == NULL) return false;
924     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
925         t, m_time_of_transfer,
926         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
927         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
928         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
929
930     bool retval = true;
931     // a static cast could make sure that there is no performance
932     // penalty for the virtual functions (to be checked)
933     if (t==StreamProcessor::ePT_Receive) {
934         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
935                 it != m_ReceiveProcessors.end();
936                 ++it ) {
937             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
938                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
939                             m_period, m_time_of_transfer,*it);
940                 retval &= false; // buffer underrun
941             }
942         }
943     } else {
944         // FIXME: in the SPM it would be nice to have system time instead of
945         //        1394 time
946         float rate = m_SyncSource->getTicksPerFrame();
947         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
948
949         // the data we are putting into the buffer is intended to be transmitted
950         // one ringbuffer size after it has been received
951         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
952
953         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
954                 it != m_TransmitProcessors.end();
955                 ++it ) {
956             // FIXME: in the SPM it would be nice to have system time instead of
957             //        1394 time
958             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
959                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
960                         m_period, transmit_timestamp, *it);
961                 retval &= false; // buffer underrun
962             }
963         }
964     }
965     return retval;
966 }
967
968 void StreamProcessorManager::dumpInfo() {
969     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
970     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
971     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
972
973     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
974     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
975         it != m_ReceiveProcessors.end();
976         ++it ) {
977         (*it)->dumpInfo();
978     }
979
980     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
981     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
982         it != m_TransmitProcessors.end();
983         ++it ) {
984         (*it)->dumpInfo();
985     }
986
987     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
988
989 }
990
991 void StreamProcessorManager::setVerboseLevel(int l) {
992     setDebugLevel(l);
993
994     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
995     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
996         it != m_ReceiveProcessors.end();
997         ++it ) {
998         (*it)->setVerboseLevel(l);
999     }
1000
1001     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1002     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1003         it != m_TransmitProcessors.end();
1004         ++it ) {
1005         (*it)->setVerboseLevel(l);
1006     }
1007 }
1008
1009
1010 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1011     int count=0;
1012
1013     if (direction == Port::E_Capture) {
1014         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1015             it != m_ReceiveProcessors.end();
1016             ++it ) {
1017             count += (*it)->getPortCount(type);
1018         }
1019     } else {
1020         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1021             it != m_TransmitProcessors.end();
1022             ++it ) {
1023             count += (*it)->getPortCount(type);
1024         }
1025     }
1026     return count;
1027 }
1028
1029 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1030     int count=0;
1031
1032     if (direction == Port::E_Capture) {
1033         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1034             it != m_ReceiveProcessors.end();
1035             ++it ) {
1036             count += (*it)->getPortCount();
1037         }
1038     } else {
1039         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1040             it != m_TransmitProcessors.end();
1041             ++it ) {
1042             count += (*it)->getPortCount();
1043         }
1044     }
1045     return count;
1046 }
1047
1048 // TODO: implement a port map here, instead of the loop
1049
1050 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1051     int count=0;
1052     int prevcount=0;
1053
1054     if (direction == Port::E_Capture) {
1055         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1056             it != m_ReceiveProcessors.end();
1057             ++it ) {
1058             count += (*it)->getPortCount();
1059             if (count > idx) {
1060                 return (*it)->getPortAtIdx(idx-prevcount);
1061             }
1062             prevcount=count;
1063         }
1064     } else {
1065         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1066             it != m_TransmitProcessors.end();
1067             ++it ) {
1068             count += (*it)->getPortCount();
1069             if (count > idx) {
1070                 return (*it)->getPortAtIdx(idx-prevcount);
1071             }
1072             prevcount=count;
1073         }
1074     }
1075     return NULL;
1076 }
1077
1078 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1079     m_thread_realtime=rt;
1080     m_thread_priority=priority;
1081     return true;
1082 }
1083
1084
1085 } // end of namespace
Note: See TracBrowser for help on using the browser.