root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 797, 40.2 kB (checked in by ppalmers, 13 years ago)

parameters for better latency performance

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_nb_buffers( 0 )
44     , m_period( 0 )
45     , m_nominal_framerate ( 0 )
46     , m_xrun_happened( false )
47     , m_xruns(0)
48     , m_nbperiods(0)
49 {
50     addOption(Util::OptionContainer::Option("slaveMode",false));
51 }
52
53 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
54     : m_is_slave( false )
55     , m_SyncSource(NULL)
56     , m_nb_buffers(nb_buffers)
57     , m_period(period)
58     , m_nominal_framerate ( framerate )
59     , m_xruns(0)
60     , m_xrun_happened( false )
61     , m_nbperiods(0)
62 {
63     addOption(Util::OptionContainer::Option("slaveMode",false));
64 }
65
66 StreamProcessorManager::~StreamProcessorManager() {
67 }
68
69 /**
70  * Registers \ref processor with this manager.
71  *
72  * also registers it with the isohandlermanager
73  *
74  * be sure to call isohandlermanager->init() first!
75  * and be sure that the processors are also ->init()'ed
76  *
77  * @param processor
78  * @return true if successfull
79  */
80 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
81 {
82     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
83     assert(processor);
84     if (processor->getType() == StreamProcessor::ePT_Receive) {
85         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
86         m_ReceiveProcessors.push_back(processor);
87         return true;
88     }
89
90     if (processor->getType() == StreamProcessor::ePT_Transmit) {
91         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
92         m_TransmitProcessors.push_back(processor);
93         return true;
94     }
95
96     debugFatal("Unsupported processor type!\n");
97     return false;
98 }
99
100 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
101 {
102     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
103     assert(processor);
104
105     if (processor->getType()==StreamProcessor::ePT_Receive) {
106
107         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
108               it != m_ReceiveProcessors.end();
109               ++it )
110         {
111             if ( *it == processor ) {
112                 if (*it == m_SyncSource) {
113                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source");
114                     m_SyncSource = NULL;
115                 }
116                 m_ReceiveProcessors.erase(it);
117                 return true;
118             }
119         }
120     }
121
122     if (processor->getType()==StreamProcessor::ePT_Transmit) {
123         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
124               it != m_TransmitProcessors.end();
125               ++it )
126         {
127             if ( *it == processor ) {
128                 if (*it == m_SyncSource) {
129                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source");
130                     m_SyncSource = NULL;
131                 }
132                 m_TransmitProcessors.erase(it);
133                 return true;
134             }
135         }
136     }
137
138     debugFatal("Processor (%p) not found!\n",processor);
139     return false; //not found
140 }
141
142 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
143     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
144     m_SyncSource=s;
145     return true;
146 }
147
148 bool StreamProcessorManager::prepare() {
149
150     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
151
152     m_is_slave=false;
153     if(!getOption("slaveMode", m_is_slave)) {
154         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
155     }
156
157     // if no sync source is set, select one here
158     if(m_SyncSource == NULL) {
159        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
160     }
161
162     // FIXME: put into separate method
163     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
164           it != m_ReceiveProcessors.end();
165           ++it )
166     {
167         if(m_SyncSource == NULL) {
168             debugWarning(" => Sync Source is %p.\n", *it);
169             m_SyncSource = *it;
170         }
171     }
172     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
173           it != m_TransmitProcessors.end();
174           ++it )
175     {
176         if(m_SyncSource == NULL) {
177             debugWarning(" => Sync Source is %p.\n", *it);
178             m_SyncSource = *it;
179         }
180     }
181
182     // now do the actual preparation of the SP's
183     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
184     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
185         it != m_ReceiveProcessors.end();
186         ++it ) {
187
188         if(!(*it)->setOption("slaveMode", m_is_slave)) {
189             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
190         }
191
192         if(!(*it)->prepare()) {
193             debugFatal(  " could not prepare (%p)...\n",(*it));
194             return false;
195         }
196     }
197     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
198     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
199         it != m_TransmitProcessors.end();
200         ++it ) {
201         if(!(*it)->setOption("slaveMode", m_is_slave)) {
202             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
203         }
204         if(!(*it)->prepare()) {
205             debugFatal( " could not prepare (%p)...\n",(*it));
206             return false;
207         }
208     }
209
210     // if there are no stream processors registered,
211     // fail
212     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
213         debugFatal("No stream processors registered, can't do anything usefull\n");
214         return false;
215     }
216     return true;
217 }
218
219 bool StreamProcessorManager::startDryRunning() {
220     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
221     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
222     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
223             it != m_ReceiveProcessors.end();
224             ++it ) {
225         if (!(*it)->isDryRunning()) {
226             if(!(*it)->scheduleStartDryRunning(-1)) {
227                 debugError("Could not put SP %p into the dry-running state\n", *it);
228                 return false;
229             }
230         } else {
231             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
232         }
233     }
234     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
235             it != m_TransmitProcessors.end();
236             ++it ) {
237         if (!(*it)->isDryRunning()) {
238             if(!(*it)->scheduleStartDryRunning(-1)) {
239                 debugError("Could not put SP %p into the dry-running state\n", *it);
240                 return false;
241             }
242         } else {
243             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
244         }
245     }
246     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
247     // wait for the syncsource to start running.
248     // that will block the waitForPeriod call until everyone has started (theoretically)
249     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
250     bool all_dry_running = false;
251     while (!all_dry_running && cnt) {
252         all_dry_running = true;
253         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
254                 it != m_ReceiveProcessors.end();
255                 ++it ) {
256             all_dry_running &= (*it)->isDryRunning();
257         }
258         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
259                 it != m_TransmitProcessors.end();
260                 ++it ) {
261             all_dry_running &= (*it)->isDryRunning();
262         }
263
264         SleepRelativeUsec(125);
265         cnt--;
266     }
267     if(cnt==0) {
268         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
269         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
270                 it != m_ReceiveProcessors.end();
271                 ++it ) {
272             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
273                 (*it)->getTypeString(), *it, (*it)->getStateString());
274         }
275         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
276                 it != m_TransmitProcessors.end();
277                 ++it ) {
278             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
279                 (*it)->getTypeString(), *it, (*it)->getStateString());
280         }
281         return false;
282     }
283     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
284     return true;
285 }
286
287 bool StreamProcessorManager::syncStartAll() {
288     if(m_SyncSource == NULL) return false;
289     // figure out when to get the SP's running.
290     // the xmit SP's should also know the base timestamp
291     // streams should be aligned here
292
293     // now find out how long we have to delay the wait operation such that
294     // the received frames will all be presented to the SP
295     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
296     int max_of_min_delay = 0;
297     int min_delay = 0;
298     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
299             it != m_ReceiveProcessors.end();
300             ++it ) {
301         min_delay = (*it)->getMaxFrameLatency();
302         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
303     }
304
305     // add some processing margin. This only shifts the time
306     // at which the buffer is transfer()'ed. This makes things somewhat
307     // more robust. It should be noted though that shifting the transfer
308     // time to a later time instant also causes the xmit buffer fill to be
309     // lower on average.
310     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
311     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
312         max_of_min_delay,
313         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
314         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
315         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
316     m_SyncSource->setSyncDelay(max_of_min_delay);
317
318     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
319     //        have aquired lock
320     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
321     //sleep(2); // FIXME: be smarter here
322
323     // make sure that we are dry-running long enough for the
324     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
325     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
326     int nb_sync_runs=20;
327     int64_t time_till_next_period;
328     while(nb_sync_runs--) { // or while not sync-ed?
329         // check if we were woken up too soon
330         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
331         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
332         if(time_till_next_period > 0) {
333             // wait for the period
334             SleepRelativeUsec(time_till_next_period);
335         }
336     }
337
338     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
339     // FIXME: in the SPM it would be nice to have system time instead of
340     //        1394 time
341
342     // we now should have decent sync info on the sync source
343     // determine a point in time where the system should start
344     // figure out where we are now
345     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
346     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
347         time_of_first_sample,
348         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
349         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
350         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
351
352     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
353     // this is the time window we have to setup all SP's such that they
354     // can start wet-running correctly.
355     time_of_first_sample = addTicks(time_of_first_sample,
356                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
357
358     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
359         time_of_first_sample,
360         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
361         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
362         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
363
364     // we should start wet-running the transmit SP's some cycles in advance
365     // such that we know it is wet-running when it should output its first sample
366     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
367                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
368
369     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
370                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
371     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
372         time_to_start_xmit,
373         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
374         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
375         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
376     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
377         time_to_start_recv,
378         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
379         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
380         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
381
382     // at this point the buffer head timestamp of the transmit buffers can be set
383     // this is the presentation time of the first sample in the buffer
384     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
385           it != m_TransmitProcessors.end();
386           ++it ) {
387         (*it)->setBufferHeadTimestamp(time_of_first_sample);
388     }
389
390     // STEP X: switch SP's over to the running state
391     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
392           it != m_ReceiveProcessors.end();
393           ++it ) {
394         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
395             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
396             return false;
397         }
398     }
399     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
400           it != m_TransmitProcessors.end();
401           ++it ) {
402         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
403             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
404             return false;
405         }
406     }
407     // wait for the syncsource to start running.
408     // that will block the waitForPeriod call until everyone has started (theoretically)
409     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
410     while (!m_SyncSource->isRunning() && cnt) {
411         SleepRelativeUsec(125);
412         cnt--;
413     }
414     if(cnt==0) {
415         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
416         return false;
417     }
418
419     // now align the received streams
420     if(!alignReceivedStreams()) {
421         debugError("Could not align streams\n");
422         return false;
423     }
424     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
425     return true;
426 }
427
428 bool
429 StreamProcessorManager::alignReceivedStreams()
430 {
431     if(m_SyncSource == NULL) return false;
432     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
433     unsigned int nb_sync_runs;
434     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
435     int64_t diff_between_streams[nb_rcv_sp];
436     int64_t diff;
437
438     unsigned int i;
439
440     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
441     periods_per_align_try /= 1000;
442     periods_per_align_try /= getPeriodSize();
443     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
444
445     bool aligned = false;
446     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
447     while (!aligned && cnt--) {
448         nb_sync_runs = periods_per_align_try;
449         while(nb_sync_runs) {
450             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
451             if(!waitForPeriod()) {
452                 debugWarning("xrun while aligning streams...\n");
453                 return false;
454             };
455
456             i = 0;
457             for ( i = 0; i < nb_rcv_sp; i++) {
458                 StreamProcessor *s = m_ReceiveProcessors.at(i);
459                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
460                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
461                     m_SyncSource, s, diff);
462                 if ( nb_sync_runs == periods_per_align_try ) {
463                     diff_between_streams[i] = diff;
464                 } else {
465                     diff_between_streams[i] += diff;
466                 }
467             }
468             if(!transferSilence()) {
469                 debugError("Could not transfer silence\n");
470                 return false;
471             }
472             nb_sync_runs--;
473         }
474         // calculate the average offsets
475         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
476         int diff_between_streams_frames[nb_rcv_sp];
477         aligned = true;
478         for ( i = 0; i < nb_rcv_sp; i++) {
479             StreamProcessor *s = m_ReceiveProcessors.at(i);
480
481             diff_between_streams[i] /= periods_per_align_try;
482             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
483             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
484                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
485
486             aligned &= (diff_between_streams_frames[i] == 0);
487
488             // reposition the stream
489             if(!s->shiftStream(diff_between_streams_frames[i])) {
490                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
491                 return false;
492             }
493         }
494         if (!aligned) {
495             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
496         }
497     }
498     if (cnt == 0) {
499         debugError("Align failed\n");
500         return false;
501     }
502     return true;
503 }
504
505 bool StreamProcessorManager::start() {
506     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
507
508     // put all SP's into dry-running state
509     if (!startDryRunning()) {
510         debugFatal("Could not put SP's in dry-running state\n");
511         return false;
512     }
513
514     // start all SP's synchonized
515     if (!syncStartAll()) {
516         debugFatal("Could not syncStartAll...\n");
517         return false;
518     }
519     return true;
520 }
521
522 bool StreamProcessorManager::stop() {
523     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
524
525     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
526     // switch SP's over to the dry-running state
527     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
528           it != m_ReceiveProcessors.end();
529           ++it ) {
530         if((*it)->isRunning()) {
531             if(!(*it)->scheduleStopRunning(-1)) {
532                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
533                 return false;
534             }
535         }
536     }
537     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
538           it != m_TransmitProcessors.end();
539           ++it ) {
540         if((*it)->isRunning()) {
541             if(!(*it)->scheduleStopRunning(-1)) {
542                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
543                 return false;
544             }
545         }
546     }
547     // wait for the SP's to get into the dry-running state
548     int cnt = 200;
549     bool ready = false;
550     while (!ready && cnt) {
551         ready = true;
552         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
553             it != m_ReceiveProcessors.end();
554             ++it ) {
555             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
556         }
557         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
558             it != m_TransmitProcessors.end();
559             ++it ) {
560             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
561         }
562         SleepRelativeUsec(125);
563         cnt--;
564     }
565     if(cnt==0) {
566         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
567         return false;
568     }
569
570     // switch SP's over to the stopped state
571     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
572           it != m_ReceiveProcessors.end();
573           ++it ) {
574         if(!(*it)->scheduleStopDryRunning(-1)) {
575             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
576             return false;
577         }
578     }
579     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
580           it != m_TransmitProcessors.end();
581           ++it ) {
582         if(!(*it)->scheduleStopDryRunning(-1)) {
583             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
584             return false;
585         }
586     }
587     // wait for the SP's to get into the running state
588     cnt = 200;
589     ready = false;
590     while (!ready && cnt) {
591         ready = true;
592         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
593             it != m_ReceiveProcessors.end();
594             ++it ) {
595             ready &= (*it)->isStopped();
596         }
597         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
598             it != m_TransmitProcessors.end();
599             ++it ) {
600             ready &= (*it)->isStopped();
601         }
602         SleepRelativeUsec(125);
603         cnt--;
604     }
605     if(cnt==0) {
606         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
607         return false;
608     }
609     return true;
610 }
611
612 /**
613  * Called upon Xrun events. This brings all StreamProcessors back
614  * into their starting state, and then carries on streaming. This should
615  * have the same effect as restarting the whole thing.
616  *
617  * @return true if successful, false otherwise
618  */
619 bool StreamProcessorManager::handleXrun() {
620
621     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
622
623     dumpInfo();
624
625     /*
626      * Reset means:
627      * 1) Disabling the SP's, so that they don't process any packets
628      *    note: the isomanager does keep on delivering/requesting them
629      * 2) Bringing all buffers & streamprocessors into a know state
630      *    - Clear all capture buffers
631      *    - Put nb_periods*period_size of null frames into the playback buffers
632      * 3) Re-enable the SP's
633      */
634
635     // put all SP's back into dry-running state
636     if (!startDryRunning()) {
637         debugFatal("Could not put SP's in dry-running state\n");
638         return false;
639     }
640
641     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
642     // start all SP's synchonized
643     if (!syncStartAll()) {
644         debugFatal("Could not syncStartAll...\n");
645         return false;
646     }
647
648     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
649
650     return true;
651 }
652
653 /**
654  * @brief Waits until the next period of samples is ready
655  *
656  * This function does not return until a full period of samples is (or should be)
657  * ready to be transferred.
658  *
659  * @return true if the period is ready, false if an xrun occurred
660  */
661 bool StreamProcessorManager::waitForPeriod() {
662     if(m_SyncSource == NULL) return false;
663     int time_till_next_period;
664     bool xrun_occurred = false;
665
666     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
667
668     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
669
670     while(time_till_next_period > 0) {
671         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
672
673         // wait for the period
674         SleepRelativeUsec(time_till_next_period);
675
676         // check for underruns on the ISO side,
677         // those should make us bail out of the wait loop
678         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
679             it != m_ReceiveProcessors.end();
680             ++it ) {
681             // a xrun has occurred on the Iso side
682             xrun_occurred |= (*it)->xrunOccurred();
683         }
684         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
685             it != m_TransmitProcessors.end();
686             ++it ) {
687             // a xrun has occurred on the Iso side
688             xrun_occurred |= (*it)->xrunOccurred();
689         }
690         if(xrun_occurred) break;
691
692         // check if we were waked up too soon
693         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
694     }
695
696     // we save the 'ideal' time of the transfer at this point,
697     // because we can have interleaved read - process - write
698     // cycles making that we modify a receiving stream's buffer
699     // before we get to writing.
700     // NOTE: before waitForPeriod() is called again, both the transmit
701     //       and the receive processors should have done their transfer.
702     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
703     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
704         m_time_of_transfer);
705
706     // normally we can transfer frames at this time, but in some cases this is not true
707     // e.g. when there are not enough frames in the receive buffer.
708     // however this doesn't have to be a problem, since we can wait some more until we
709     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
710     // to transmit, or if the receive buffer overflows. These conditions are signaled by
711     // the iso threads
712     // check if xruns occurred on the Iso side.
713     // also check if xruns will occur should we transfer() now
714 #if STREAMPROCESSORMANAGER_DYNAMIC_SYNC_DELAY
715     #ifdef DEBUG
716     int waited = 0;
717     #endif
718 #endif
719     bool ready_for_transfer = false;
720     bool ready;
721     xrun_occurred = false;
722     while (!ready_for_transfer && !xrun_occurred) {
723         // FIXME: can deadlock when the iso handlers die (e.g. unplug the device)
724         ready_for_transfer = true;
725         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
726             it != m_ReceiveProcessors.end();
727             ++it ) {
728             ready = ((*it)->canClientTransferFrames(m_period));
729             ready_for_transfer &= ready;
730             xrun_occurred |= (*it)->xrunOccurred();
731         }
732         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
733             it != m_TransmitProcessors.end();
734             ++it ) {
735             ready = ((*it)->canClientTransferFrames(m_period));
736             //ready_for_transfer &= ready;
737             xrun_occurred |= (*it)->xrunOccurred();
738         }
739 #if STREAMPROCESSORMANAGER_DYNAMIC_SYNC_DELAY
740         if (!ready_for_transfer) {
741            
742             SleepRelativeUsec(125); // MAGIC: one cycle sleep...
743
744             // in order to avoid this in the future, we increase the sync delay of the sync source SP
745             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
746             m_SyncSource->setSyncDelay(d);
747             d = m_SyncSource->getSyncDelay();
748             debugOutput(DEBUG_LEVEL_VERBOSE, "Increased the Sync delay to: %d ticks (%f frames, %f cy)\n",
749                                              d, ((float)d)/m_SyncSource->getTicksPerFrame(),
750                                              ((float)d)/((float)TICKS_PER_CYCLE));
751
752             #ifdef DEBUG
753             waited++;
754             #endif
755         }
756 #endif
757     } // we are either ready or an xrun occurred
758
759 #if STREAMPROCESSORMANAGER_DYNAMIC_SYNC_DELAY
760     // in order to avoid a runaway value of the sync delay, we gradually decrease
761     // it. It will be increased by a 'too early' event (cfr some lines higher)
762     // hence we'll be at a good point on average.
763     int d = m_SyncSource->getSyncDelay() - 1;
764     if (d >= 0) m_SyncSource->setSyncDelay(d);
765
766
767     #ifdef DEBUG
768     if(waited > 0) {
769         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
770     }
771     #endif
772 #endif
773
774     // this is to notify the client of the delay that we introduced by waiting
775     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
776     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
777
778 #ifdef DEBUG
779     int rcv_bf=0, xmt_bf=0;
780     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
781         it != m_ReceiveProcessors.end();
782         ++it ) {
783         rcv_bf = (*it)->getBufferFill();
784     }
785     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
786         it != m_TransmitProcessors.end();
787         ++it ) {
788         xmt_bf = (*it)->getBufferFill();
789     }
790     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
791         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
792
793     // check if xruns occurred on the Iso side.
794     // also check if xruns will occur should we transfer() now
795     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
796           it != m_ReceiveProcessors.end();
797           ++it ) {
798
799         if ((*it)->xrunOccurred()) {
800             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
801             (*it)->dumpInfo();
802         }
803         if (!((*it)->canClientTransferFrames(m_period))) {
804             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
805             (*it)->dumpInfo();
806         }
807     }
808     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
809           it != m_TransmitProcessors.end();
810           ++it ) {
811         if ((*it)->xrunOccurred()) {
812             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
813         }
814         if (!((*it)->canClientTransferFrames(m_period))) {
815             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
816         }
817     }
818 #endif
819
820     m_nbperiods++;
821     // now we can signal the client that we are (should be) ready
822     return !xrun_occurred;
823 }
824
825 /**
826  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
827  *
828  * Transfers one period of frames from the client side to the Iso side and vice versa.
829  *
830  * @return true if successful, false otherwise (indicates xrun).
831  */
832 bool StreamProcessorManager::transfer() {
833     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
834     bool retval=true;
835     retval &= transfer(StreamProcessor::ePT_Receive);
836     retval &= transfer(StreamProcessor::ePT_Transmit);
837     return retval;
838 }
839
840 /**
841  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
842  *
843  * Transfers one period of frames from the client side to the Iso side or vice versa.
844  *
845  * @param t The processor type to tranfer for (receive or transmit)
846  * @return true if successful, false otherwise (indicates xrun).
847  */
848 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
849     if(m_SyncSource == NULL) return false;
850     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
851         t, m_time_of_transfer,
852         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
853         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
854         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
855
856     bool retval = true;
857     // a static cast could make sure that there is no performance
858     // penalty for the virtual functions (to be checked)
859     if (t==StreamProcessor::ePT_Receive) {
860         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
861                 it != m_ReceiveProcessors.end();
862                 ++it ) {
863             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
864                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
865                             m_period, m_time_of_transfer,*it);
866                 retval &= false; // buffer underrun
867             }
868         }
869     } else {
870         // FIXME: in the SPM it would be nice to have system time instead of
871         //        1394 time
872         float rate = m_SyncSource->getTicksPerFrame();
873         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
874
875         // the data we are putting into the buffer is intended to be transmitted
876         // one ringbuffer size after it has been received
877         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
878
879         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
880                 it != m_TransmitProcessors.end();
881                 ++it ) {
882             // FIXME: in the SPM it would be nice to have system time instead of
883             //        1394 time
884             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
885                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
886                         m_period, transmit_timestamp, *it);
887                 retval &= false; // buffer underrun
888             }
889         }
890     }
891     return retval;
892 }
893
894 /**
895  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
896  *
897  * Transfers one period of silence to the Iso side for transmit SP's
898  * or dump one period of frames for receive SP's
899  *
900  * @return true if successful, false otherwise (indicates xrun).
901  */
902 bool StreamProcessorManager::transferSilence() {
903     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
904     bool retval=true;
905     retval &= transferSilence(StreamProcessor::ePT_Receive);
906     retval &= transferSilence(StreamProcessor::ePT_Transmit);
907     return retval;
908 }
909
910 /**
911  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
912  *
913  * Transfers one period of silence to the Iso side for transmit SP's
914  * or dump one period of frames for receive SP's
915  *
916  * @param t The processor type to tranfer for (receive or transmit)
917  * @return true if successful, false otherwise (indicates xrun).
918  */
919 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
920     if(m_SyncSource == NULL) return false;
921     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
922         t, m_time_of_transfer,
923         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
924         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
925         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
926
927     bool retval = true;
928     // a static cast could make sure that there is no performance
929     // penalty for the virtual functions (to be checked)
930     if (t==StreamProcessor::ePT_Receive) {
931         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
932                 it != m_ReceiveProcessors.end();
933                 ++it ) {
934             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
935                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
936                             m_period, m_time_of_transfer,*it);
937                 retval &= false; // buffer underrun
938             }
939         }
940     } else {
941         // FIXME: in the SPM it would be nice to have system time instead of
942         //        1394 time
943         float rate = m_SyncSource->getTicksPerFrame();
944         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
945
946         // the data we are putting into the buffer is intended to be transmitted
947         // one ringbuffer size after it has been received
948         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
949
950         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
951                 it != m_TransmitProcessors.end();
952                 ++it ) {
953             // FIXME: in the SPM it would be nice to have system time instead of
954             //        1394 time
955             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
956                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
957                         m_period, transmit_timestamp, *it);
958                 retval &= false; // buffer underrun
959             }
960         }
961     }
962     return retval;
963 }
964
965 void StreamProcessorManager::dumpInfo() {
966     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
967     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
968     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
969
970     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
971     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
972         it != m_ReceiveProcessors.end();
973         ++it ) {
974         (*it)->dumpInfo();
975     }
976
977     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
978     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
979         it != m_TransmitProcessors.end();
980         ++it ) {
981         (*it)->dumpInfo();
982     }
983
984     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
985
986 }
987
988 void StreamProcessorManager::setVerboseLevel(int l) {
989     setDebugLevel(l);
990
991     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
992     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
993         it != m_ReceiveProcessors.end();
994         ++it ) {
995         (*it)->setVerboseLevel(l);
996     }
997
998     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
999     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1000         it != m_TransmitProcessors.end();
1001         ++it ) {
1002         (*it)->setVerboseLevel(l);
1003     }
1004 }
1005
1006
1007 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1008     int count=0;
1009
1010     if (direction == Port::E_Capture) {
1011         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1012             it != m_ReceiveProcessors.end();
1013             ++it ) {
1014             count += (*it)->getPortCount(type);
1015         }
1016     } else {
1017         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1018             it != m_TransmitProcessors.end();
1019             ++it ) {
1020             count += (*it)->getPortCount(type);
1021         }
1022     }
1023     return count;
1024 }
1025
1026 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1027     int count=0;
1028
1029     if (direction == Port::E_Capture) {
1030         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1031             it != m_ReceiveProcessors.end();
1032             ++it ) {
1033             count += (*it)->getPortCount();
1034         }
1035     } else {
1036         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1037             it != m_TransmitProcessors.end();
1038             ++it ) {
1039             count += (*it)->getPortCount();
1040         }
1041     }
1042     return count;
1043 }
1044
1045 // TODO: implement a port map here, instead of the loop
1046
1047 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1048     int count=0;
1049     int prevcount=0;
1050
1051     if (direction == Port::E_Capture) {
1052         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1053             it != m_ReceiveProcessors.end();
1054             ++it ) {
1055             count += (*it)->getPortCount();
1056             if (count > idx) {
1057                 return (*it)->getPortAtIdx(idx-prevcount);
1058             }
1059             prevcount=count;
1060         }
1061     } else {
1062         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1063             it != m_TransmitProcessors.end();
1064             ++it ) {
1065             count += (*it)->getPortCount();
1066             if (count > idx) {
1067                 return (*it)->getPortAtIdx(idx-prevcount);
1068             }
1069             prevcount=count;
1070         }
1071     }
1072     return NULL;
1073 }
1074
1075 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1076     m_thread_realtime=rt;
1077     m_thread_priority=priority;
1078     return true;
1079 }
1080
1081
1082 } // end of namespace
Note: See TracBrowser for help on using the browser.