root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 776, 39.4 kB (checked in by ppalmers, 13 years ago)

try to fix deadlock / performance issues

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
#include "StreamProcessorManager.h"
#include "generic/StreamProcessor.h"
#include "generic/Port.h"
#include "libieee1394/cycletimer.h"

#include <errno.h>
#include <assert.h>
#include <math.h>

#include <vector>
32
33 #define RUNNING_TIMEOUT_MSEC 4000
34 #define PREPARE_TIMEOUT_MSEC 4000
35 #define ENABLE_TIMEOUT_MSEC 4000
36
37 // Processing margin that syncStartAll() adds on top of the sync
38 // delay. It shifts the moment at which the buffer is transfer()'ed
39 // to a later point in time, which makes things somewhat more robust.
40 // Note that shifting the transfer to a later time instant also causes
41 // the xmit buffer fill to be lower on average.
42 #define FFADO_SIGNAL_DELAY_TICKS (3072*1)
43
44 namespace Streaming {
45
46 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
47
48 StreamProcessorManager::StreamProcessorManager()
49     : m_is_slave( false )
50     , m_SyncSource(NULL)
51     , m_nb_buffers( 0 )
52     , m_period( 0 )
53     , m_nominal_framerate ( 0 )
54     , m_xrun_happened( false )
55     , m_xruns(0)
56     , m_nbperiods(0)
57 {
58     addOption(Util::OptionContainer::Option("slaveMode",false));
59 }
60
61 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
62     : m_is_slave( false )
63     , m_SyncSource(NULL)
64     , m_nb_buffers(nb_buffers)
65     , m_period(period)
66     , m_nominal_framerate ( framerate )
67     , m_xruns(0)
68     , m_xrun_happened( false )
69     , m_nbperiods(0)
70 {
71     addOption(Util::OptionContainer::Option("slaveMode",false));
72 }
73
74 StreamProcessorManager::~StreamProcessorManager() {
75 }
76
77 /**
78  * Registers \ref processor with this manager.
79  *
80  * also registers it with the isohandlermanager
81  *
82  * be sure to call isohandlermanager->init() first!
83  * and be sure that the processors are also ->init()'ed
84  *
85  * @param processor
86  * @return true if successfull
87  */
88 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
89 {
90     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
91     assert(processor);
92     if (processor->getType() == StreamProcessor::ePT_Receive) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_ReceiveProcessors.push_back(processor);
95         return true;
96     }
97
98     if (processor->getType() == StreamProcessor::ePT_Transmit) {
99         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
100         m_TransmitProcessors.push_back(processor);
101         return true;
102     }
103
104     debugFatal("Unsupported processor type!\n");
105     return false;
106 }
107
108 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
109 {
110     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
111     assert(processor);
112
113     if (processor->getType()==StreamProcessor::ePT_Receive) {
114
115         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
116               it != m_ReceiveProcessors.end();
117               ++it )
118         {
119             if ( *it == processor ) {
120                 m_ReceiveProcessors.erase(it);
121                 return true;
122             }
123         }
124     }
125
126     if (processor->getType()==StreamProcessor::ePT_Transmit) {
127         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
128               it != m_TransmitProcessors.end();
129               ++it )
130         {
131             if ( *it == processor ) {
132                 m_TransmitProcessors.erase(it);
133                 return true;
134             }
135         }
136     }
137
138     debugFatal("Processor (%p) not found!\n",processor);
139     return false; //not found
140 }
141
142 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
143     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
144     m_SyncSource=s;
145     return true;
146 }
147
148 bool StreamProcessorManager::prepare() {
149
150     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
151
152     m_is_slave=false;
153     if(!getOption("slaveMode", m_is_slave)) {
154         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
155     }
156
157     // if no sync source is set, select one here
158     if(m_SyncSource == NULL) {
159        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
160     }
161
162     // FIXME: put into separate method
163     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
164           it != m_ReceiveProcessors.end();
165           ++it )
166     {
167         if(m_SyncSource == NULL) {
168             debugWarning(" => Sync Source is %p.\n", *it);
169             m_SyncSource = *it;
170         }
171     }
172     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
173           it != m_TransmitProcessors.end();
174           ++it )
175     {
176         if(m_SyncSource == NULL) {
177             debugWarning(" => Sync Source is %p.\n", *it);
178             m_SyncSource = *it;
179         }
180     }
181
182     // now do the actual preparation of the SP's
183     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
184     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
185         it != m_ReceiveProcessors.end();
186         ++it ) {
187
188         if(!(*it)->setOption("slaveMode", m_is_slave)) {
189             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
190         }
191
192         if(!(*it)->prepare()) {
193             debugFatal(  " could not prepare (%p)...\n",(*it));
194             return false;
195         }
196     }
197     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
198     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
199         it != m_TransmitProcessors.end();
200         ++it ) {
201         if(!(*it)->setOption("slaveMode", m_is_slave)) {
202             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
203         }
204         if(!(*it)->prepare()) {
205             debugFatal( " could not prepare (%p)...\n",(*it));
206             return false;
207         }
208     }
209
210     // if there are no stream processors registered,
211     // fail
212     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
213         debugFatal("No stream processors registered, can't do anything usefull\n");
214         return false;
215     }
216     return true;
217 }
218
219 bool StreamProcessorManager::startDryRunning() {
220     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
221     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
222     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
223             it != m_ReceiveProcessors.end();
224             ++it ) {
225         if (!(*it)->isDryRunning()) {
226             if(!(*it)->scheduleStartDryRunning(-1)) {
227                 debugError("Could not put SP %p into the dry-running state\n", *it);
228                 return false;
229             }
230         } else {
231             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
232         }
233     }
234     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
235             it != m_TransmitProcessors.end();
236             ++it ) {
237         if (!(*it)->isDryRunning()) {
238             if(!(*it)->scheduleStartDryRunning(-1)) {
239                 debugError("Could not put SP %p into the dry-running state\n", *it);
240                 return false;
241             }
242         } else {
243             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
244         }
245     }
246     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
247     // wait for the syncsource to start running.
248     // that will block the waitForPeriod call until everyone has started (theoretically)
249     #define CYCLES_FOR_DRYRUN 40000
250     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
251     bool all_dry_running = false;
252     while (!all_dry_running && cnt) {
253         all_dry_running = true;
254         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
255                 it != m_ReceiveProcessors.end();
256                 ++it ) {
257             all_dry_running &= (*it)->isDryRunning();
258         }
259         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
260                 it != m_TransmitProcessors.end();
261                 ++it ) {
262             all_dry_running &= (*it)->isDryRunning();
263         }
264
265         usleep(125);
266         cnt--;
267     }
268     if(cnt==0) {
269         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
270         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
271                 it != m_ReceiveProcessors.end();
272                 ++it ) {
273             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
274                 (*it)->getTypeString(), *it, (*it)->getStateString());
275         }
276         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
277                 it != m_TransmitProcessors.end();
278                 ++it ) {
279             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
280                 (*it)->getTypeString(), *it, (*it)->getStateString());
281         }
282         return false;
283     }
284     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
285     return true;
286 }
287
288 bool StreamProcessorManager::syncStartAll() {
289     // figure out when to get the SP's running.
290     // the xmit SP's should also know the base timestamp
291     // streams should be aligned here
292
293     // now find out how long we have to delay the wait operation such that
294     // the received frames will all be presented to the SP
295     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
296     int max_of_min_delay = 0;
297     int min_delay = 0;
298     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
299             it != m_ReceiveProcessors.end();
300             ++it ) {
301         min_delay = (*it)->getMaxFrameLatency();
302         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
303     }
304
305     // add some processing margin. This only shifts the time
306     // at which the buffer is transfer()'ed. This makes things somewhat
307     // more robust. It should be noted though that shifting the transfer
308     // time to a later time instant also causes the xmit buffer fill to be
309     // lower on average.
310     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
311     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
312         max_of_min_delay,
313         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
314         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
315         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
316     m_SyncSource->setSyncDelay(max_of_min_delay);
317
318     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
319     //        have aquired lock
320     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
321     //sleep(2); // FIXME: be smarter here
322
323     // make sure that we are dry-running long enough for the
324     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
325     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
326     int nb_sync_runs=20;
327     int64_t time_till_next_period;
328     while(nb_sync_runs--) { // or while not sync-ed?
329         // check if we were woken up too soon
330         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
331         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
332         if(time_till_next_period > 0) {
333             // wait for the period
334             usleep(time_till_next_period);
335         }
336     }
337
338     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
339     // FIXME: in the SPM it would be nice to have system time instead of
340     //        1394 time
341
342     // we now should have decent sync info on the sync source
343     // determine a point in time where the system should start
344     // figure out where we are now
345     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
346     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
347         time_of_first_sample,
348         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
349         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
350         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
351
352     #define CYCLES_FOR_STARTUP 2000
353     // start wet-running in CYCLES_FOR_STARTUP cycles
354     // this is the time window we have to setup all SP's such that they
355     // can start wet-running correctly.
356     time_of_first_sample = addTicks(time_of_first_sample,
357                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
358
359     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
360         time_of_first_sample,
361         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
362         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
363         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
364
365     // we should start wet-running the transmit SP's some cycles in advance
366     // such that we know it is wet-running when it should output its first sample
367     #define PRESTART_CYCLES_FOR_XMIT 20
368     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
369                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
370
371     #define PRESTART_CYCLES_FOR_RECV 0
372     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
373                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
374     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
375         time_to_start_xmit,
376         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
377         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
378         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
379     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
380         time_to_start_recv,
381         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
382         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
383         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
384
385     // at this point the buffer head timestamp of the transmit buffers can be set
386     // this is the presentation time of the first sample in the buffer
387     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
388           it != m_TransmitProcessors.end();
389           ++it ) {
390         (*it)->setBufferHeadTimestamp(time_of_first_sample);
391     }
392
393     // STEP X: switch SP's over to the running state
394     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
395           it != m_ReceiveProcessors.end();
396           ++it ) {
397         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
398             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
399             return false;
400         }
401     }
402     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
403           it != m_TransmitProcessors.end();
404           ++it ) {
405         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
406             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
407             return false;
408         }
409     }
410     // wait for the syncsource to start running.
411     // that will block the waitForPeriod call until everyone has started (theoretically)
412     int cnt = CYCLES_FOR_STARTUP * 20; // by then it should have started
413     while (!m_SyncSource->isRunning() && cnt) {
414         usleep(125);
415         cnt--;
416     }
417     if(cnt==0) {
418         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
419         return false;
420     }
421
422     // now align the received streams
423     if(!alignReceivedStreams()) {
424         debugError("Could not align streams\n");
425         return false;
426     }
427     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
428     return true;
429 }
430
431 bool
432 StreamProcessorManager::alignReceivedStreams()
433 {
434     #define ALIGN_AVERAGE_TIME_MSEC 200
435     #define NB_ALIGN_TRIES 40
436     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
437     unsigned int nb_sync_runs;
438     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
439     int64_t diff_between_streams[nb_rcv_sp];
440     int64_t diff;
441
442     unsigned int i;
443
444     unsigned int periods_per_align_try = (ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
445     periods_per_align_try /= 1000;
446     periods_per_align_try /= getPeriodSize();
447     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
448
449     bool aligned = false;
450     int cnt = NB_ALIGN_TRIES;
451     while (!aligned && cnt--) {
452         nb_sync_runs = periods_per_align_try;
453         while(nb_sync_runs) {
454             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
455             waitForPeriod();
456
457             i = 0;
458             for ( i = 0; i < nb_rcv_sp; i++) {
459                 StreamProcessor *s = m_ReceiveProcessors.at(i);
460                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
461                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
462                     m_SyncSource, s, diff);
463                 if ( nb_sync_runs == periods_per_align_try ) {
464                     diff_between_streams[i] = diff;
465                 } else {
466                     diff_between_streams[i] += diff;
467                 }
468             }
469             if(!transferSilence()) {
470                 debugError("Could not transfer silence\n");
471                 return false;
472             }
473             nb_sync_runs--;
474         }
475         // calculate the average offsets
476         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
477         int diff_between_streams_frames[nb_rcv_sp];
478         aligned = true;
479         for ( i = 0; i < nb_rcv_sp; i++) {
480             StreamProcessor *s = m_ReceiveProcessors.at(i);
481
482             diff_between_streams[i] /= periods_per_align_try;
483             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
484             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
485                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
486
487             aligned &= (diff_between_streams_frames[i] == 0);
488
489             // reposition the stream
490             if(!s->shiftStream(diff_between_streams_frames[i])) {
491                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
492                 return false;
493             }
494         }
495         if (!aligned) {
496             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
497         }
498     }
499     if (cnt == 0) {
500         debugError("Align failed\n");
501         return false;
502     }
503     return true;
504 }
505
506 bool StreamProcessorManager::start() {
507     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
508
509     // put all SP's into dry-running state
510     if (!startDryRunning()) {
511         debugFatal("Could not put SP's in dry-running state\n");
512         return false;
513     }
514
515     // start all SP's synchonized
516     if (!syncStartAll()) {
517         debugFatal("Could not syncStartAll...\n");
518         return false;
519     }
520     return true;
521 }
522
523 bool StreamProcessorManager::stop() {
524     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
525
526     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
527     // switch SP's over to the dry-running state
528     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
529           it != m_ReceiveProcessors.end();
530           ++it ) {
531         if(!(*it)->scheduleStopRunning(-1)) {
532             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
533             return false;
534         }
535     }
536     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
537           it != m_TransmitProcessors.end();
538           ++it ) {
539         if(!(*it)->scheduleStopRunning(-1)) {
540             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
541             return false;
542         }
543     }
544     // wait for the SP's to get into the dry-running state
545     int cnt = 200;
546     bool ready = false;
547     while (!ready && cnt) {
548         ready = true;
549         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
550             it != m_ReceiveProcessors.end();
551             ++it ) {
552             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
553         }
554         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
555             it != m_TransmitProcessors.end();
556             ++it ) {
557             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
558         }
559         usleep(125);
560         cnt--;
561     }
562     if(cnt==0) {
563         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
564         return false;
565     }
566
567     // switch SP's over to the stopped state
568     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
569           it != m_ReceiveProcessors.end();
570           ++it ) {
571         if(!(*it)->scheduleStopDryRunning(-1)) {
572             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
573             return false;
574         }
575     }
576     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
577           it != m_TransmitProcessors.end();
578           ++it ) {
579         if(!(*it)->scheduleStopDryRunning(-1)) {
580             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
581             return false;
582         }
583     }
584     // wait for the SP's to get into the running state
585     cnt = 200;
586     ready = false;
587     while (!ready && cnt) {
588         ready = true;
589         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
590             it != m_ReceiveProcessors.end();
591             ++it ) {
592             ready &= (*it)->isStopped();
593         }
594         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
595             it != m_TransmitProcessors.end();
596             ++it ) {
597             ready &= (*it)->isStopped();
598         }
599         usleep(125);
600         cnt--;
601     }
602     if(cnt==0) {
603         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
604         return false;
605     }
606     return true;
607 }
608
609 /**
610  * Called upon Xrun events. This brings all StreamProcessors back
611  * into their starting state, and then carries on streaming. This should
612  * have the same effect as restarting the whole thing.
613  *
614  * @return true if successful, false otherwise
615  */
616 bool StreamProcessorManager::handleXrun() {
617
618     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
619
620     dumpInfo();
621
622     /*
623      * Reset means:
624      * 1) Disabling the SP's, so that they don't process any packets
625      *    note: the isomanager does keep on delivering/requesting them
626      * 2) Bringing all buffers & streamprocessors into a know state
627      *    - Clear all capture buffers
628      *    - Put nb_periods*period_size of null frames into the playback buffers
629      * 3) Re-enable the SP's
630      */
631
632     // put all SP's back into dry-running state
633     if (!startDryRunning()) {
634         debugFatal("Could not put SP's in dry-running state\n");
635         return false;
636     }
637
638     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
639     // start all SP's synchonized
640     if (!syncStartAll()) {
641         debugFatal("Could not syncStartAll...\n");
642         return false;
643     }
644
645     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
646
647     return true;
648 }
649
650 /**
651  * @brief Waits until the next period of samples is ready
652  *
653  * This function does not return until a full period of samples is (or should be)
654  * ready to be transferred.
655  *
656  * @return true if the period is ready, false if an xrun occurred
657  */
658 bool StreamProcessorManager::waitForPeriod() {
659     int time_till_next_period;
660     bool xrun_occurred = false;
661
662     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
663
664     assert(m_SyncSource);
665
666     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
667
668     while(time_till_next_period > 0) {
669         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
670
671         // wait for the period
672         usleep(time_till_next_period);
673
674         // check for underruns on the ISO side,
675         // those should make us bail out of the wait loop
676         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
677             it != m_ReceiveProcessors.end();
678             ++it ) {
679             // a xrun has occurred on the Iso side
680             xrun_occurred |= (*it)->xrunOccurred();
681         }
682         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
683             it != m_TransmitProcessors.end();
684             ++it ) {
685             // a xrun has occurred on the Iso side
686             xrun_occurred |= (*it)->xrunOccurred();
687         }
688         if(xrun_occurred) break;
689
690         // check if we were waked up too soon
691         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
692     }
693
694     // we save the 'ideal' time of the transfer at this point,
695     // because we can have interleaved read - process - write
696     // cycles making that we modify a receiving stream's buffer
697     // before we get to writing.
698     // NOTE: before waitForPeriod() is called again, both the transmit
699     //       and the receive processors should have done their transfer.
700     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
701     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
702         m_time_of_transfer);
703
704     // normally we can transfer frames at this time, but in some cases this is not true
705     // e.g. when there are not enough frames in the receive buffer.
706     // however this doesn't have to be a problem, since we can wait some more until we
707     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
708     // to transmit, or if the receive buffer overflows. These conditions are signaled by
709     // the iso threads
710     // check if xruns occurred on the Iso side.
711     // also check if xruns will occur should we transfer() now
712     #ifdef DEBUG
713     int waited = 0;
714     #endif
715     bool ready_for_transfer = false;
716     bool ready;
717     xrun_occurred = false;
718     while (!ready_for_transfer && !xrun_occurred) {
719         // FIXME: can deadlock when the iso handlers die (e.g. unplug the device)
720         ready_for_transfer = true;
721         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
722             it != m_ReceiveProcessors.end();
723             ++it ) {
724             ready = ((*it)->canClientTransferFrames(m_period));
725             ready_for_transfer &= ready;
726             if (!ready) (*it)->flush();
727             xrun_occurred |= (*it)->xrunOccurred();
728         }
729         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
730             it != m_TransmitProcessors.end();
731             ++it ) {
732             ready = ((*it)->canClientTransferFrames(m_period));
733             ready_for_transfer &= ready;
734             if (!ready) (*it)->flush();
735             xrun_occurred |= (*it)->xrunOccurred();
736         }
737         if (!ready_for_transfer) {
738            
739             usleep(125); // MAGIC: one cycle sleep...
740
741             // in order to avoid this in the future, we increase the sync delay of the sync source SP
742             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
743             m_SyncSource->setSyncDelay(d);
744
745             #ifdef DEBUG
746             waited++;
747             #endif
748         }
749     } // we are either ready or an xrun occurred
750
751     // in order to avoid a runaway value of the sync delay, we gradually decrease
752     // it. It will be increased by a 'too early' event (cfr some lines higher)
753     // hence we'll be at a good point on average.
754     int d = m_SyncSource->getSyncDelay() - 1;
755     if (d >= 0) m_SyncSource->setSyncDelay(d);
756
757
758     #ifdef DEBUG
759     if(waited > 0) {
760         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
761     }
762     #endif
763
764     // this is to notify the client of the delay that we introduced by waiting
765     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
766     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
767
768 #ifdef DEBUG
769     int rcv_bf=0, xmt_bf=0;
770     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
771         it != m_ReceiveProcessors.end();
772         ++it ) {
773         rcv_bf = (*it)->getBufferFill();
774     }
775     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
776         it != m_TransmitProcessors.end();
777         ++it ) {
778         xmt_bf = (*it)->getBufferFill();
779     }
780     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
781         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
782
783     // check if xruns occurred on the Iso side.
784     // also check if xruns will occur should we transfer() now
785     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
786           it != m_ReceiveProcessors.end();
787           ++it ) {
788
789         if ((*it)->xrunOccurred()) {
790             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
791             (*it)->dumpInfo();
792         }
793         if (!((*it)->canClientTransferFrames(m_period))) {
794             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
795             (*it)->dumpInfo();
796         }
797     }
798     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
799           it != m_TransmitProcessors.end();
800           ++it ) {
801         if ((*it)->xrunOccurred()) {
802             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
803         }
804         if (!((*it)->canClientTransferFrames(m_period))) {
805             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
806         }
807     }
808 #endif
809
810     m_nbperiods++;
811     // now we can signal the client that we are (should be) ready
812     return !xrun_occurred;
813 }
814
815 /**
816  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
817  *
818  * Transfers one period of frames from the client side to the Iso side and vice versa.
819  *
820  * @return true if successful, false otherwise (indicates xrun).
821  */
822 bool StreamProcessorManager::transfer() {
823     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
824     bool retval=true;
825     retval &= transfer(StreamProcessor::ePT_Receive);
826     retval &= transfer(StreamProcessor::ePT_Transmit);
827     return retval;
828 }
829
830 /**
831  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
832  *
833  * Transfers one period of frames from the client side to the Iso side or vice versa.
834  *
835  * @param t The processor type to tranfer for (receive or transmit)
836  * @return true if successful, false otherwise (indicates xrun).
837  */
838 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
839     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
840         t, m_time_of_transfer,
841         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
842         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
843         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
844
845     bool retval = true;
846     // a static cast could make sure that there is no performance
847     // penalty for the virtual functions (to be checked)
848     if (t==StreamProcessor::ePT_Receive) {
849         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
850                 it != m_ReceiveProcessors.end();
851                 ++it ) {
852             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
853                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
854                             m_period, m_time_of_transfer,*it);
855                 retval &= false; // buffer underrun
856             }
857         }
858     } else {
859         // FIXME: in the SPM it would be nice to have system time instead of
860         //        1394 time
861         float rate = m_SyncSource->getTicksPerFrame();
862         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
863
864         // the data we are putting into the buffer is intended to be transmitted
865         // one ringbuffer size after it has been received
866         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
867
868         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
869                 it != m_TransmitProcessors.end();
870                 ++it ) {
871             // FIXME: in the SPM it would be nice to have system time instead of
872             //        1394 time
873             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
874                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
875                         m_period, transmit_timestamp, *it);
876                 retval &= false; // buffer underrun
877             }
878         }
879     }
880     return retval;
881 }
882
883 /**
884  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
885  *
886  * Transfers one period of silence to the Iso side for transmit SP's
887  * or dump one period of frames for receive SP's
888  *
889  * @return true if successful, false otherwise (indicates xrun).
890  */
891 bool StreamProcessorManager::transferSilence() {
892     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
893     bool retval=true;
894     retval &= transferSilence(StreamProcessor::ePT_Receive);
895     retval &= transferSilence(StreamProcessor::ePT_Transmit);
896     return retval;
897 }
898
899 /**
900  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
901  *
902  * Transfers one period of silence to the Iso side for transmit SP's
903  * or dump one period of frames for receive SP's
904  *
905  * @param t The processor type to tranfer for (receive or transmit)
906  * @return true if successful, false otherwise (indicates xrun).
907  */
908 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
909     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
910         t, m_time_of_transfer,
911         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
912         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
913         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
914
915     bool retval = true;
916     // a static cast could make sure that there is no performance
917     // penalty for the virtual functions (to be checked)
918     if (t==StreamProcessor::ePT_Receive) {
919         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
920                 it != m_ReceiveProcessors.end();
921                 ++it ) {
922             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
923                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
924                             m_period, m_time_of_transfer,*it);
925                 retval &= false; // buffer underrun
926             }
927         }
928     } else {
929         // FIXME: in the SPM it would be nice to have system time instead of
930         //        1394 time
931         float rate = m_SyncSource->getTicksPerFrame();
932         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
933
934         // the data we are putting into the buffer is intended to be transmitted
935         // one ringbuffer size after it has been received
936         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
937
938         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
939                 it != m_TransmitProcessors.end();
940                 ++it ) {
941             // FIXME: in the SPM it would be nice to have system time instead of
942             //        1394 time
943             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
944                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
945                         m_period, transmit_timestamp, *it);
946                 retval &= false; // buffer underrun
947             }
948         }
949     }
950     return retval;
951 }
952
953 void StreamProcessorManager::dumpInfo() {
954     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
955     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
956     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
957
958     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
959     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
960         it != m_ReceiveProcessors.end();
961         ++it ) {
962         (*it)->dumpInfo();
963     }
964
965     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
966     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
967         it != m_TransmitProcessors.end();
968         ++it ) {
969         (*it)->dumpInfo();
970     }
971
972     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
973
974 }
975
976 void StreamProcessorManager::setVerboseLevel(int l) {
977     setDebugLevel(l);
978
979     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
980     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
981         it != m_ReceiveProcessors.end();
982         ++it ) {
983         (*it)->setVerboseLevel(l);
984     }
985
986     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
987     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
988         it != m_TransmitProcessors.end();
989         ++it ) {
990         (*it)->setVerboseLevel(l);
991     }
992 }
993
994
995 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
996     int count=0;
997
998     if (direction == Port::E_Capture) {
999         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1000             it != m_ReceiveProcessors.end();
1001             ++it ) {
1002             count += (*it)->getPortCount(type);
1003         }
1004     } else {
1005         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1006             it != m_TransmitProcessors.end();
1007             ++it ) {
1008             count += (*it)->getPortCount(type);
1009         }
1010     }
1011     return count;
1012 }
1013
1014 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1015     int count=0;
1016
1017     if (direction == Port::E_Capture) {
1018         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1019             it != m_ReceiveProcessors.end();
1020             ++it ) {
1021             count += (*it)->getPortCount();
1022         }
1023     } else {
1024         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1025             it != m_TransmitProcessors.end();
1026             ++it ) {
1027             count += (*it)->getPortCount();
1028         }
1029     }
1030     return count;
1031 }
1032
1033 // TODO: implement a port map here, instead of the loop
1034
1035 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1036     int count=0;
1037     int prevcount=0;
1038
1039     if (direction == Port::E_Capture) {
1040         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1041             it != m_ReceiveProcessors.end();
1042             ++it ) {
1043             count += (*it)->getPortCount();
1044             if (count > idx) {
1045                 return (*it)->getPortAtIdx(idx-prevcount);
1046             }
1047             prevcount=count;
1048         }
1049     } else {
1050         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1051             it != m_TransmitProcessors.end();
1052             ++it ) {
1053             count += (*it)->getPortCount();
1054             if (count > idx) {
1055                 return (*it)->getPortAtIdx(idx-prevcount);
1056             }
1057             prevcount=count;
1058         }
1059     }
1060     return NULL;
1061 }
1062
1063 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1064     m_thread_realtime=rt;
1065     m_thread_priority=priority;
1066     return true;
1067 }
1068
1069
1070 } // end of namespace
Note: See TracBrowser for help on using the browser.