root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 766, 39.1 kB (checked in by ppalmers, 16 years ago)

introduce local references to frequently used objects

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "libieee1394/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31 #include <math.h>
32
33 #define RUNNING_TIMEOUT_MSEC 4000
34 #define PREPARE_TIMEOUT_MSEC 4000
35 #define ENABLE_TIMEOUT_MSEC 4000
36
37 // allows to add some processing margin. This shifts the time
38 // at which the buffer is transfer()'ed, making things somewhat
39 // more robust. It should be noted though that shifting the transfer
40 // time to a later time instant also causes the xmit buffer fill to be
41 // lower on average.
42 #define FFADO_SIGNAL_DELAY_TICKS (3072*1)
43
44 namespace Streaming {
45
46 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
47
48 StreamProcessorManager::StreamProcessorManager()
49     : m_is_slave( false )
50     , m_SyncSource(NULL)
51     , m_nb_buffers( 0 )
52     , m_period( 0 )
53     , m_nominal_framerate ( 0 )
54     , m_xrun_happened( false )
55     , m_xruns(0)
56     , m_nbperiods(0)
57 {
58     addOption(Util::OptionContainer::Option("slaveMode",false));
59 }
60
61 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
62     : m_is_slave( false )
63     , m_SyncSource(NULL)
64     , m_nb_buffers(nb_buffers)
65     , m_period(period)
66     , m_nominal_framerate ( framerate )
67     , m_xruns(0)
68     , m_xrun_happened( false )
69     , m_nbperiods(0)
70 {
71     addOption(Util::OptionContainer::Option("slaveMode",false));
72 }
73
74 StreamProcessorManager::~StreamProcessorManager() {
75 }
76
77 /**
78  * Registers \ref processor with this manager.
79  *
80  * also registers it with the isohandlermanager
81  *
82  * be sure to call isohandlermanager->init() first!
83  * and be sure that the processors are also ->init()'ed
84  *
85  * @param processor
86  * @return true if successfull
87  */
88 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
89 {
90     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
91     assert(processor);
92     if (processor->getType() == StreamProcessor::ePT_Receive) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_ReceiveProcessors.push_back(processor);
95         return true;
96     }
97
98     if (processor->getType() == StreamProcessor::ePT_Transmit) {
99         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
100         m_TransmitProcessors.push_back(processor);
101         return true;
102     }
103
104     debugFatal("Unsupported processor type!\n");
105     return false;
106 }
107
108 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
109 {
110     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
111     assert(processor);
112
113     if (processor->getType()==StreamProcessor::ePT_Receive) {
114
115         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
116               it != m_ReceiveProcessors.end();
117               ++it )
118         {
119             if ( *it == processor ) {
120                 m_ReceiveProcessors.erase(it);
121                 return true;
122             }
123         }
124     }
125
126     if (processor->getType()==StreamProcessor::ePT_Transmit) {
127         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
128               it != m_TransmitProcessors.end();
129               ++it )
130         {
131             if ( *it == processor ) {
132                 m_TransmitProcessors.erase(it);
133                 return true;
134             }
135         }
136     }
137
138     debugFatal("Processor (%p) not found!\n",processor);
139     return false; //not found
140 }
141
142 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
143     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
144     m_SyncSource=s;
145     return true;
146 }
147
148 bool StreamProcessorManager::prepare() {
149
150     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
151
152     m_is_slave=false;
153     if(!getOption("slaveMode", m_is_slave)) {
154         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
155     }
156
157     // if no sync source is set, select one here
158     if(m_SyncSource == NULL) {
159        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
160     }
161
162     // FIXME: put into separate method
163     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
164           it != m_ReceiveProcessors.end();
165           ++it )
166     {
167         if(m_SyncSource == NULL) {
168             debugWarning(" => Sync Source is %p.\n", *it);
169             m_SyncSource = *it;
170         }
171     }
172     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
173           it != m_TransmitProcessors.end();
174           ++it )
175     {
176         if(m_SyncSource == NULL) {
177             debugWarning(" => Sync Source is %p.\n", *it);
178             m_SyncSource = *it;
179         }
180     }
181
182     // now do the actual preparation of the SP's
183     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
184     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
185         it != m_ReceiveProcessors.end();
186         ++it ) {
187
188         if(!(*it)->setOption("slaveMode", m_is_slave)) {
189             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
190         }
191
192         if(!(*it)->prepare()) {
193             debugFatal(  " could not prepare (%p)...\n",(*it));
194             return false;
195         }
196     }
197     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
198     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
199         it != m_TransmitProcessors.end();
200         ++it ) {
201         if(!(*it)->setOption("slaveMode", m_is_slave)) {
202             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
203         }
204         if(!(*it)->prepare()) {
205             debugFatal( " could not prepare (%p)...\n",(*it));
206             return false;
207         }
208     }
209
210     // if there are no stream processors registered,
211     // fail
212     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
213         debugFatal("No stream processors registered, can't do anything usefull\n");
214         return false;
215     }
216     return true;
217 }
218
219 bool StreamProcessorManager::startDryRunning() {
220     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
221     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
222     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
223             it != m_ReceiveProcessors.end();
224             ++it ) {
225         if (!(*it)->isDryRunning()) {
226             if(!(*it)->scheduleStartDryRunning(-1)) {
227                 debugError("Could not put SP %p into the dry-running state\n", *it);
228                 return false;
229             }
230         } else {
231             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
232         }
233     }
234     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
235             it != m_TransmitProcessors.end();
236             ++it ) {
237         if (!(*it)->isDryRunning()) {
238             if(!(*it)->scheduleStartDryRunning(-1)) {
239                 debugError("Could not put SP %p into the dry-running state\n", *it);
240                 return false;
241             }
242         } else {
243             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
244         }
245     }
246     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
247     // wait for the syncsource to start running.
248     // that will block the waitForPeriod call until everyone has started (theoretically)
249     #define CYCLES_FOR_DRYRUN 40000
250     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
251     bool all_dry_running = false;
252     while (!all_dry_running && cnt) {
253         all_dry_running = true;
254         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
255                 it != m_ReceiveProcessors.end();
256                 ++it ) {
257             all_dry_running &= (*it)->isDryRunning();
258         }
259         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
260                 it != m_TransmitProcessors.end();
261                 ++it ) {
262             all_dry_running &= (*it)->isDryRunning();
263         }
264
265         usleep(125);
266         cnt--;
267     }
268     if(cnt==0) {
269         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
270         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
271                 it != m_ReceiveProcessors.end();
272                 ++it ) {
273             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
274                 (*it)->getTypeString(), *it, (*it)->getStateString());
275         }
276         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
277                 it != m_TransmitProcessors.end();
278                 ++it ) {
279             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
280                 (*it)->getTypeString(), *it, (*it)->getStateString());
281         }
282         return false;
283     }
284     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
285     return true;
286 }
287
288 bool StreamProcessorManager::syncStartAll() {
289     // figure out when to get the SP's running.
290     // the xmit SP's should also know the base timestamp
291     // streams should be aligned here
292
293     // now find out how long we have to delay the wait operation such that
294     // the received frames will all be presented to the SP
295     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
296     int max_of_min_delay = 0;
297     int min_delay = 0;
298     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
299             it != m_ReceiveProcessors.end();
300             ++it ) {
301         min_delay = (*it)->getMaxFrameLatency();
302         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
303     }
304
305     // add some processing margin. This only shifts the time
306     // at which the buffer is transfer()'ed. This makes things somewhat
307     // more robust. It should be noted though that shifting the transfer
308     // time to a later time instant also causes the xmit buffer fill to be
309     // lower on average.
310     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
311     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
312         max_of_min_delay,
313         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
314         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
315         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
316     m_SyncSource->setSyncDelay(max_of_min_delay);
317
318     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
319     //        have aquired lock
320     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
321     //sleep(2); // FIXME: be smarter here
322
323     // make sure that we are dry-running long enough for the
324     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
325     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
326     int nb_sync_runs=20;
327     int64_t time_till_next_period;
328     while(nb_sync_runs--) { // or while not sync-ed?
329         // check if we were woken up too soon
330         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
331         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
332         if(time_till_next_period > 0) {
333             // wait for the period
334             usleep(time_till_next_period);
335         }
336     }
337
338     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
339     // FIXME: in the SPM it would be nice to have system time instead of
340     //        1394 time
341
342     // we now should have decent sync info on the sync source
343     // determine a point in time where the system should start
344     // figure out where we are now
345     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
346     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
347         time_of_first_sample,
348         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
349         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
350         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
351
352     #define CYCLES_FOR_STARTUP 2000
353     // start wet-running in CYCLES_FOR_STARTUP cycles
354     // this is the time window we have to setup all SP's such that they
355     // can start wet-running correctly.
356     time_of_first_sample = addTicks(time_of_first_sample,
357                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
358
359     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
360         time_of_first_sample,
361         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
362         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
363         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
364
365     // we should start wet-running the transmit SP's some cycles in advance
366     // such that we know it is wet-running when it should output its first sample
367     #define PRESTART_CYCLES_FOR_XMIT 20
368     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
369                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
370
371     #define PRESTART_CYCLES_FOR_RECV 0
372     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
373                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
374     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
375         time_to_start_xmit,
376         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
377         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
378         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
379     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
380         time_to_start_recv,
381         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
382         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
383         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
384
385     // at this point the buffer head timestamp of the transmit buffers can be set
386     // this is the presentation time of the first sample in the buffer
387     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
388           it != m_TransmitProcessors.end();
389           ++it ) {
390         (*it)->setBufferHeadTimestamp(time_of_first_sample);
391     }
392
393     // STEP X: switch SP's over to the running state
394     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
395           it != m_ReceiveProcessors.end();
396           ++it ) {
397         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
398             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
399             return false;
400         }
401     }
402     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
403           it != m_TransmitProcessors.end();
404           ++it ) {
405         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
406             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
407             return false;
408         }
409     }
410     // wait for the syncsource to start running.
411     // that will block the waitForPeriod call until everyone has started (theoretically)
412     int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started
413     while (!m_SyncSource->isRunning() && cnt) {
414         usleep(125);
415         cnt--;
416     }
417     if(cnt==0) {
418         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
419         return false;
420     }
421
422     // now align the received streams
423     if(!alignReceivedStreams()) {
424         debugError("Could not align streams\n");
425         return false;
426     }
427     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
428     return true;
429 }
430
431 bool
432 StreamProcessorManager::alignReceivedStreams()
433 {
434     #define NB_PERIODS_FOR_ALIGN_AVERAGE 20
435     #define NB_ALIGN_TRIES 20
436     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
437     unsigned int nb_sync_runs;
438     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
439     int64_t diff_between_streams[nb_rcv_sp];
440     int64_t diff;
441
442     unsigned int i;
443
444     bool aligned = false;
445     int cnt = NB_ALIGN_TRIES;
446     while (!aligned && cnt--) {
447         nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE;
448         while(nb_sync_runs) {
449             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
450             waitForPeriod();
451
452             i = 0;
453             for ( i = 0; i < nb_rcv_sp; i++) {
454                 StreamProcessor *s = m_ReceiveProcessors.at(i);
455                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
456                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
457                     m_SyncSource, s, diff);
458                 if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) {
459                     diff_between_streams[i] = diff;
460                 } else {
461                     diff_between_streams[i] += diff;
462                 }
463             }
464             if(!transferSilence()) {
465                 debugError("Could not transfer silence\n");
466                 return false;
467             }
468             nb_sync_runs--;
469         }
470         // calculate the average offsets
471         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
472         int diff_between_streams_frames[nb_rcv_sp];
473         aligned = true;
474         for ( i = 0; i < nb_rcv_sp; i++) {
475             StreamProcessor *s = m_ReceiveProcessors.at(i);
476
477             diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE;
478             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
479             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
480                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
481
482             aligned &= (diff_between_streams_frames[i] == 0);
483
484             // reposition the stream
485             if(!s->shiftStream(diff_between_streams_frames[i])) {
486                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
487                 return false;
488             }
489         }
490         if (!aligned) {
491             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
492         }
493     }
494     if (cnt == 0) {
495         debugError("Align failed\n");
496         return false;
497     }
498     return true;
499 }
500
501 bool StreamProcessorManager::start() {
502     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
503
504     // put all SP's into dry-running state
505     if (!startDryRunning()) {
506         debugFatal("Could not put SP's in dry-running state\n");
507         return false;
508     }
509
510     // start all SP's synchonized
511     if (!syncStartAll()) {
512         debugFatal("Could not syncStartAll...\n");
513         return false;
514     }
515     return true;
516 }
517
518 bool StreamProcessorManager::stop() {
519     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
520
521     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
522     // switch SP's over to the dry-running state
523     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
524           it != m_ReceiveProcessors.end();
525           ++it ) {
526         if(!(*it)->scheduleStopRunning(-1)) {
527             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
528             return false;
529         }
530     }
531     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
532           it != m_TransmitProcessors.end();
533           ++it ) {
534         if(!(*it)->scheduleStopRunning(-1)) {
535             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
536             return false;
537         }
538     }
539     // wait for the SP's to get into the dry-running state
540     int cnt = 200;
541     bool ready = false;
542     while (!ready && cnt) {
543         ready = true;
544         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
545             it != m_ReceiveProcessors.end();
546             ++it ) {
547             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
548         }
549         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
550             it != m_TransmitProcessors.end();
551             ++it ) {
552             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
553         }
554         usleep(125);
555         cnt--;
556     }
557     if(cnt==0) {
558         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
559         return false;
560     }
561
562     // switch SP's over to the stopped state
563     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
564           it != m_ReceiveProcessors.end();
565           ++it ) {
566         if(!(*it)->scheduleStopDryRunning(-1)) {
567             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
568             return false;
569         }
570     }
571     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
572           it != m_TransmitProcessors.end();
573           ++it ) {
574         if(!(*it)->scheduleStopDryRunning(-1)) {
575             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
576             return false;
577         }
578     }
579     // wait for the SP's to get into the running state
580     cnt = 200;
581     ready = false;
582     while (!ready && cnt) {
583         ready = true;
584         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
585             it != m_ReceiveProcessors.end();
586             ++it ) {
587             ready &= (*it)->isStopped();
588         }
589         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
590             it != m_TransmitProcessors.end();
591             ++it ) {
592             ready &= (*it)->isStopped();
593         }
594         usleep(125);
595         cnt--;
596     }
597     if(cnt==0) {
598         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
599         return false;
600     }
601     return true;
602 }
603
604 /**
605  * Called upon Xrun events. This brings all StreamProcessors back
606  * into their starting state, and then carries on streaming. This should
607  * have the same effect as restarting the whole thing.
608  *
609  * @return true if successful, false otherwise
610  */
611 bool StreamProcessorManager::handleXrun() {
612
613     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
614
615     dumpInfo();
616
617     /*
618      * Reset means:
619      * 1) Disabling the SP's, so that they don't process any packets
620      *    note: the isomanager does keep on delivering/requesting them
621      * 2) Bringing all buffers & streamprocessors into a know state
622      *    - Clear all capture buffers
623      *    - Put nb_periods*period_size of null frames into the playback buffers
624      * 3) Re-enable the SP's
625      */
626
627     // put all SP's back into dry-running state
628     if (!startDryRunning()) {
629         debugFatal("Could not put SP's in dry-running state\n");
630         return false;
631     }
632
633     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
634     // start all SP's synchonized
635     if (!syncStartAll()) {
636         debugFatal("Could not syncStartAll...\n");
637         return false;
638     }
639
640     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
641
642     return true;
643 }
644
645 /**
646  * @brief Waits until the next period of samples is ready
647  *
648  * This function does not return until a full period of samples is (or should be)
649  * ready to be transferred.
650  *
651  * @return true if the period is ready, false if an xrun occurred
652  */
653 bool StreamProcessorManager::waitForPeriod() {
654     int time_till_next_period;
655     bool xrun_occurred = false;
656
657     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
658
659     assert(m_SyncSource);
660
661     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
662
663     while(time_till_next_period > 0) {
664         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
665
666         // wait for the period
667         usleep(time_till_next_period);
668
669         // check for underruns on the ISO side,
670         // those should make us bail out of the wait loop
671         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
672             it != m_ReceiveProcessors.end();
673             ++it ) {
674             // a xrun has occurred on the Iso side
675             xrun_occurred |= (*it)->xrunOccurred();
676         }
677         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
678             it != m_TransmitProcessors.end();
679             ++it ) {
680             // a xrun has occurred on the Iso side
681             xrun_occurred |= (*it)->xrunOccurred();
682         }
683         if(xrun_occurred) break;
684
685         // check if we were waked up too soon
686         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
687     }
688
689     // we save the 'ideal' time of the transfer at this point,
690     // because we can have interleaved read - process - write
691     // cycles making that we modify a receiving stream's buffer
692     // before we get to writing.
693     // NOTE: before waitForPeriod() is called again, both the transmit
694     //       and the receive processors should have done their transfer.
695     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
696     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
697         m_time_of_transfer);
698
699     // normally we can transfer frames at this time, but in some cases this is not true
700     // e.g. when there are not enough frames in the receive buffer.
701     // however this doesn't have to be a problem, since we can wait some more until we
702     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
703     // to transmit, or if the receive buffer overflows. These conditions are signaled by
704     // the iso threads
705     // check if xruns occurred on the Iso side.
706     // also check if xruns will occur should we transfer() now
707     #ifdef DEBUG
708     int waited = 0;
709     #endif
710     bool ready_for_transfer = false;
711     bool ready;
712     xrun_occurred = false;
713     while (!ready_for_transfer && !xrun_occurred) {
714         // FIXME: can deadlock when the iso handlers die (e.g. unplug the device)
715         ready_for_transfer = true;
716         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
717             it != m_ReceiveProcessors.end();
718             ++it ) {
719             ready = ((*it)->canClientTransferFrames(m_period));
720             ready_for_transfer &= ready;
721             if (!ready) (*it)->flush();
722             xrun_occurred |= (*it)->xrunOccurred();
723         }
724         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
725             it != m_TransmitProcessors.end();
726             ++it ) {
727             ready = ((*it)->canClientTransferFrames(m_period));
728             ready_for_transfer &= ready;
729             if (!ready) (*it)->flush();
730             xrun_occurred |= (*it)->xrunOccurred();
731         }
732         if (!ready_for_transfer) {
733            
734             usleep(125); // MAGIC: one cycle sleep...
735
736             // in order to avoid this in the future, we increase the sync delay of the sync source SP
737             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
738             m_SyncSource->setSyncDelay(d);
739
740             #ifdef DEBUG
741             waited++;
742             #endif
743         }
744     } // we are either ready or an xrun occurred
745
746     // in order to avoid a runaway value of the sync delay, we gradually decrease
747     // it. It will be increased by a 'too early' event (cfr some lines higher)
748     // hence we'll be at a good point on average.
749     int d = m_SyncSource->getSyncDelay() - 1;
750     if (d >= 0) m_SyncSource->setSyncDelay(d);
751
752
753     #ifdef DEBUG
754     if(waited > 0) {
755         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
756     }
757     #endif
758
759     // this is to notify the client of the delay that we introduced by waiting
760     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
761     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
762
763 #ifdef DEBUG
764     int rcv_bf=0, xmt_bf=0;
765     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
766         it != m_ReceiveProcessors.end();
767         ++it ) {
768         rcv_bf = (*it)->getBufferFill();
769     }
770     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
771         it != m_TransmitProcessors.end();
772         ++it ) {
773         xmt_bf = (*it)->getBufferFill();
774     }
775     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
776         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
777
778     // check if xruns occurred on the Iso side.
779     // also check if xruns will occur should we transfer() now
780     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
781           it != m_ReceiveProcessors.end();
782           ++it ) {
783
784         if ((*it)->xrunOccurred()) {
785             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
786             (*it)->dumpInfo();
787         }
788         if (!((*it)->canClientTransferFrames(m_period))) {
789             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
790             (*it)->dumpInfo();
791         }
792     }
793     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
794           it != m_TransmitProcessors.end();
795           ++it ) {
796         if ((*it)->xrunOccurred()) {
797             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
798         }
799         if (!((*it)->canClientTransferFrames(m_period))) {
800             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
801         }
802     }
803 #endif
804
805     m_nbperiods++;
806     // now we can signal the client that we are (should be) ready
807     return !xrun_occurred;
808 }
809
810 /**
811  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
812  *
813  * Transfers one period of frames from the client side to the Iso side and vice versa.
814  *
815  * @return true if successful, false otherwise (indicates xrun).
816  */
817 bool StreamProcessorManager::transfer() {
818     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
819     bool retval=true;
820     retval &= transfer(StreamProcessor::ePT_Receive);
821     retval &= transfer(StreamProcessor::ePT_Transmit);
822     return retval;
823 }
824
825 /**
826  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
827  *
828  * Transfers one period of frames from the client side to the Iso side or vice versa.
829  *
830  * @param t The processor type to tranfer for (receive or transmit)
831  * @return true if successful, false otherwise (indicates xrun).
832  */
833 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
834     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
835         t, m_time_of_transfer,
836         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
837         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
838         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
839
840     bool retval = true;
841     // a static cast could make sure that there is no performance
842     // penalty for the virtual functions (to be checked)
843     if (t==StreamProcessor::ePT_Receive) {
844         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
845                 it != m_ReceiveProcessors.end();
846                 ++it ) {
847             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
848                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
849                             m_period, m_time_of_transfer,*it);
850                 retval &= false; // buffer underrun
851             }
852         }
853     } else {
854         // FIXME: in the SPM it would be nice to have system time instead of
855         //        1394 time
856         float rate = m_SyncSource->getTicksPerFrame();
857         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
858
859         // the data we are putting into the buffer is intended to be transmitted
860         // one ringbuffer size after it has been received
861         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
862
863         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
864                 it != m_TransmitProcessors.end();
865                 ++it ) {
866             // FIXME: in the SPM it would be nice to have system time instead of
867             //        1394 time
868             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
869                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
870                         m_period, transmit_timestamp, *it);
871                 retval &= false; // buffer underrun
872             }
873         }
874     }
875     return retval;
876 }
877
878 /**
879  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
880  *
881  * Transfers one period of silence to the Iso side for transmit SP's
882  * or dump one period of frames for receive SP's
883  *
884  * @return true if successful, false otherwise (indicates xrun).
885  */
886 bool StreamProcessorManager::transferSilence() {
887     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
888     bool retval=true;
889     retval &= transferSilence(StreamProcessor::ePT_Receive);
890     retval &= transferSilence(StreamProcessor::ePT_Transmit);
891     return retval;
892 }
893
894 /**
895  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
896  *
897  * Transfers one period of silence to the Iso side for transmit SP's
898  * or dump one period of frames for receive SP's
899  *
900  * @param t The processor type to tranfer for (receive or transmit)
901  * @return true if successful, false otherwise (indicates xrun).
902  */
903 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
904     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
905         t, m_time_of_transfer,
906         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
907         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
908         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
909
910     bool retval = true;
911     // a static cast could make sure that there is no performance
912     // penalty for the virtual functions (to be checked)
913     if (t==StreamProcessor::ePT_Receive) {
914         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
915                 it != m_ReceiveProcessors.end();
916                 ++it ) {
917             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
918                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
919                             m_period, m_time_of_transfer,*it);
920                 retval &= false; // buffer underrun
921             }
922         }
923     } else {
924         // FIXME: in the SPM it would be nice to have system time instead of
925         //        1394 time
926         float rate = m_SyncSource->getTicksPerFrame();
927         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
928
929         // the data we are putting into the buffer is intended to be transmitted
930         // one ringbuffer size after it has been received
931         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
932
933         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
934                 it != m_TransmitProcessors.end();
935                 ++it ) {
936             // FIXME: in the SPM it would be nice to have system time instead of
937             //        1394 time
938             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
939                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
940                         m_period, transmit_timestamp, *it);
941                 retval &= false; // buffer underrun
942             }
943         }
944     }
945     return retval;
946 }
947
948 void StreamProcessorManager::dumpInfo() {
949     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
950     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
951     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
952
953     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
954     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
955         it != m_ReceiveProcessors.end();
956         ++it ) {
957         (*it)->dumpInfo();
958     }
959
960     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
961     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
962         it != m_TransmitProcessors.end();
963         ++it ) {
964         (*it)->dumpInfo();
965     }
966
967     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
968
969 }
970
971 void StreamProcessorManager::setVerboseLevel(int l) {
972     setDebugLevel(l);
973
974     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
975     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
976         it != m_ReceiveProcessors.end();
977         ++it ) {
978         (*it)->setVerboseLevel(l);
979     }
980
981     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
982     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
983         it != m_TransmitProcessors.end();
984         ++it ) {
985         (*it)->setVerboseLevel(l);
986     }
987 }
988
989
990 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
991     int count=0;
992
993     if (direction == Port::E_Capture) {
994         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
995             it != m_ReceiveProcessors.end();
996             ++it ) {
997             count += (*it)->getPortCount(type);
998         }
999     } else {
1000         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1001             it != m_TransmitProcessors.end();
1002             ++it ) {
1003             count += (*it)->getPortCount(type);
1004         }
1005     }
1006     return count;
1007 }
1008
1009 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1010     int count=0;
1011
1012     if (direction == Port::E_Capture) {
1013         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1014             it != m_ReceiveProcessors.end();
1015             ++it ) {
1016             count += (*it)->getPortCount();
1017         }
1018     } else {
1019         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1020             it != m_TransmitProcessors.end();
1021             ++it ) {
1022             count += (*it)->getPortCount();
1023         }
1024     }
1025     return count;
1026 }
1027
1028 // TODO: implement a port map here, instead of the loop
1029
1030 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1031     int count=0;
1032     int prevcount=0;
1033
1034     if (direction == Port::E_Capture) {
1035         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1036             it != m_ReceiveProcessors.end();
1037             ++it ) {
1038             count += (*it)->getPortCount();
1039             if (count > idx) {
1040                 return (*it)->getPortAtIdx(idx-prevcount);
1041             }
1042             prevcount=count;
1043         }
1044     } else {
1045         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1046             it != m_TransmitProcessors.end();
1047             ++it ) {
1048             count += (*it)->getPortCount();
1049             if (count > idx) {
1050                 return (*it)->getPortAtIdx(idx-prevcount);
1051             }
1052             prevcount=count;
1053         }
1054     }
1055     return NULL;
1056 }
1057
1058 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1059     m_thread_realtime=rt;
1060     m_thread_priority=priority;
1061     return true;
1062 }
1063
1064
1065 } // end of namespace
Note: See TracBrowser for help on using the browser.