root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 750, 38.8 kB (checked in by ppalmers, 13 years ago)

Code refactoring. Tries to simplify things and tries to put all code where it belongs.

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "libieee1394/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31 #include <math.h>
32
33 #define RUNNING_TIMEOUT_MSEC 4000
34 #define PREPARE_TIMEOUT_MSEC 4000
35 #define ENABLE_TIMEOUT_MSEC 4000
36
37 // allows to add some processing margin. This shifts the time
38 // at which the buffer is transfer()'ed, making things somewhat
39 // more robust. It should be noted though that shifting the transfer
40 // time to a later time instant also causes the xmit buffer fill to be
41 // lower on average.
42 #define FFADO_SIGNAL_DELAY_TICKS 3072*4
43
44 namespace Streaming {
45
46 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
47
48 StreamProcessorManager::StreamProcessorManager()
49     : m_is_slave( false )
50     , m_SyncSource(NULL)
51     , m_nb_buffers( 0 )
52     , m_period( 0 )
53     , m_nominal_framerate ( 0 )
54     , m_xruns(0)
55     , m_xrun_happened( false )
56     , m_nbperiods(0)
57 {
58     addOption(Util::OptionContainer::Option("slaveMode",false));
59 }
60
61 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
62     : m_is_slave( false )
63     , m_SyncSource(NULL)
64     , m_nb_buffers(nb_buffers)
65     , m_period(period)
66     , m_nominal_framerate ( framerate )
67     , m_xruns(0)
68     , m_xrun_happened( false )
69     , m_nbperiods(0)
70 {
71     addOption(Util::OptionContainer::Option("slaveMode",false));
72 }
73
74 StreamProcessorManager::~StreamProcessorManager() {
75 }
76
77 /**
78  * Registers \ref processor with this manager.
79  *
80  * also registers it with the isohandlermanager
81  *
82  * be sure to call isohandlermanager->init() first!
83  * and be sure that the processors are also ->init()'ed
84  *
85  * @param processor
86  * @return true if successfull
87  */
88 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
89 {
90     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
91     assert(processor);
92     if (processor->getType() == StreamProcessor::ePT_Receive) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_ReceiveProcessors.push_back(processor);
95         return true;
96     }
97
98     if (processor->getType() == StreamProcessor::ePT_Transmit) {
99         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
100         m_TransmitProcessors.push_back(processor);
101         return true;
102     }
103
104     debugFatal("Unsupported processor type!\n");
105     return false;
106 }
107
108 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
109 {
110     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
111     assert(processor);
112
113     if (processor->getType()==StreamProcessor::ePT_Receive) {
114
115         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
116               it != m_ReceiveProcessors.end();
117               ++it )
118         {
119             if ( *it == processor ) {
120                 m_ReceiveProcessors.erase(it);
121                 return true;
122             }
123         }
124     }
125
126     if (processor->getType()==StreamProcessor::ePT_Transmit) {
127         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
128               it != m_TransmitProcessors.end();
129               ++it )
130         {
131             if ( *it == processor ) {
132                 m_TransmitProcessors.erase(it);
133                 return true;
134             }
135         }
136     }
137
138     debugFatal("Processor (%p) not found!\n",processor);
139     return false; //not found
140 }
141
142 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
143     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
144     m_SyncSource=s;
145     return true;
146 }
147
148 bool StreamProcessorManager::prepare() {
149
150     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
151
152     m_is_slave=false;
153     if(!getOption("slaveMode", m_is_slave)) {
154         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
155     }
156
157     // if no sync source is set, select one here
158     if(m_SyncSource == NULL) {
159        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
160     }
161
162     // FIXME: put into separate method
163     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
164           it != m_ReceiveProcessors.end();
165           ++it )
166     {
167         if(m_SyncSource == NULL) {
168             debugWarning(" => Sync Source is %p.\n", *it);
169             m_SyncSource = *it;
170         }
171     }
172     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
173           it != m_TransmitProcessors.end();
174           ++it )
175     {
176         if(m_SyncSource == NULL) {
177             debugWarning(" => Sync Source is %p.\n", *it);
178             m_SyncSource = *it;
179         }
180     }
181
182     // now do the actual preparation of the SP's
183     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
184     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
185         it != m_ReceiveProcessors.end();
186         ++it ) {
187
188         if(!(*it)->setOption("slaveMode", m_is_slave)) {
189             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
190         }
191
192         if(!(*it)->prepare()) {
193             debugFatal(  " could not prepare (%p)...\n",(*it));
194             return false;
195         }
196     }
197     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
198     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
199         it != m_TransmitProcessors.end();
200         ++it ) {
201         if(!(*it)->setOption("slaveMode", m_is_slave)) {
202             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
203         }
204         if(!(*it)->prepare()) {
205             debugFatal( " could not prepare (%p)...\n",(*it));
206             return false;
207         }
208     }
209
210     // if there are no stream processors registered,
211     // fail
212     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
213         debugFatal("No stream processors registered, can't do anything usefull\n");
214         return false;
215     }
216     return true;
217 }
218
219 bool StreamProcessorManager::startDryRunning() {
220     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
221     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
222     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
223             it != m_ReceiveProcessors.end();
224             ++it ) {
225         if (!(*it)->isDryRunning()) {
226             if(!(*it)->scheduleStartDryRunning(-1)) {
227                 debugError("Could not put SP %p into the dry-running state\n", *it);
228                 return false;
229             }
230         } else {
231             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
232         }
233     }
234     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
235             it != m_TransmitProcessors.end();
236             ++it ) {
237         if (!(*it)->isDryRunning()) {
238             if(!(*it)->scheduleStartDryRunning(-1)) {
239                 debugError("Could not put SP %p into the dry-running state\n", *it);
240                 return false;
241             }
242         } else {
243             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
244         }
245     }
246     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
247     // wait for the syncsource to start running.
248     // that will block the waitForPeriod call until everyone has started (theoretically)
249     #define CYCLES_FOR_DRYRUN 40000
250     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
251     bool all_dry_running = false;
252     while (!all_dry_running && cnt) {
253         all_dry_running = true;
254         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
255                 it != m_ReceiveProcessors.end();
256                 ++it ) {
257             all_dry_running &= (*it)->isDryRunning();
258         }
259         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
260                 it != m_TransmitProcessors.end();
261                 ++it ) {
262             all_dry_running &= (*it)->isDryRunning();
263         }
264
265         usleep(125);
266         cnt--;
267     }
268     if(cnt==0) {
269         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
270         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
271                 it != m_ReceiveProcessors.end();
272                 ++it ) {
273             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
274                 (*it)->getTypeString(), *it, (*it)->getStateString());
275         }
276         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
277                 it != m_TransmitProcessors.end();
278                 ++it ) {
279             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
280                 (*it)->getTypeString(), *it, (*it)->getStateString());
281         }
282         return false;
283     }
284     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
285     return true;
286 }
287
288 bool StreamProcessorManager::syncStartAll() {
289     // figure out when to get the SP's running.
290     // the xmit SP's should also know the base timestamp
291     // streams should be aligned here
292
293     // now find out how long we have to delay the wait operation such that
294     // the received frames will all be presented to the SP
295     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
296     int max_of_min_delay = 0;
297     int min_delay = 0;
298     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
299             it != m_ReceiveProcessors.end();
300             ++it ) {
301         min_delay = (*it)->getMaxFrameLatency();
302         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
303     }
304
305     // add some processing margin. This only shifts the time
306     // at which the buffer is transfer()'ed. This makes things somewhat
307     // more robust. It should be noted though that shifting the transfer
308     // time to a later time instant also causes the xmit buffer fill to be
309     // lower on average.
310     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
311     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
312         max_of_min_delay,
313         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
314         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
315         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
316     m_SyncSource->setSyncDelay(max_of_min_delay);
317
318     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
319     //        have aquired lock
320     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
321     //sleep(2); // FIXME: be smarter here
322
323     // make sure that we are dry-running long enough for the
324     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
325     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
326     int nb_sync_runs=20;
327     int64_t time_till_next_period;
328     while(nb_sync_runs--) { // or while not sync-ed?
329         // check if we were woken up too soon
330         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
331         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
332         if(time_till_next_period > 0) {
333             // wait for the period
334             usleep(time_till_next_period);
335         }
336     }
337
338     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
339     // FIXME: in the SPM it would be nice to have system time instead of
340     //        1394 time
341
342     // we now should have decent sync info on the sync source
343     // determine a point in time where the system should start
344     // figure out where we are now
345     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
346     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
347         time_of_first_sample,
348         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
349         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
350         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
351
352     #define CYCLES_FOR_STARTUP 2000
353     // start wet-running in CYCLES_FOR_STARTUP cycles
354     // this is the time window we have to setup all SP's such that they
355     // can start wet-running correctly.
356     time_of_first_sample = addTicks(time_of_first_sample,
357                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
358
359     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
360         time_of_first_sample,
361         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
362         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
363         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
364
365     // we should start wet-running the transmit SP's some cycles in advance
366     // such that we know it is wet-running when it should output its first sample
367     #define PRESTART_CYCLES_FOR_XMIT 20
368     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
369                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
370
371     #define PRESTART_CYCLES_FOR_RECV 0
372     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
373                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
374     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
375         time_to_start_xmit,
376         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
377         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
378         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
379     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
380         time_to_start_recv,
381         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
382         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
383         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
384
385     // at this point the buffer head timestamp of the transmit buffers can be set
386     // this is the presentation time of the first sample in the buffer
387     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
388           it != m_TransmitProcessors.end();
389           ++it ) {
390         (*it)->setBufferHeadTimestamp(time_of_first_sample);
391     }
392
393     // STEP X: switch SP's over to the running state
394     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
395           it != m_ReceiveProcessors.end();
396           ++it ) {
397         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
398             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
399             return false;
400         }
401     }
402     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
403           it != m_TransmitProcessors.end();
404           ++it ) {
405         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
406             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
407             return false;
408         }
409     }
410     // wait for the syncsource to start running.
411     // that will block the waitForPeriod call until everyone has started (theoretically)
412     int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started
413     while (!m_SyncSource->isRunning() && cnt) {
414         usleep(125);
415         cnt--;
416     }
417     if(cnt==0) {
418         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
419         return false;
420     }
421
422     // now align the received streams
423     if(!alignReceivedStreams()) {
424         debugError("Could not align streams\n");
425         return false;
426     }
427     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
428     return true;
429 }
430
431 bool
432 StreamProcessorManager::alignReceivedStreams()
433 {
434     #define NB_PERIODS_FOR_ALIGN_AVERAGE 20
435     #define NB_ALIGN_TRIES 20
436     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
437     unsigned int nb_sync_runs;
438     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
439     int64_t diff_between_streams[nb_rcv_sp];
440     int64_t diff;
441
442     unsigned int i;
443
444     bool aligned = false;
445     int cnt = NB_ALIGN_TRIES;
446     while (!aligned && cnt--) {
447         nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE;
448         while(nb_sync_runs) {
449             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
450             waitForPeriod();
451
452             i = 0;
453             for ( i = 0; i < nb_rcv_sp; i++) {
454                 StreamProcessor *s = m_ReceiveProcessors.at(i);
455                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
456                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
457                     m_SyncSource, s, diff);
458                 if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) {
459                     diff_between_streams[i] = diff;
460                 } else {
461                     diff_between_streams[i] += diff;
462                 }
463             }
464             if(!transferSilence()) {
465                 debugError("Could not transfer silence\n");
466                 return false;
467             }
468             nb_sync_runs--;
469         }
470         // calculate the average offsets
471         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
472         int diff_between_streams_frames[nb_rcv_sp];
473         aligned = true;
474         for ( i = 0; i < nb_rcv_sp; i++) {
475             StreamProcessor *s = m_ReceiveProcessors.at(i);
476
477             diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE;
478             diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame());
479             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
480                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
481
482             aligned &= (diff_between_streams_frames[i] == 0);
483
484             // reposition the stream
485             if(!s->shiftStream(diff_between_streams_frames[i])) {
486                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
487                 return false;
488             }
489         }
490         if (!aligned) {
491             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
492         }
493     }
494     if (cnt == 0) {
495         debugError("Align failed\n");
496         return false;
497     }
498     return true;
499 }
500
501 bool StreamProcessorManager::start() {
502     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
503
504     // put all SP's into dry-running state
505     if (!startDryRunning()) {
506         debugFatal("Could not put SP's in dry-running state\n");
507         return false;
508     }
509
510     // start all SP's synchonized
511     if (!syncStartAll()) {
512         debugFatal("Could not syncStartAll...\n");
513         return false;
514     }
515     return true;
516 }
517
518 bool StreamProcessorManager::stop() {
519     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
520
521     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
522     // switch SP's over to the dry-running state
523     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
524           it != m_ReceiveProcessors.end();
525           ++it ) {
526         if(!(*it)->scheduleStopRunning(-1)) {
527             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
528             return false;
529         }
530     }
531     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
532           it != m_TransmitProcessors.end();
533           ++it ) {
534         if(!(*it)->scheduleStopRunning(-1)) {
535             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
536             return false;
537         }
538     }
539     // wait for the SP's to get into the dry-running state
540     int cnt = 200;
541     bool ready = false;
542     while (!ready && cnt) {
543         ready = true;
544         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
545             it != m_ReceiveProcessors.end();
546             ++it ) {
547             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
548         }
549         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
550             it != m_TransmitProcessors.end();
551             ++it ) {
552             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
553         }
554         usleep(125);
555         cnt--;
556     }
557     if(cnt==0) {
558         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
559         return false;
560     }
561
562     // switch SP's over to the stopped state
563     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
564           it != m_ReceiveProcessors.end();
565           ++it ) {
566         if(!(*it)->scheduleStopDryRunning(-1)) {
567             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
568             return false;
569         }
570     }
571     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
572           it != m_TransmitProcessors.end();
573           ++it ) {
574         if(!(*it)->scheduleStopDryRunning(-1)) {
575             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
576             return false;
577         }
578     }
579     // wait for the SP's to get into the running state
580     cnt = 200;
581     ready = false;
582     while (!ready && cnt) {
583         ready = true;
584         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
585             it != m_ReceiveProcessors.end();
586             ++it ) {
587             ready &= (*it)->isStopped();
588         }
589         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
590             it != m_TransmitProcessors.end();
591             ++it ) {
592             ready &= (*it)->isStopped();
593         }
594         usleep(125);
595         cnt--;
596     }
597     if(cnt==0) {
598         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
599         return false;
600     }
601     return true;
602 }
603
604 /**
605  * Called upon Xrun events. This brings all StreamProcessors back
606  * into their starting state, and then carries on streaming. This should
607  * have the same effect as restarting the whole thing.
608  *
609  * @return true if successful, false otherwise
610  */
611 bool StreamProcessorManager::handleXrun() {
612
613     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
614
615     dumpInfo();
616
617     /*
618      * Reset means:
619      * 1) Disabling the SP's, so that they don't process any packets
620      *    note: the isomanager does keep on delivering/requesting them
621      * 2) Bringing all buffers & streamprocessors into a know state
622      *    - Clear all capture buffers
623      *    - Put nb_periods*period_size of null frames into the playback buffers
624      * 3) Re-enable the SP's
625      */
626
627     // put all SP's back into dry-running state
628     if (!startDryRunning()) {
629         debugFatal("Could not put SP's in dry-running state\n");
630         return false;
631     }
632
633     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
634     // start all SP's synchonized
635     if (!syncStartAll()) {
636         debugFatal("Could not syncStartAll...\n");
637         return false;
638     }
639
640     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
641
642     return true;
643 }
644
645 /**
646  * @brief Waits until the next period of samples is ready
647  *
648  * This function does not return until a full period of samples is (or should be)
649  * ready to be transferred.
650  *
651  * @return true if the period is ready, false if an xrun occurred
652  */
653 bool StreamProcessorManager::waitForPeriod() {
654     int time_till_next_period;
655     bool xrun_occurred = false;
656
657     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
658
659     assert(m_SyncSource);
660
661     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
662
663     while(time_till_next_period > 0) {
664         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
665
666         // wait for the period
667         usleep(time_till_next_period);
668
669         // check for underruns on the ISO side,
670         // those should make us bail out of the wait loop
671         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
672             it != m_ReceiveProcessors.end();
673             ++it ) {
674             // a xrun has occurred on the Iso side
675             xrun_occurred |= (*it)->xrunOccurred();
676         }
677         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
678             it != m_TransmitProcessors.end();
679             ++it ) {
680             // a xrun has occurred on the Iso side
681             xrun_occurred |= (*it)->xrunOccurred();
682         }
683         if(xrun_occurred) break;
684
685         // check if we were waked up too soon
686         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
687     }
688
689     // we save the 'ideal' time of the transfer at this point,
690     // because we can have interleaved read - process - write
691     // cycles making that we modify a receiving stream's buffer
692     // before we get to writing.
693     // NOTE: before waitForPeriod() is called again, both the transmit
694     //       and the receive processors should have done their transfer.
695     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
696     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
697         m_time_of_transfer);
698
699     // normally we can transfer frames at this time, but in some cases this is not true
700     // e.g. when there are not enough frames in the receive buffer.
701     // however this doesn't have to be a problem, since we can wait some more until we
702     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
703     // to transmit, or if the receive buffer overflows. These conditions are signaled by
704     // the iso threads
705     // check if xruns occurred on the Iso side.
706     // also check if xruns will occur should we transfer() now
707     #ifdef DEBUG
708     int waited = 0;
709     #endif
710     bool ready_for_transfer = false;
711     bool ready;
712     xrun_occurred = false;
713     while (!ready_for_transfer && !xrun_occurred) {
714         ready_for_transfer = true;
715         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
716             it != m_ReceiveProcessors.end();
717             ++it ) {
718             ready = ((*it)->canClientTransferFrames(m_period));
719             ready_for_transfer &= ready;
720             if (!ready) (*it)->flush();
721             xrun_occurred |= (*it)->xrunOccurred();
722         }
723         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
724             it != m_TransmitProcessors.end();
725             ++it ) {
726             ready = ((*it)->canClientTransferFrames(m_period));
727             ready_for_transfer &= ready;
728             if (!ready) (*it)->flush();
729             xrun_occurred |= (*it)->xrunOccurred();
730         }
731         if (!ready_for_transfer) {
732            
733             usleep(125); // MAGIC: one cycle sleep...
734
735             #if 0
736             // in order to avoid this in the future, we increase the sync delay of the sync source SP
737             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
738             m_SyncSource->setSyncDelay(d);
739             #endif
740
741             #ifdef DEBUG
742             waited++;
743             #endif
744         }
745     } // we are either ready or an xrun occurred
746
747     #ifdef DEBUG
748     if(waited > 0) {
749         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
750     }
751     #endif
752
753     // this is to notify the client of the delay that we introduced by waiting
754     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
755     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
756
757 #ifdef DEBUG
758     int rcv_bf=0, xmt_bf=0;
759     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
760         it != m_ReceiveProcessors.end();
761         ++it ) {
762         rcv_bf = (*it)->getBufferFill();
763     }
764     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
765         it != m_TransmitProcessors.end();
766         ++it ) {
767         xmt_bf = (*it)->getBufferFill();
768     }
769     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
770         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
771
772     // check if xruns occurred on the Iso side.
773     // also check if xruns will occur should we transfer() now
774     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
775           it != m_ReceiveProcessors.end();
776           ++it ) {
777
778         if ((*it)->xrunOccurred()) {
779             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
780             (*it)->dumpInfo();
781         }
782         if (!((*it)->canClientTransferFrames(m_period))) {
783             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
784             (*it)->dumpInfo();
785         }
786     }
787     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
788           it != m_TransmitProcessors.end();
789           ++it ) {
790         if ((*it)->xrunOccurred()) {
791             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
792         }
793         if (!((*it)->canClientTransferFrames(m_period))) {
794             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
795         }
796     }
797 #endif
798
799     m_nbperiods++;
800     // now we can signal the client that we are (should be) ready
801     return !xrun_occurred;
802 }
803
804 /**
805  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
806  *
807  * Transfers one period of frames from the client side to the Iso side and vice versa.
808  *
809  * @return true if successful, false otherwise (indicates xrun).
810  */
811 bool StreamProcessorManager::transfer() {
812     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
813     bool retval=true;
814     retval &= transfer(StreamProcessor::ePT_Receive);
815     retval &= transfer(StreamProcessor::ePT_Transmit);
816     return retval;
817 }
818
819 /**
820  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
821  *
822  * Transfers one period of frames from the client side to the Iso side or vice versa.
823  *
824  * @param t The processor type to tranfer for (receive or transmit)
825  * @return true if successful, false otherwise (indicates xrun).
826  */
827 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
828     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
829         t, m_time_of_transfer,
830         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
831         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
832         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
833
834     bool retval = true;
835     // a static cast could make sure that there is no performance
836     // penalty for the virtual functions (to be checked)
837     if (t==StreamProcessor::ePT_Receive) {
838         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
839                 it != m_ReceiveProcessors.end();
840                 ++it ) {
841             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
842                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
843                             m_period, m_time_of_transfer,*it);
844                 retval &= false; // buffer underrun
845             }
846         }
847     } else {
848         // FIXME: in the SPM it would be nice to have system time instead of
849         //        1394 time
850         float rate = m_SyncSource->getTicksPerFrame();
851         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
852
853         // the data we are putting into the buffer is intended to be transmitted
854         // one ringbuffer size after it has been received
855         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
856
857         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
858                 it != m_TransmitProcessors.end();
859                 ++it ) {
860             // FIXME: in the SPM it would be nice to have system time instead of
861             //        1394 time
862             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
863                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
864                         m_period, transmit_timestamp, *it);
865                 retval &= false; // buffer underrun
866             }
867         }
868     }
869     return retval;
870 }
871
872 /**
873  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
874  *
875  * Transfers one period of silence to the Iso side for transmit SP's
876  * or dump one period of frames for receive SP's
877  *
878  * @return true if successful, false otherwise (indicates xrun).
879  */
880 bool StreamProcessorManager::transferSilence() {
881     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
882     bool retval=true;
883     retval &= transferSilence(StreamProcessor::ePT_Receive);
884     retval &= transferSilence(StreamProcessor::ePT_Transmit);
885     return retval;
886 }
887
888 /**
889  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
890  *
891  * Transfers one period of silence to the Iso side for transmit SP's
892  * or dump one period of frames for receive SP's
893  *
894  * @param t The processor type to tranfer for (receive or transmit)
895  * @return true if successful, false otherwise (indicates xrun).
896  */
897 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
898     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
899         t, m_time_of_transfer,
900         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
901         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
902         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
903
904     bool retval = true;
905     // a static cast could make sure that there is no performance
906     // penalty for the virtual functions (to be checked)
907     if (t==StreamProcessor::ePT_Receive) {
908         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
909                 it != m_ReceiveProcessors.end();
910                 ++it ) {
911             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
912                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
913                             m_period, m_time_of_transfer,*it);
914                 retval &= false; // buffer underrun
915             }
916         }
917     } else {
918         // FIXME: in the SPM it would be nice to have system time instead of
919         //        1394 time
920         float rate = m_SyncSource->getTicksPerFrame();
921         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
922
923         // the data we are putting into the buffer is intended to be transmitted
924         // one ringbuffer size after it has been received
925         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
926
927         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
928                 it != m_TransmitProcessors.end();
929                 ++it ) {
930             // FIXME: in the SPM it would be nice to have system time instead of
931             //        1394 time
932             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
933                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
934                         m_period, transmit_timestamp, *it);
935                 retval &= false; // buffer underrun
936             }
937         }
938     }
939     return retval;
940 }
941
942 void StreamProcessorManager::dumpInfo() {
943     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
944     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
945     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
946
947     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
948     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
949         it != m_ReceiveProcessors.end();
950         ++it ) {
951         (*it)->dumpInfo();
952     }
953
954     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
955     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
956         it != m_TransmitProcessors.end();
957         ++it ) {
958         (*it)->dumpInfo();
959     }
960
961     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
962
963 }
964
965 void StreamProcessorManager::setVerboseLevel(int l) {
966     setDebugLevel(l);
967
968     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
969     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
970         it != m_ReceiveProcessors.end();
971         ++it ) {
972         (*it)->setVerboseLevel(l);
973     }
974
975     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
976     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
977         it != m_TransmitProcessors.end();
978         ++it ) {
979         (*it)->setVerboseLevel(l);
980     }
981 }
982
983
984 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
985     int count=0;
986
987     if (direction == Port::E_Capture) {
988         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
989             it != m_ReceiveProcessors.end();
990             ++it ) {
991             count += (*it)->getPortCount(type);
992         }
993     } else {
994         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
995             it != m_TransmitProcessors.end();
996             ++it ) {
997             count += (*it)->getPortCount(type);
998         }
999     }
1000     return count;
1001 }
1002
1003 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1004     int count=0;
1005
1006     if (direction == Port::E_Capture) {
1007         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1008             it != m_ReceiveProcessors.end();
1009             ++it ) {
1010             count += (*it)->getPortCount();
1011         }
1012     } else {
1013         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1014             it != m_TransmitProcessors.end();
1015             ++it ) {
1016             count += (*it)->getPortCount();
1017         }
1018     }
1019     return count;
1020 }
1021
1022 // TODO: implement a port map here, instead of the loop
1023
1024 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1025     int count=0;
1026     int prevcount=0;
1027
1028     if (direction == Port::E_Capture) {
1029         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1030             it != m_ReceiveProcessors.end();
1031             ++it ) {
1032             count += (*it)->getPortCount();
1033             if (count > idx) {
1034                 return (*it)->getPortAtIdx(idx-prevcount);
1035             }
1036             prevcount=count;
1037         }
1038     } else {
1039         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1040             it != m_TransmitProcessors.end();
1041             ++it ) {
1042             count += (*it)->getPortCount();
1043             if (count > idx) {
1044                 return (*it)->getPortAtIdx(idx-prevcount);
1045             }
1046             prevcount=count;
1047         }
1048     }
1049     return NULL;
1050 }
1051
1052 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1053     m_thread_realtime=rt;
1054     m_thread_priority=priority;
1055     return true;
1056 }
1057
1058
1059 } // end of namespace
Note: See TracBrowser for help on using the browser.