root/branches/api-cleanup/src/libstreaming/StreamProcessorManager.cpp

Revision 816, 38.2 kB (checked in by ppalmers, 14 years ago)

remove support for per-port datatypes. It's too much hassle and it doesn't add enough value.
It also prevents thorough performance optimizations, especially for larger channel counts (e.g. SSE based).

Audio ports are now either all float or all int24. This can be specified by the ffado_streaming_set_audio_datatype
API function before the streaming is prepared. Hence we can still support the direct conversion to the
clients datatype when demuxing the packets.

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "StreamProcessorManager.h"
26 #include "generic/StreamProcessor.h"
27 #include "generic/Port.h"
28 #include "libieee1394/cycletimer.h"
29
30 #include "libutil/Time.h"
31
32 #include <errno.h>
33 #include <assert.h>
34 #include <math.h>
35
36 namespace Streaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
39
40 StreamProcessorManager::StreamProcessorManager()
41     : m_is_slave( false )
42     , m_SyncSource(NULL)
43     , m_xrun_happened( false )
44     , m_nb_buffers( 0 )
45     , m_period( 0 )
46     , m_audio_datatype( eADT_Float )
47     , m_nominal_framerate ( 0 )
48     , m_xruns(0)
49     , m_nbperiods(0)
50 {
51     addOption(Util::OptionContainer::Option("slaveMode",false));
52 }
53
54 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
55     : m_is_slave( false )
56     , m_SyncSource(NULL)
57     , m_xrun_happened( false )
58     , m_nb_buffers(nb_buffers)
59     , m_period(period)
60     , m_audio_datatype( eADT_Float )
61     , m_nominal_framerate ( framerate )
62     , m_xruns(0)
63     , m_nbperiods(0)
64 {
65     addOption(Util::OptionContainer::Option("slaveMode",false));
66 }
67
68 StreamProcessorManager::~StreamProcessorManager() {
69 }
70
71 /**
72  * Registers \ref processor with this manager.
73  *
74  * also registers it with the isohandlermanager
75  *
76  * be sure to call isohandlermanager->init() first!
77  * and be sure that the processors are also ->init()'ed
78  *
79  * @param processor
80  * @return true if successfull
81  */
82 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
83 {
84     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
85     assert(processor);
86     if (processor->getType() == StreamProcessor::ePT_Receive) {
87         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
88         m_ReceiveProcessors.push_back(processor);
89         return true;
90     }
91
92     if (processor->getType() == StreamProcessor::ePT_Transmit) {
93         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
94         m_TransmitProcessors.push_back(processor);
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99     return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105     assert(processor);
106
107     if (processor->getType()==StreamProcessor::ePT_Receive) {
108
109         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110               it != m_ReceiveProcessors.end();
111               ++it )
112         {
113             if ( *it == processor ) {
114                 if (*it == m_SyncSource) {
115                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
116                     m_SyncSource = NULL;
117                 }
118                 m_ReceiveProcessors.erase(it);
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 if (*it == m_SyncSource) {
131                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n");
132                     m_SyncSource = NULL;
133                 }
134                 m_TransmitProcessors.erase(it);
135                 return true;
136             }
137         }
138     }
139
140     debugFatal("Processor (%p) not found!\n",processor);
141     return false; //not found
142 }
143
144 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
145     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
146     m_SyncSource=s;
147     return true;
148 }
149
150 bool StreamProcessorManager::prepare() {
151
152     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
153
154     m_is_slave=false;
155     if(!getOption("slaveMode", m_is_slave)) {
156         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
157     }
158
159     // if no sync source is set, select one here
160     if(m_SyncSource == NULL) {
161        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
162     }
163
164     // FIXME: put into separate method
165     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
166           it != m_ReceiveProcessors.end();
167           ++it )
168     {
169         if(m_SyncSource == NULL) {
170             debugWarning(" => Sync Source is %p.\n", *it);
171             m_SyncSource = *it;
172         }
173     }
174     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
175           it != m_TransmitProcessors.end();
176           ++it )
177     {
178         if(m_SyncSource == NULL) {
179             debugWarning(" => Sync Source is %p.\n", *it);
180             m_SyncSource = *it;
181         }
182     }
183
184     // now do the actual preparation of the SP's
185     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187         it != m_ReceiveProcessors.end();
188         ++it ) {
189
190         if(!(*it)->setOption("slaveMode", m_is_slave)) {
191             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
192         }
193
194         if(!(*it)->prepare()) {
195             debugFatal(  " could not prepare (%p)...\n",(*it));
196             return false;
197         }
198     }
199     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
200     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
201         it != m_TransmitProcessors.end();
202         ++it ) {
203         if(!(*it)->setOption("slaveMode", m_is_slave)) {
204             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
205         }
206         if(!(*it)->prepare()) {
207             debugFatal( " could not prepare (%p)...\n",(*it));
208             return false;
209         }
210     }
211
212     // if there are no stream processors registered,
213     // fail
214     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
215         debugFatal("No stream processors registered, can't do anything usefull\n");
216         return false;
217     }
218     return true;
219 }
220
221 bool StreamProcessorManager::startDryRunning() {
222     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
223     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
224     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
225             it != m_ReceiveProcessors.end();
226             ++it ) {
227         if (!(*it)->isDryRunning()) {
228             if(!(*it)->scheduleStartDryRunning(-1)) {
229                 debugError("Could not put SP %p into the dry-running state\n", *it);
230                 return false;
231             }
232         } else {
233             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
234         }
235     }
236     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
237             it != m_TransmitProcessors.end();
238             ++it ) {
239         if (!(*it)->isDryRunning()) {
240             if(!(*it)->scheduleStartDryRunning(-1)) {
241                 debugError("Could not put SP %p into the dry-running state\n", *it);
242                 return false;
243             }
244         } else {
245             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
246         }
247     }
248     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
249     // wait for the syncsource to start running.
250     // that will block the waitForPeriod call until everyone has started (theoretically)
251     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN; // by then it should have started
252     bool all_dry_running = false;
253     while (!all_dry_running && cnt) {
254         all_dry_running = true;
255         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
256                 it != m_ReceiveProcessors.end();
257                 ++it ) {
258             all_dry_running &= (*it)->isDryRunning();
259         }
260         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
261                 it != m_TransmitProcessors.end();
262                 ++it ) {
263             all_dry_running &= (*it)->isDryRunning();
264         }
265
266         SleepRelativeUsec(125);
267         cnt--;
268     }
269     if(cnt==0) {
270         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
271         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
272                 it != m_ReceiveProcessors.end();
273                 ++it ) {
274             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
275                 (*it)->getTypeString(), *it, (*it)->getStateString());
276         }
277         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
278                 it != m_TransmitProcessors.end();
279                 ++it ) {
280             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
281                 (*it)->getTypeString(), *it, (*it)->getStateString());
282         }
283         return false;
284     }
285     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
286     return true;
287 }
288
289 bool StreamProcessorManager::syncStartAll() {
290     if(m_SyncSource == NULL) return false;
291     // figure out when to get the SP's running.
292     // the xmit SP's should also know the base timestamp
293     // streams should be aligned here
294
295     // now find out how long we have to delay the wait operation such that
296     // the received frames will all be presented to the SP
297     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
298     int max_of_min_delay = 0;
299     int min_delay = 0;
300     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
301             it != m_ReceiveProcessors.end();
302             ++it ) {
303         min_delay = (*it)->getMaxFrameLatency();
304         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
305     }
306
307     // add some processing margin. This only shifts the time
308     // at which the buffer is transfer()'ed. This makes things somewhat
309     // more robust. It should be noted though that shifting the transfer
310     // time to a later time instant also causes the xmit buffer fill to be
311     // lower on average.
312     max_of_min_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
314         max_of_min_delay,
315         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
316         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
317         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
318     m_SyncSource->setSyncDelay(max_of_min_delay);
319
320     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
321     //        have aquired lock
322     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
323     //sleep(2); // FIXME: be smarter here
324
325     // make sure that we are dry-running long enough for the
326     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
327     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
328     int nb_sync_runs=20;
329     int64_t time_till_next_period;
330     while(nb_sync_runs--) { // or while not sync-ed?
331         // check if we were woken up too soon
332         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
333         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
334         if(time_till_next_period > 0) {
335             // wait for the period
336             SleepRelativeUsec(time_till_next_period);
337         }
338     }
339
340     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
341     // FIXME: in the SPM it would be nice to have system time instead of
342     //        1394 time
343
344     // we now should have decent sync info on the sync source
345     // determine a point in time where the system should start
346     // figure out where we are now
347     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
348     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
349         time_of_first_sample,
350         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
351         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
352         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
353
354     // start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
355     // this is the time window we have to setup all SP's such that they
356     // can start wet-running correctly.
357     time_of_first_sample = addTicks(time_of_first_sample,
358                                     STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
359
360     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
361         time_of_first_sample,
362         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
363         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
364         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
365
366     // we should start wet-running the transmit SP's some cycles in advance
367     // such that we know it is wet-running when it should output its first sample
368     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
369                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
370
371     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
372                                                  STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
373     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
374         time_to_start_xmit,
375         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
376         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
377         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
378     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
379         time_to_start_recv,
380         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
381         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
382         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
383
384     // at this point the buffer head timestamp of the transmit buffers can be set
385     // this is the presentation time of the first sample in the buffer
386     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
387           it != m_TransmitProcessors.end();
388           ++it ) {
389         (*it)->setBufferHeadTimestamp(time_of_first_sample);
390     }
391
392     // STEP X: switch SP's over to the running state
393     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
394           it != m_ReceiveProcessors.end();
395           ++it ) {
396         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
397             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
398             return false;
399         }
400     }
401     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
402           it != m_TransmitProcessors.end();
403           ++it ) {
404         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
405             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
406             return false;
407         }
408     }
409     // wait for the syncsource to start running.
410     // that will block the waitForPeriod call until everyone has started (theoretically)
411     int cnt = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP * 20; // by then it should have started
412     while (!m_SyncSource->isRunning() && cnt) {
413         SleepRelativeUsec(125);
414         cnt--;
415     }
416     if(cnt==0) {
417         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
418         return false;
419     }
420
421     // now align the received streams
422     if(!alignReceivedStreams()) {
423         debugError("Could not align streams\n");
424         return false;
425     }
426     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
427     return true;
428 }
429
430 bool
431 StreamProcessorManager::alignReceivedStreams()
432 {
433     if(m_SyncSource == NULL) return false;
434     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
435     unsigned int nb_sync_runs;
436     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
437     int64_t diff_between_streams[nb_rcv_sp];
438     int64_t diff;
439
440     unsigned int i;
441
442     unsigned int periods_per_align_try = (STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC * getNominalRate());
443     periods_per_align_try /= 1000;
444     periods_per_align_try /= getPeriodSize();
445     debugOutput( DEBUG_LEVEL_VERBOSE, " averaging over %u periods...\n", periods_per_align_try);
446
447     bool aligned = false;
448     int cnt = STREAMPROCESSORMANAGER_NB_ALIGN_TRIES;
449     while (!aligned && cnt--) {
450         nb_sync_runs = periods_per_align_try;
451         while(nb_sync_runs) {
452             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
453             if(!waitForPeriod()) {
454                 debugWarning("xrun while aligning streams...\n");
455                 return false;
456             };
457
458             i = 0;
459             for ( i = 0; i < nb_rcv_sp; i++) {
460                 StreamProcessor *s = m_ReceiveProcessors.at(i);
461                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
462                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
463                     m_SyncSource, s, diff);
464                 if ( nb_sync_runs == periods_per_align_try ) {
465                     diff_between_streams[i] = diff;
466                 } else {
467                     diff_between_streams[i] += diff;
468                 }
469             }
470             if(!transferSilence()) {
471                 debugError("Could not transfer silence\n");
472                 return false;
473             }
474             nb_sync_runs--;
475         }
476         // calculate the average offsets
477         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
478         int diff_between_streams_frames[nb_rcv_sp];
479         aligned = true;
480         for ( i = 0; i < nb_rcv_sp; i++) {
481             StreamProcessor *s = m_ReceiveProcessors.at(i);
482
483             diff_between_streams[i] /= periods_per_align_try;
484             diff_between_streams_frames[i] = (int)roundf(diff_between_streams[i] / s->getTicksPerFrame());
485             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
486                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
487
488             aligned &= (diff_between_streams_frames[i] == 0);
489
490             // reposition the stream
491             if(!s->shiftStream(diff_between_streams_frames[i])) {
492                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
493                 return false;
494             }
495         }
496         if (!aligned) {
497             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
498         }
499     }
500     if (cnt == 0) {
501         debugError("Align failed\n");
502         return false;
503     }
504     return true;
505 }
506
507 bool StreamProcessorManager::start() {
508     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
509
510     // put all SP's into dry-running state
511     if (!startDryRunning()) {
512         debugFatal("Could not put SP's in dry-running state\n");
513         return false;
514     }
515
516     // start all SP's synchonized
517     if (!syncStartAll()) {
518         debugFatal("Could not syncStartAll...\n");
519         return false;
520     }
521     return true;
522 }
523
524 bool StreamProcessorManager::stop() {
525     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
526
527     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
528     // switch SP's over to the dry-running state
529     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
530           it != m_ReceiveProcessors.end();
531           ++it ) {
532         if((*it)->isRunning()) {
533             if(!(*it)->scheduleStopRunning(-1)) {
534                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
535                 return false;
536             }
537         }
538     }
539     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
540           it != m_TransmitProcessors.end();
541           ++it ) {
542         if((*it)->isRunning()) {
543             if(!(*it)->scheduleStopRunning(-1)) {
544                 debugError("%p->scheduleStopRunning(-1) failed\n", *it);
545                 return false;
546             }
547         }
548     }
549     // wait for the SP's to get into the dry-running state
550     int cnt = 2000;
551     bool ready = false;
552     while (!ready && cnt) {
553         ready = true;
554         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
555             it != m_ReceiveProcessors.end();
556             ++it ) {
557             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
558         }
559         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
560             it != m_TransmitProcessors.end();
561             ++it ) {
562             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
563         }
564         SleepRelativeUsec(125);
565         cnt--;
566     }
567     if(cnt==0) {
568         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
569         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
570             it != m_ReceiveProcessors.end();
571             ++it ) {
572             (*it)->dumpInfo();
573         }
574         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
575             it != m_TransmitProcessors.end();
576             ++it ) {
577             (*it)->dumpInfo();
578         }
579         return false;
580     }
581
582     // switch SP's over to the stopped state
583     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
584           it != m_ReceiveProcessors.end();
585           ++it ) {
586         if(!(*it)->scheduleStopDryRunning(-1)) {
587             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
588             return false;
589         }
590     }
591     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
592           it != m_TransmitProcessors.end();
593           ++it ) {
594         if(!(*it)->scheduleStopDryRunning(-1)) {
595             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
596             return false;
597         }
598     }
599     // wait for the SP's to get into the running state
600     cnt = 200;
601     ready = false;
602     while (!ready && cnt) {
603         ready = true;
604         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
605             it != m_ReceiveProcessors.end();
606             ++it ) {
607             ready &= (*it)->isStopped();
608         }
609         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
610             it != m_TransmitProcessors.end();
611             ++it ) {
612             ready &= (*it)->isStopped();
613         }
614         SleepRelativeUsec(125);
615         cnt--;
616     }
617     if(cnt==0) {
618         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
619         return false;
620     }
621     return true;
622 }
623
624 /**
625  * Called upon Xrun events. This brings all StreamProcessors back
626  * into their starting state, and then carries on streaming. This should
627  * have the same effect as restarting the whole thing.
628  *
629  * @return true if successful, false otherwise
630  */
631 bool StreamProcessorManager::handleXrun() {
632
633     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
634
635     dumpInfo();
636
637     /*
638      * Reset means:
639      * 1) Disabling the SP's, so that they don't process any packets
640      *    note: the isomanager does keep on delivering/requesting them
641      * 2) Bringing all buffers & streamprocessors into a know state
642      *    - Clear all capture buffers
643      *    - Put nb_periods*period_size of null frames into the playback buffers
644      * 3) Re-enable the SP's
645      */
646
647     // put all SP's back into dry-running state
648     if (!startDryRunning()) {
649         debugFatal("Could not put SP's in dry-running state\n");
650         return false;
651     }
652
653     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
654     // start all SP's synchonized
655     if (!syncStartAll()) {
656         debugFatal("Could not syncStartAll...\n");
657         return false;
658     }
659
660     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
661
662     return true;
663 }
664
665 /**
666  * @brief Waits until the next period of samples is ready
667  *
668  * This function does not return until a full period of samples is (or should be)
669  * ready to be transferred.
670  *
671  * @return true if the period is ready, false if an xrun occurred
672  */
673 bool StreamProcessorManager::waitForPeriod() {
674     if(m_SyncSource == NULL) return false;
675     bool xrun_occurred = false;
676     bool period_not_ready = true;
677
678     while(period_not_ready) {
679         debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill());
680         if(!m_SyncSource->waitForSignal()) {
681             debugError("Error waiting for signal\n");
682             return false;
683         }
684
685         unsigned int bufferfill = m_SyncSource->getBufferFill();
686         period_not_ready = bufferfill < m_period;
687
688 #ifdef DEBUG
689         if(period_not_ready) {
690             debugOutput(DEBUG_LEVEL_VERBOSE, "period is not ready (bufferfill: %u)\n", bufferfill);
691         } else {
692             debugOutput(DEBUG_LEVEL_VERBOSE, "period is ready (bufferfill: %u)\n", bufferfill);
693         }
694 #endif
695
696         // check for underruns on the ISO side,
697         // those should make us bail out of the wait loop
698         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
699             it != m_ReceiveProcessors.end();
700             ++it ) {
701             // a xrun has occurred on the Iso side
702             xrun_occurred |= (*it)->xrunOccurred();
703         }
704         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
705             it != m_TransmitProcessors.end();
706             ++it ) {
707             // a xrun has occurred on the Iso side
708             xrun_occurred |= (*it)->xrunOccurred();
709         }
710         if(xrun_occurred) break;
711         // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error)
712     }
713
714     // we save the 'ideal' time of the transfer at this point,
715     // because we can have interleaved read - process - write
716     // cycles making that we modify a receiving stream's buffer
717     // before we get to writing.
718     // NOTE: before waitForPeriod() is called again, both the transmit
719     //       and the receive processors should have done their transfer.
720     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
721     debugOutput( DEBUG_LEVEL_VERBOSE, "transfer at %llu ticks...\n",
722         m_time_of_transfer);
723
724     // this is to notify the client of the delay that we introduced by waiting
725     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
726     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
727
728 #ifdef DEBUG
729     int rcv_bf=0, xmt_bf=0;
730     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
731         it != m_ReceiveProcessors.end();
732         ++it ) {
733         rcv_bf = (*it)->getBufferFill();
734     }
735     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
736         it != m_TransmitProcessors.end();
737         ++it ) {
738         xmt_bf = (*it)->getBufferFill();
739     }
740     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
741         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
742
743     // check if xruns occurred on the Iso side.
744     // also check if xruns will occur should we transfer() now
745     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
746           it != m_ReceiveProcessors.end();
747           ++it ) {
748
749         if ((*it)->xrunOccurred()) {
750             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
751             (*it)->dumpInfo();
752         }
753         if (!((*it)->canClientTransferFrames(m_period))) {
754             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
755             (*it)->dumpInfo();
756         }
757     }
758     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
759           it != m_TransmitProcessors.end();
760           ++it ) {
761         if ((*it)->xrunOccurred()) {
762             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
763         }
764         if (!((*it)->canClientTransferFrames(m_period))) {
765             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
766         }
767     }
768 #endif
769
770     m_nbperiods++;
771     // now we can signal the client that we are (should be) ready
772     return !xrun_occurred;
773 }
774
775 /**
776  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
777  *
778  * Transfers one period of frames from the client side to the Iso side and vice versa.
779  *
780  * @return true if successful, false otherwise (indicates xrun).
781  */
782 bool StreamProcessorManager::transfer() {
783     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
784     bool retval=true;
785     retval &= transfer(StreamProcessor::ePT_Receive);
786     retval &= transfer(StreamProcessor::ePT_Transmit);
787     return retval;
788 }
789
790 /**
791  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
792  *
793  * Transfers one period of frames from the client side to the Iso side or vice versa.
794  *
795  * @param t The processor type to tranfer for (receive or transmit)
796  * @return true if successful, false otherwise (indicates xrun).
797  */
798 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
799     if(m_SyncSource == NULL) return false;
800     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
801         t, m_time_of_transfer,
802         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
803         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
804         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
805
806     bool retval = true;
807     // a static cast could make sure that there is no performance
808     // penalty for the virtual functions (to be checked)
809     if (t==StreamProcessor::ePT_Receive) {
810         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
811                 it != m_ReceiveProcessors.end();
812                 ++it ) {
813             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
814                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
815                             m_period, m_time_of_transfer,*it);
816                 retval &= false; // buffer underrun
817             }
818         }
819     } else {
820         // FIXME: in the SPM it would be nice to have system time instead of
821         //        1394 time
822         float rate = m_SyncSource->getTicksPerFrame();
823         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
824
825         // the data we are putting into the buffer is intended to be transmitted
826         // one ringbuffer size after it has been received
827         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
828
829         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
830                 it != m_TransmitProcessors.end();
831                 ++it ) {
832             // FIXME: in the SPM it would be nice to have system time instead of
833             //        1394 time
834             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
835                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
836                         m_period, transmit_timestamp, *it);
837                 retval &= false; // buffer underrun
838             }
839         }
840     }
841     return retval;
842 }
843
844 /**
845  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
846  *
847  * Transfers one period of silence to the Iso side for transmit SP's
848  * or dump one period of frames for receive SP's
849  *
850  * @return true if successful, false otherwise (indicates xrun).
851  */
852 bool StreamProcessorManager::transferSilence() {
853     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
854     bool retval=true;
855     retval &= transferSilence(StreamProcessor::ePT_Receive);
856     retval &= transferSilence(StreamProcessor::ePT_Transmit);
857     return retval;
858 }
859
860 /**
861  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
862  *
863  * Transfers one period of silence to the Iso side for transmit SP's
864  * or dump one period of frames for receive SP's
865  *
866  * @param t The processor type to tranfer for (receive or transmit)
867  * @return true if successful, false otherwise (indicates xrun).
868  */
869 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
870     if(m_SyncSource == NULL) return false;
871     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
872         t, m_time_of_transfer,
873         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
874         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
875         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
876
877     bool retval = true;
878     // a static cast could make sure that there is no performance
879     // penalty for the virtual functions (to be checked)
880     if (t==StreamProcessor::ePT_Receive) {
881         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
882                 it != m_ReceiveProcessors.end();
883                 ++it ) {
884             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
885                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
886                             m_period, m_time_of_transfer,*it);
887                 retval &= false; // buffer underrun
888             }
889         }
890     } else {
891         // FIXME: in the SPM it would be nice to have system time instead of
892         //        1394 time
893         float rate = m_SyncSource->getTicksPerFrame();
894         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
895
896         // the data we are putting into the buffer is intended to be transmitted
897         // one ringbuffer size after it has been received
898         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
899
900         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
901                 it != m_TransmitProcessors.end();
902                 ++it ) {
903             // FIXME: in the SPM it would be nice to have system time instead of
904             //        1394 time
905             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
906                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
907                         m_period, transmit_timestamp, *it);
908                 retval &= false; // buffer underrun
909             }
910         }
911     }
912     return retval;
913 }
914
915 void StreamProcessorManager::dumpInfo() {
916     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
917     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
918     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
919     debugOutputShort( DEBUG_LEVEL_NORMAL, "Data type: %s\n", (m_audio_datatype==eADT_Float?"float":"int24"));
920
921     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
922     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
923         it != m_ReceiveProcessors.end();
924         ++it ) {
925         (*it)->dumpInfo();
926     }
927
928     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
929     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
930         it != m_TransmitProcessors.end();
931         ++it ) {
932         (*it)->dumpInfo();
933     }
934
935     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
936
937 }
938
939 void StreamProcessorManager::setVerboseLevel(int l) {
940     setDebugLevel(l);
941
942     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
943     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
944         it != m_ReceiveProcessors.end();
945         ++it ) {
946         (*it)->setVerboseLevel(l);
947     }
948
949     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
950     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
951         it != m_TransmitProcessors.end();
952         ++it ) {
953         (*it)->setVerboseLevel(l);
954     }
955 }
956
957
958 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
959     int count=0;
960
961     if (direction == Port::E_Capture) {
962         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
963             it != m_ReceiveProcessors.end();
964             ++it ) {
965             count += (*it)->getPortCount(type);
966         }
967     } else {
968         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
969             it != m_TransmitProcessors.end();
970             ++it ) {
971             count += (*it)->getPortCount(type);
972         }
973     }
974     return count;
975 }
976
977 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
978     int count=0;
979
980     if (direction == Port::E_Capture) {
981         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
982             it != m_ReceiveProcessors.end();
983             ++it ) {
984             count += (*it)->getPortCount();
985         }
986     } else {
987         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
988             it != m_TransmitProcessors.end();
989             ++it ) {
990             count += (*it)->getPortCount();
991         }
992     }
993     return count;
994 }
995
996 // TODO: implement a port map here, instead of the loop
997
998 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
999     int count=0;
1000     int prevcount=0;
1001
1002     if (direction == Port::E_Capture) {
1003         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1004             it != m_ReceiveProcessors.end();
1005             ++it ) {
1006             count += (*it)->getPortCount();
1007             if (count > idx) {
1008                 return (*it)->getPortAtIdx(idx-prevcount);
1009             }
1010             prevcount=count;
1011         }
1012     } else {
1013         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1014             it != m_TransmitProcessors.end();
1015             ++it ) {
1016             count += (*it)->getPortCount();
1017             if (count > idx) {
1018                 return (*it)->getPortAtIdx(idx-prevcount);
1019             }
1020             prevcount=count;
1021         }
1022     }
1023     return NULL;
1024 }
1025
1026 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1027     m_thread_realtime=rt;
1028     m_thread_priority=priority;
1029     return true;
1030 }
1031
1032
1033 } // end of namespace
Note: See TracBrowser for help on using the browser.