root/trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

Revision 742, 42.5 kB (checked in by ppalmers, 16 years ago)

- Remove some obsolete support files and dirs

- Clean up the license statements in the source files. Everything is

GPL version 3 now.

- Add license and copyright notices to scons scripts

- Clean up some other text files

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31 #include <math.h>
32
33 #define RUNNING_TIMEOUT_MSEC 4000
34 #define PREPARE_TIMEOUT_MSEC 4000
35 #define ENABLE_TIMEOUT_MSEC 4000
36
37 // allows to add some processing margin. This shifts the time
38 // at which the buffer is transfer()'ed, making things somewhat
39 // more robust. It should be noted though that shifting the transfer
40 // time to a later time instant also causes the xmit buffer fill to be
41 // lower on average.
42 #define FFADO_SIGNAL_DELAY_TICKS 3072*4
43
44 namespace Streaming {
45
46 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
47
48 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
49     : m_is_slave( false )
50     , m_SyncSource(NULL)
51     , m_nb_buffers(nb_buffers)
52     , m_period(period)
53     , m_nominal_framerate ( framerate )
54     , m_xruns(0)
55     , m_isoManager(0)
56     , m_nbperiods(0)
57 {
58     addOption(Util::OptionContainer::Option("slaveMode",false));
59 }
60
61 StreamProcessorManager::~StreamProcessorManager() {
62     if (m_isoManager) delete m_isoManager;
63 }
64
65 /**
66  * Registers \ref processor with this manager.
67  *
68  * also registers it with the isohandlermanager
69  *
70  * be sure to call isohandlermanager->init() first!
71  * and be sure that the processors are also ->init()'ed
72  *
73  * @param processor
74  * @return true if successfull
75  */
76 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
77 {
78     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
79     assert(processor);
80     assert(m_isoManager);
81
82     if (processor->getType() == StreamProcessor::ePT_Receive) {
83         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
84
85         m_ReceiveProcessors.push_back(processor);
86         processor->setManager(this);
87         return true;
88     }
89
90     if (processor->getType() == StreamProcessor::ePT_Transmit) {
91         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
92
93         m_TransmitProcessors.push_back(processor);
94         processor->setManager(this);
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99     return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105     assert(processor);
106
107     if (processor->getType()==StreamProcessor::ePT_Receive) {
108
109         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110               it != m_ReceiveProcessors.end();
111               ++it )
112         {
113             if ( *it == processor ) {
114                 m_ReceiveProcessors.erase(it);
115                 processor->clearManager();
116                 if(!m_isoManager->unregisterStream(processor)) {
117                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
118                     return false;
119                 }
120                 return true;
121             }
122         }
123     }
124
125     if (processor->getType()==StreamProcessor::ePT_Transmit) {
126         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
127               it != m_TransmitProcessors.end();
128               ++it )
129         {
130             if ( *it == processor ) {
131                 m_TransmitProcessors.erase(it);
132                 processor->clearManager();
133                 if(!m_isoManager->unregisterStream(processor)) {
134                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
135                     return false;
136                 }
137                 return true;
138             }
139         }
140     }
141
142     debugFatal("Processor (%p) not found!\n",processor);
143     return false; //not found
144 }
145
146 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
147     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
148     m_SyncSource=s;
149     return true;
150 }
151
152 bool StreamProcessorManager::init()
153 {
154     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
155     m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
156     if(!m_isoManager) {
157         debugFatal("Could not create IsoHandlerManager\n");
158         return false;
159     }
160     m_isoManager->setVerboseLevel(getDebugLevel());
161    
162     // try to queue up 75% of the frames in the transmit buffer
163     unsigned int nb_frames = (getNbBuffers() - 1) * getPeriodSize() * 1000 / 2000;
164     m_isoManager->setTransmitBufferNbFrames(nb_frames);
165
166     if(!m_isoManager->init()) {
167         debugFatal("Could not initialize IsoHandlerManager\n");
168         return false;
169     }
170
171     m_xrun_happened=false;
172     return true;
173 }
174
175 bool StreamProcessorManager::prepare() {
176
177     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
178
179     m_is_slave=false;
180     if(!getOption("slaveMode", m_is_slave)) {
181         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
182     }
183
184     // if no sync source is set, select one here
185     if(m_SyncSource == NULL) {
186        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
187     }
188
189     // FIXME: put into separate method
190     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
191           it != m_ReceiveProcessors.end();
192           ++it )
193     {
194         if(m_SyncSource == NULL) {
195             debugWarning(" => Sync Source is %p.\n", *it);
196             m_SyncSource = *it;
197         }
198     }
199     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
200           it != m_TransmitProcessors.end();
201           ++it )
202     {
203         if(m_SyncSource == NULL) {
204             debugWarning(" => Sync Source is %p.\n", *it);
205             m_SyncSource = *it;
206         }
207     }
208
209     // now do the actual preparation of the SP's
210     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
211     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
212         it != m_ReceiveProcessors.end();
213         ++it ) {
214
215         if(!(*it)->setOption("slaveMode", m_is_slave)) {
216             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
217         }
218
219         if(!(*it)->prepare()) {
220             debugFatal(  " could not prepare (%p)...\n",(*it));
221             return false;
222         }
223     }
224     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
225     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
226         it != m_TransmitProcessors.end();
227         ++it ) {
228         if(!(*it)->setOption("slaveMode", m_is_slave)) {
229             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
230         }
231         if(!(*it)->prepare()) {
232             debugFatal( " could not prepare (%p)...\n",(*it));
233             return false;
234         }
235     }
236
237     // if there are no stream processors registered,
238     // fail
239     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
240         debugFatal("No stream processors registered, can't do anything usefull\n");
241         return false;
242     }
243     return true;
244 }
245
246 bool StreamProcessorManager::startDryRunning() {
247     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
248     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
249     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
250             it != m_ReceiveProcessors.end();
251             ++it ) {
252         if (!(*it)->isDryRunning()) {
253             if(!(*it)->scheduleStartDryRunning(-1)) {
254                 debugError("Could not put SP %p into the dry-running state\n", *it);
255                 return false;
256             }
257         } else {
258             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
259         }
260     }
261     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
262             it != m_TransmitProcessors.end();
263             ++it ) {
264         if (!(*it)->isDryRunning()) {
265             if(!(*it)->scheduleStartDryRunning(-1)) {
266                 debugError("Could not put SP %p into the dry-running state\n", *it);
267                 return false;
268             }
269         } else {
270             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
271         }
272     }
273     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
274     // wait for the syncsource to start running.
275     // that will block the waitForPeriod call until everyone has started (theoretically)
276     #define CYCLES_FOR_DRYRUN 40000
277     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
278     bool all_dry_running = false;
279     while (!all_dry_running && cnt) {
280         all_dry_running = true;
281         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
282                 it != m_ReceiveProcessors.end();
283                 ++it ) {
284             all_dry_running &= (*it)->isDryRunning();
285         }
286         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
287                 it != m_TransmitProcessors.end();
288                 ++it ) {
289             all_dry_running &= (*it)->isDryRunning();
290         }
291
292         usleep(125);
293         cnt--;
294     }
295     if(cnt==0) {
296         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
297         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
298                 it != m_ReceiveProcessors.end();
299                 ++it ) {
300             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
301                 (*it)->getTypeString(), *it, (*it)->getStateString());
302         }
303         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
304                 it != m_TransmitProcessors.end();
305                 ++it ) {
306             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
307                 (*it)->getTypeString(), *it, (*it)->getStateString());
308         }
309         return false;
310     }
311     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
312     return true;
313 }
314
315 bool StreamProcessorManager::syncStartAll() {
316     // figure out when to get the SP's running.
317     // the xmit SP's should also know the base timestamp
318     // streams should be aligned here
319
320     // now find out how long we have to delay the wait operation such that
321     // the received frames will all be presented to the SP
322     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
323     int max_of_min_delay = 0;
324     int min_delay = 0;
325     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
326             it != m_ReceiveProcessors.end();
327             ++it ) {
328         min_delay = (*it)->getMaxFrameLatency();
329         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
330     }
331
332     // add some processing margin. This only shifts the time
333     // at which the buffer is transfer()'ed. This makes things somewhat
334     // more robust. It should be noted though that shifting the transfer
335     // time to a later time instant also causes the xmit buffer fill to be
336     // lower on average.
337     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
338     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
339         max_of_min_delay,
340         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
341         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
342         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
343     m_SyncSource->setSyncDelay(max_of_min_delay);
344
345     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
346     //        have aquired lock
347     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
348     //sleep(2); // FIXME: be smarter here
349
350     // make sure that we are dry-running long enough for the
351     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
352     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
353     int nb_sync_runs=20;
354     int64_t time_till_next_period;
355     while(nb_sync_runs--) { // or while not sync-ed?
356         // check if we were woken up too soon
357         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
358         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
359         if(time_till_next_period > 0) {
360             // wait for the period
361             usleep(time_till_next_period);
362         }
363     }
364
365     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
366     // FIXME: in the SPM it would be nice to have system time instead of
367     //        1394 time
368
369     // we now should have decent sync info on the sync source
370     // determine a point in time where the system should start
371     // figure out where we are now
372     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
373     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
374         time_of_first_sample,
375         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
376         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
377         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
378
379     #define CYCLES_FOR_STARTUP 2000
380     // start wet-running in CYCLES_FOR_STARTUP cycles
381     // this is the time window we have to setup all SP's such that they
382     // can start wet-running correctly.
383     time_of_first_sample = addTicks(time_of_first_sample,
384                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
385
386     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
387         time_of_first_sample,
388         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
389         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
390         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
391
392     // we should start wet-running the transmit SP's some cycles in advance
393     // such that we know it is wet-running when it should output its first sample
394     #define PRESTART_CYCLES_FOR_XMIT 20
395     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
396                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
397
398     #define PRESTART_CYCLES_FOR_RECV 0
399     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
400                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
401     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
402         time_to_start_xmit,
403         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
404         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
405         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
406     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
407         time_to_start_recv,
408         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
409         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
410         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
411
412     // at this point the buffer head timestamp of the transmit buffers can be set
413     // this is the presentation time of the first sample in the buffer
414     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
415           it != m_TransmitProcessors.end();
416           ++it ) {
417         (*it)->setBufferHeadTimestamp(time_of_first_sample);
418     }
419
420     // STEP X: switch SP's over to the running state
421     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
422           it != m_ReceiveProcessors.end();
423           ++it ) {
424         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
425             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
426             return false;
427         }
428     }
429     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
430           it != m_TransmitProcessors.end();
431           ++it ) {
432         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
433             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
434             return false;
435         }
436     }
437     // wait for the syncsource to start running.
438     // that will block the waitForPeriod call until everyone has started (theoretically)
439     int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started
440     while (!m_SyncSource->isRunning() && cnt) {
441         usleep(125);
442         cnt--;
443     }
444     if(cnt==0) {
445         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
446         return false;
447     }
448
449     // now align the received streams
450     if(!alignReceivedStreams()) {
451         debugError("Could not align streams\n");
452         return false;
453     }
454     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
455     return true;
456 }
457
458 bool
459 StreamProcessorManager::alignReceivedStreams()
460 {
461     #define NB_PERIODS_FOR_ALIGN_AVERAGE 20
462     #define NB_ALIGN_TRIES 20
463     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
464     unsigned int nb_sync_runs;
465     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
466     int64_t diff_between_streams[nb_rcv_sp];
467     int64_t diff;
468
469     unsigned int i;
470
471     bool aligned = false;
472     int cnt = NB_ALIGN_TRIES;
473     while (!aligned && cnt--) {
474         nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE;
475         while(nb_sync_runs) {
476             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
477             waitForPeriod();
478
479             i = 0;
480             for ( i = 0; i < nb_rcv_sp; i++) {
481                 StreamProcessor *s = m_ReceiveProcessors.at(i);
482                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
483                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
484                     m_SyncSource, s, diff);
485                 if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) {
486                     diff_between_streams[i] = diff;
487                 } else {
488                     diff_between_streams[i] += diff;
489                 }
490             }
491             if(!transferSilence()) {
492                 debugError("Could not transfer silence\n");
493                 return false;
494             }
495             nb_sync_runs--;
496         }
497         // calculate the average offsets
498         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
499         int diff_between_streams_frames[nb_rcv_sp];
500         aligned = true;
501         for ( i = 0; i < nb_rcv_sp; i++) {
502             StreamProcessor *s = m_ReceiveProcessors.at(i);
503
504             diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE;
505             diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame());
506             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
507                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
508
509             aligned &= (diff_between_streams_frames[i] == 0);
510
511             // reposition the stream
512             if(!s->shiftStream(diff_between_streams_frames[i])) {
513                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
514                 return false;
515             }
516         }
517         if (!aligned) {
518             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
519         }
520     }
521     if (cnt == 0) {
522         debugError("Align failed\n");
523         return false;
524     }
525     return true;
526 }
527
528 bool StreamProcessorManager::start() {
529     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
530     assert(m_isoManager);
531
532     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
533     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
534     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
535           it != m_ReceiveProcessors.end();
536           ++it )
537     {
538         if (!m_isoManager->registerStream(*it)) {
539             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
540             return false;
541         }
542     }
543     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
544     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
545           it != m_TransmitProcessors.end();
546           ++it )
547     {
548         if (!m_isoManager->registerStream(*it)) {
549             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
550             return false;
551         }
552     }
553
554     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
555     if (!m_isoManager->prepare()) {
556         debugFatal("Could not prepare isoManager\n");
557         return false;
558     }
559
560     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
561     if (!m_isoManager->startHandlers(-1)) {
562         debugFatal("Could not start handlers...\n");
563         return false;
564     }
565
566     // put all SP's into dry-running state
567     if (!startDryRunning()) {
568         debugFatal("Could not put SP's in dry-running state\n");
569         return false;
570     }
571
572     // start all SP's synchonized
573     if (!syncStartAll()) {
574         debugFatal("Could not syncStartAll...\n");
575         return false;
576     }
577
578     // dump the iso stream information when in verbose mode
579     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
580         m_isoManager->dumpInfo();
581     }
582
583     return true;
584 }
585
586 bool StreamProcessorManager::stop() {
587     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
588     assert(m_isoManager);
589
590     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
591
592     // switch SP's over to the dry-running state
593     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
594           it != m_ReceiveProcessors.end();
595           ++it ) {
596         if(!(*it)->scheduleStopRunning(-1)) {
597             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
598             return false;
599         }
600     }
601     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
602           it != m_TransmitProcessors.end();
603           ++it ) {
604         if(!(*it)->scheduleStopRunning(-1)) {
605             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
606             return false;
607         }
608     }
609     // wait for the SP's to get into the dry-running state
610     int cnt = 200;
611     bool ready = false;
612     while (!ready && cnt) {
613         ready = true;
614         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
615             it != m_ReceiveProcessors.end();
616             ++it ) {
617             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
618         }
619         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
620             it != m_TransmitProcessors.end();
621             ++it ) {
622             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
623         }
624         usleep(125);
625         cnt--;
626     }
627     if(cnt==0) {
628         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
629         return false;
630     }
631
632     // switch SP's over to the stopped state
633     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
634           it != m_ReceiveProcessors.end();
635           ++it ) {
636         if(!(*it)->scheduleStopDryRunning(-1)) {
637             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
638             return false;
639         }
640     }
641     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
642           it != m_TransmitProcessors.end();
643           ++it ) {
644         if(!(*it)->scheduleStopDryRunning(-1)) {
645             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
646             return false;
647         }
648     }
649     // wait for the SP's to get into the running state
650     cnt = 200;
651     ready = false;
652     while (!ready && cnt) {
653         ready = true;
654         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
655             it != m_ReceiveProcessors.end();
656             ++it ) {
657             ready &= (*it)->isStopped();
658         }
659         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
660             it != m_TransmitProcessors.end();
661             ++it ) {
662             ready &= (*it)->isStopped();
663         }
664         usleep(125);
665         cnt--;
666     }
667     if(cnt==0) {
668         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
669         return false;
670     }
671
672     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
673     if(!m_isoManager->stopHandlers()) {
674        debugFatal("Could not stop ISO handlers\n");
675        return false;
676     }
677
678     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
679     // now unregister all streams from iso manager
680     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
681     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
682           it != m_ReceiveProcessors.end();
683           ++it ) {
684         if (!m_isoManager->unregisterStream(*it)) {
685             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
686             return false;
687         }
688     }
689     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
690     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
691           it != m_TransmitProcessors.end();
692           ++it ) {
693         if (!m_isoManager->unregisterStream(*it)) {
694             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
695             return false;
696         }
697     }
698     return true;
699 }
700
701 /**
702  * Called upon Xrun events. This brings all StreamProcessors back
703  * into their starting state, and then carries on streaming. This should
704  * have the same effect as restarting the whole thing.
705  *
706  * @return true if successful, false otherwise
707  */
708 bool StreamProcessorManager::handleXrun() {
709
710     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
711
712     dumpInfo();
713
714     /*
715      * Reset means:
716      * 1) Disabling the SP's, so that they don't process any packets
717      *    note: the isomanager does keep on delivering/requesting them
718      * 2) Bringing all buffers & streamprocessors into a know state
719      *    - Clear all capture buffers
720      *    - Put nb_periods*period_size of null frames into the playback buffers
721      * 3) Re-enable the SP's
722      */
723
724     // put all SP's back into dry-running state
725     if (!startDryRunning()) {
726         debugFatal("Could not put SP's in dry-running state\n");
727         return false;
728     }
729
730     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
731     // start all SP's synchonized
732     if (!syncStartAll()) {
733         debugFatal("Could not syncStartAll...\n");
734         return false;
735     }
736
737     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
738
739     return true;
740 }
741
742 /**
743  * @brief Waits until the next period of samples is ready
744  *
745  * This function does not return until a full period of samples is (or should be)
746  * ready to be transferred.
747  *
748  * @return true if the period is ready, false if an xrun occurred
749  */
750 bool StreamProcessorManager::waitForPeriod() {
751     int time_till_next_period;
752     bool xrun_occurred = false;
753
754     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
755
756     assert(m_SyncSource);
757
758     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
759
760     while(time_till_next_period > 0) {
761         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
762
763         // wait for the period
764         usleep(time_till_next_period);
765
766         // check for underruns on the ISO side,
767         // those should make us bail out of the wait loop
768         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
769             it != m_ReceiveProcessors.end();
770             ++it ) {
771             // a xrun has occurred on the Iso side
772             xrun_occurred |= (*it)->xrunOccurred();
773         }
774         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
775             it != m_TransmitProcessors.end();
776             ++it ) {
777             // a xrun has occurred on the Iso side
778             xrun_occurred |= (*it)->xrunOccurred();
779         }
780         if(xrun_occurred) break;
781
782         // check if we were waked up too soon
783         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
784     }
785
786     // we save the 'ideal' time of the transfer at this point,
787     // because we can have interleaved read - process - write
788     // cycles making that we modify a receiving stream's buffer
789     // before we get to writing.
790     // NOTE: before waitForPeriod() is called again, both the transmit
791     //       and the receive processors should have done their transfer.
792     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
793     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
794         m_time_of_transfer);
795
796     // normally we can transfer frames at this time, but in some cases this is not true
797     // e.g. when there are not enough frames in the receive buffer.
798     // however this doesn't have to be a problem, since we can wait some more until we
799     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
800     // to transmit, or if the receive buffer overflows. These conditions are signaled by
801     // the iso threads
802     // check if xruns occurred on the Iso side.
803     // also check if xruns will occur should we transfer() now
804     #ifdef DEBUG
805     int waited = 0;
806     #endif
807     bool ready_for_transfer = false;
808     xrun_occurred = false;
809     while (!ready_for_transfer && !xrun_occurred) {
810         ready_for_transfer = true;
811         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
812             it != m_ReceiveProcessors.end();
813             ++it ) {
814             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
815             xrun_occurred |= (*it)->xrunOccurred();
816         }
817         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
818             it != m_TransmitProcessors.end();
819             ++it ) {
820             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
821             xrun_occurred |= (*it)->xrunOccurred();
822         }
823         if (!ready_for_transfer) {
824             usleep(125); // MAGIC: one cycle sleep...
825
826             // in order to avoid this in the future, we increase the sync delay of the sync source SP
827             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;
828             m_SyncSource->setSyncDelay(d);
829
830             #ifdef DEBUG
831             waited++;
832             #endif
833         }
834     } // we are either ready or an xrun occurred
835
836     #ifdef DEBUG
837     if(waited > 0) {
838         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
839     }
840     #endif
841
842     // this is to notify the client of the delay that we introduced by waiting
843     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
844     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
845
846 #ifdef DEBUG
847     int rcv_bf=0, xmt_bf=0;
848     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
849         it != m_ReceiveProcessors.end();
850         ++it ) {
851         rcv_bf = (*it)->getBufferFill();
852     }
853     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
854         it != m_TransmitProcessors.end();
855         ++it ) {
856         xmt_bf = (*it)->getBufferFill();
857     }
858     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
859         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
860
861     // check if xruns occurred on the Iso side.
862     // also check if xruns will occur should we transfer() now
863     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
864           it != m_ReceiveProcessors.end();
865           ++it ) {
866
867         if ((*it)->xrunOccurred()) {
868             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
869             (*it)->dumpInfo();
870         }
871         if (!((*it)->canClientTransferFrames(m_period))) {
872             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
873             (*it)->dumpInfo();
874         }
875     }
876     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
877           it != m_TransmitProcessors.end();
878           ++it ) {
879         if ((*it)->xrunOccurred()) {
880             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
881         }
882         if (!((*it)->canClientTransferFrames(m_period))) {
883             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
884         }
885     }
886 #endif
887
888     m_nbperiods++;
889     // now we can signal the client that we are (should be) ready
890     return !xrun_occurred;
891 }
892
893 /**
894  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
895  *
896  * Transfers one period of frames from the client side to the Iso side and vice versa.
897  *
898  * @return true if successful, false otherwise (indicates xrun).
899  */
900 bool StreamProcessorManager::transfer() {
901     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
902     bool retval=true;
903     retval &= transfer(StreamProcessor::ePT_Receive);
904     retval &= transfer(StreamProcessor::ePT_Transmit);
905     return retval;
906 }
907
908 /**
909  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
910  *
911  * Transfers one period of frames from the client side to the Iso side or vice versa.
912  *
913  * @param t The processor type to tranfer for (receive or transmit)
914  * @return true if successful, false otherwise (indicates xrun).
915  */
916 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
917     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
918         t, m_time_of_transfer,
919         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
920         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
921         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
922
923     bool retval = true;
924     // a static cast could make sure that there is no performance
925     // penalty for the virtual functions (to be checked)
926     if (t==StreamProcessor::ePT_Receive) {
927         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
928                 it != m_ReceiveProcessors.end();
929                 ++it ) {
930             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
931                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
932                             m_period, m_time_of_transfer,*it);
933                 retval &= false; // buffer underrun
934             }
935         }
936     } else {
937         // FIXME: in the SPM it would be nice to have system time instead of
938         //        1394 time
939         float rate = m_SyncSource->getTicksPerFrame();
940         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
941
942         // the data we are putting into the buffer is intended to be transmitted
943         // one ringbuffer size after it has been received
944         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
945
946         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
947                 it != m_TransmitProcessors.end();
948                 ++it ) {
949             // FIXME: in the SPM it would be nice to have system time instead of
950             //        1394 time
951             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
952                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
953                         m_period, transmit_timestamp, *it);
954                 retval &= false; // buffer underrun
955             }
956         }
957     }
958     return retval;
959 }
960
961 /**
962  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
963  *
964  * Transfers one period of silence to the Iso side for transmit SP's
965  * or dump one period of frames for receive SP's
966  *
967  * @return true if successful, false otherwise (indicates xrun).
968  */
969 bool StreamProcessorManager::transferSilence() {
970     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
971     bool retval=true;
972     retval &= transferSilence(StreamProcessor::ePT_Receive);
973     retval &= transferSilence(StreamProcessor::ePT_Transmit);
974     return retval;
975 }
976
977 /**
978  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
979  *
980  * Transfers one period of silence to the Iso side for transmit SP's
981  * or dump one period of frames for receive SP's
982  *
983  * @param t The processor type to tranfer for (receive or transmit)
984  * @return true if successful, false otherwise (indicates xrun).
985  */
986 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
987     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
988         t, m_time_of_transfer,
989         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
990         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
991         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
992
993     bool retval = true;
994     // a static cast could make sure that there is no performance
995     // penalty for the virtual functions (to be checked)
996     if (t==StreamProcessor::ePT_Receive) {
997         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
998                 it != m_ReceiveProcessors.end();
999                 ++it ) {
1000             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
1001                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
1002                             m_period, m_time_of_transfer,*it);
1003                 retval &= false; // buffer underrun
1004             }
1005         }
1006     } else {
1007         // FIXME: in the SPM it would be nice to have system time instead of
1008         //        1394 time
1009         float rate = m_SyncSource->getTicksPerFrame();
1010         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1011
1012         // the data we are putting into the buffer is intended to be transmitted
1013         // one ringbuffer size after it has been received
1014         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1015
1016         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1017                 it != m_TransmitProcessors.end();
1018                 ++it ) {
1019             // FIXME: in the SPM it would be nice to have system time instead of
1020             //        1394 time
1021             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1022                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1023                         m_period, transmit_timestamp, *it);
1024                 retval &= false; // buffer underrun
1025             }
1026         }
1027     }
1028     return retval;
1029 }
1030
1031 void StreamProcessorManager::dumpInfo() {
1032     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1033     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1034     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1035
1036     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1037     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1038         it != m_ReceiveProcessors.end();
1039         ++it ) {
1040         (*it)->dumpInfo();
1041     }
1042
1043     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1044     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1045         it != m_TransmitProcessors.end();
1046         ++it ) {
1047         (*it)->dumpInfo();
1048     }
1049
1050     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1051     m_isoManager->dumpInfo();
1052     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1053
1054 }
1055
1056 void StreamProcessorManager::setVerboseLevel(int l) {
1057     setDebugLevel(l);
1058
1059     if (m_isoManager) m_isoManager->setVerboseLevel(l);
1060
1061     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1062     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1063         it != m_ReceiveProcessors.end();
1064         ++it ) {
1065         (*it)->setVerboseLevel(l);
1066     }
1067
1068     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1069     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1070         it != m_TransmitProcessors.end();
1071         ++it ) {
1072         (*it)->setVerboseLevel(l);
1073     }
1074 }
1075
1076
1077 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1078     int count=0;
1079
1080     if (direction == Port::E_Capture) {
1081         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1082             it != m_ReceiveProcessors.end();
1083             ++it ) {
1084             count += (*it)->getPortCount(type);
1085         }
1086     } else {
1087         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1088             it != m_TransmitProcessors.end();
1089             ++it ) {
1090             count += (*it)->getPortCount(type);
1091         }
1092     }
1093     return count;
1094 }
1095
1096 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1097     int count=0;
1098
1099     if (direction == Port::E_Capture) {
1100         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1101             it != m_ReceiveProcessors.end();
1102             ++it ) {
1103             count += (*it)->getPortCount();
1104         }
1105     } else {
1106         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1107             it != m_TransmitProcessors.end();
1108             ++it ) {
1109             count += (*it)->getPortCount();
1110         }
1111     }
1112     return count;
1113 }
1114
1115 // TODO: implement a port map here, instead of the loop
1116
1117 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1118     int count=0;
1119     int prevcount=0;
1120
1121     if (direction == Port::E_Capture) {
1122         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1123             it != m_ReceiveProcessors.end();
1124             ++it ) {
1125             count += (*it)->getPortCount();
1126             if (count > idx) {
1127                 return (*it)->getPortAtIdx(idx-prevcount);
1128             }
1129             prevcount=count;
1130         }
1131     } else {
1132         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1133             it != m_TransmitProcessors.end();
1134             ++it ) {
1135             count += (*it)->getPortCount();
1136             if (count > idx) {
1137                 return (*it)->getPortAtIdx(idx-prevcount);
1138             }
1139             prevcount=count;
1140         }
1141     }
1142     return NULL;
1143 }
1144
1145 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1146     m_thread_realtime=rt;
1147     m_thread_priority=priority;
1148     return true;
1149 }
1150
1151
1152 } // end of namespace
Note: See TracBrowser for help on using the browser.