root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 727, 42.1 kB (checked in by ppalmers, 15 years ago)

stream alignment implemented

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31 #include <math.h>
32
33 #define RUNNING_TIMEOUT_MSEC 4000
34 #define PREPARE_TIMEOUT_MSEC 4000
35 #define ENABLE_TIMEOUT_MSEC 4000
36
37 // allows to add some processing margin. This shifts the time
38 // at which the buffer is transfer()'ed, making things somewhat
39 // more robust. It should be noted though that shifting the transfer
40 // time to a later time instant also causes the xmit buffer fill to be
41 // lower on average.
42 #define FFADO_SIGNAL_DELAY_TICKS 3072
43
44 namespace Streaming {
45
46 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
47
48 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
49     : m_is_slave( false )
50     , m_SyncSource(NULL)
51     , m_nb_buffers(nb_buffers)
52     , m_period(period)
53     , m_nominal_framerate ( framerate )
54     , m_xruns(0)
55     , m_isoManager(0)
56     , m_nbperiods(0)
57 {
58     addOption(Util::OptionContainer::Option("slaveMode",false));
59 }
60
61 StreamProcessorManager::~StreamProcessorManager() {
62     if (m_isoManager) delete m_isoManager;
63 }
64
65 /**
66  * Registers \ref processor with this manager.
67  *
68  * also registers it with the isohandlermanager
69  *
70  * be sure to call isohandlermanager->init() first!
71  * and be sure that the processors are also ->init()'ed
72  *
73  * @param processor
74  * @return true if successfull
75  */
76 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
77 {
78     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
79     assert(processor);
80     assert(m_isoManager);
81
82     if (processor->getType() == StreamProcessor::ePT_Receive) {
83         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
84
85         m_ReceiveProcessors.push_back(processor);
86         processor->setManager(this);
87         return true;
88     }
89
90     if (processor->getType() == StreamProcessor::ePT_Transmit) {
91         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
92
93         m_TransmitProcessors.push_back(processor);
94         processor->setManager(this);
95         return true;
96     }
97
98     debugFatal("Unsupported processor type!\n");
99     return false;
100 }
101
102 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
103 {
104     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
105     assert(processor);
106
107     if (processor->getType()==StreamProcessor::ePT_Receive) {
108
109         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
110               it != m_ReceiveProcessors.end();
111               ++it )
112         {
113             if ( *it == processor ) {
114                 m_ReceiveProcessors.erase(it);
115                 processor->clearManager();
116                 if(!m_isoManager->unregisterStream(processor)) {
117                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
118                     return false;
119                 }
120                 return true;
121             }
122         }
123     }
124
125     if (processor->getType()==StreamProcessor::ePT_Transmit) {
126         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
127               it != m_TransmitProcessors.end();
128               ++it )
129         {
130             if ( *it == processor ) {
131                 m_TransmitProcessors.erase(it);
132                 processor->clearManager();
133                 if(!m_isoManager->unregisterStream(processor)) {
134                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
135                     return false;
136                 }
137                 return true;
138             }
139         }
140     }
141
142     debugFatal("Processor (%p) not found!\n",processor);
143     return false; //not found
144 }
145
146 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
147     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
148     m_SyncSource=s;
149     return true;
150 }
151
152 bool StreamProcessorManager::init()
153 {
154     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
155     m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
156     if(!m_isoManager) {
157         debugFatal("Could not create IsoHandlerManager\n");
158         return false;
159     }
160     m_isoManager->setVerboseLevel(getDebugLevel());
161     m_isoManager->setTransmitBufferNbPeriods(getNbBuffers() - 1);
162
163     if(!m_isoManager->init()) {
164         debugFatal("Could not initialize IsoHandlerManager\n");
165         return false;
166     }
167
168     m_xrun_happened=false;
169     return true;
170 }
171
172 bool StreamProcessorManager::prepare() {
173
174     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
175
176     m_is_slave=false;
177     if(!getOption("slaveMode", m_is_slave)) {
178         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
179     }
180
181     // if no sync source is set, select one here
182     if(m_SyncSource == NULL) {
183        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
184     }
185
186     // FIXME: put into separate method
187     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
188           it != m_ReceiveProcessors.end();
189           ++it )
190     {
191         if(m_SyncSource == NULL) {
192             debugWarning(" => Sync Source is %p.\n", *it);
193             m_SyncSource = *it;
194         }
195     }
196     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
197           it != m_TransmitProcessors.end();
198           ++it )
199     {
200         if(m_SyncSource == NULL) {
201             debugWarning(" => Sync Source is %p.\n", *it);
202             m_SyncSource = *it;
203         }
204     }
205
206     // now do the actual preparation of the SP's
207     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
208     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
209         it != m_ReceiveProcessors.end();
210         ++it ) {
211
212         if(!(*it)->setOption("slaveMode", m_is_slave)) {
213             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
214         }
215
216         if(!(*it)->prepare()) {
217             debugFatal(  " could not prepare (%p)...\n",(*it));
218             return false;
219         }
220     }
221     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
222     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
223         it != m_TransmitProcessors.end();
224         ++it ) {
225         if(!(*it)->setOption("slaveMode", m_is_slave)) {
226             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
227         }
228         if(!(*it)->prepare()) {
229             debugFatal( " could not prepare (%p)...\n",(*it));
230             return false;
231         }
232     }
233
234     // if there are no stream processors registered,
235     // fail
236     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
237         debugFatal("No stream processors registered, can't do anything usefull\n");
238         return false;
239     }
240     return true;
241 }
242
243 bool StreamProcessorManager::startDryRunning() {
244     debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n");
245     debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n");
246     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
247             it != m_ReceiveProcessors.end();
248             ++it ) {
249         if (!(*it)->isDryRunning()) {
250             if(!(*it)->scheduleStartDryRunning(-1)) {
251                 debugError("Could not put SP %p into the dry-running state\n", *it);
252                 return false;
253             }
254         } else {
255             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
256         }
257     }
258     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
259             it != m_TransmitProcessors.end();
260             ++it ) {
261         if (!(*it)->isDryRunning()) {
262             if(!(*it)->scheduleStartDryRunning(-1)) {
263                 debugError("Could not put SP %p into the dry-running state\n", *it);
264                 return false;
265             }
266         } else {
267             debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it);
268         }
269     }
270     debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n");
271     // wait for the syncsource to start running.
272     // that will block the waitForPeriod call until everyone has started (theoretically)
273     #define CYCLES_FOR_DRYRUN 40000
274     int cnt = CYCLES_FOR_DRYRUN; // by then it should have started
275     bool all_dry_running = false;
276     while (!all_dry_running && cnt) {
277         all_dry_running = true;
278         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
279                 it != m_ReceiveProcessors.end();
280                 ++it ) {
281             all_dry_running &= (*it)->isDryRunning();
282         }
283         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
284                 it != m_TransmitProcessors.end();
285                 ++it ) {
286             all_dry_running &= (*it)->isDryRunning();
287         }
288
289         usleep(125);
290         cnt--;
291     }
292     if(cnt==0) {
293         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
294         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
295                 it != m_ReceiveProcessors.end();
296                 ++it ) {
297             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
298                 (*it)->getTypeString(), *it, (*it)->getStateString());
299         }
300         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
301                 it != m_TransmitProcessors.end();
302                 ++it ) {
303             debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n",
304                 (*it)->getTypeString(), *it, (*it)->getStateString());
305         }
306         return false;
307     }
308     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
309     return true;
310 }
311
312 bool StreamProcessorManager::syncStartAll() {
313     // figure out when to get the SP's running.
314     // the xmit SP's should also know the base timestamp
315     // streams should be aligned here
316
317     // now find out how long we have to delay the wait operation such that
318     // the received frames will all be presented to the SP
319     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
320     int max_of_min_delay = 0;
321     int min_delay = 0;
322     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
323             it != m_ReceiveProcessors.end();
324             ++it ) {
325         min_delay = (*it)->getMaxFrameLatency();
326         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
327     }
328
329     // add some processing margin. This only shifts the time
330     // at which the buffer is transfer()'ed. This makes things somewhat
331     // more robust. It should be noted though that shifting the transfer
332     // time to a later time instant also causes the xmit buffer fill to be
333     // lower on average.
334     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
335     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
336         max_of_min_delay,
337         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
338         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
339         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
340     m_SyncSource->setSyncDelay(max_of_min_delay);
341
342     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
343     //        have aquired lock
344     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
345     //sleep(2); // FIXME: be smarter here
346
347     // make sure that we are dry-running long enough for the
348     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
349     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
350     int nb_sync_runs=20;
351     int64_t time_till_next_period;
352     while(nb_sync_runs--) { // or while not sync-ed?
353         // check if we were woken up too soon
354         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
355         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
356         if(time_till_next_period > 0) {
357             // wait for the period
358             usleep(time_till_next_period);
359         }
360     }
361
362     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
363     // FIXME: in the SPM it would be nice to have system time instead of
364     //        1394 time
365
366     // we now should have decent sync info on the sync source
367     // determine a point in time where the system should start
368     // figure out where we are now
369     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
370     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
371         time_of_first_sample,
372         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
373         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
374         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
375
376     #define CYCLES_FOR_STARTUP 2000
377     // start wet-running in CYCLES_FOR_STARTUP cycles
378     // this is the time window we have to setup all SP's such that they
379     // can start wet-running correctly.
380     time_of_first_sample = addTicks(time_of_first_sample,
381                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
382
383     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
384         time_of_first_sample,
385         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
386         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
387         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
388
389     // we should start wet-running the transmit SP's some cycles in advance
390     // such that we know it is wet-running when it should output its first sample
391     #define PRESTART_CYCLES_FOR_XMIT 20
392     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
393                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
394
395     #define PRESTART_CYCLES_FOR_RECV 0
396     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
397                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
398     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
399         time_to_start_xmit,
400         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
401         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
402         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
403     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
404         time_to_start_recv,
405         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
406         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
407         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
408
409     // at this point the buffer head timestamp of the transmit buffers can be set
410     // this is the presentation time of the first sample in the buffer
411     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
412           it != m_TransmitProcessors.end();
413           ++it ) {
414         (*it)->setBufferHeadTimestamp(time_of_first_sample);
415     }
416
417     // STEP X: switch SP's over to the running state
418     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
419           it != m_ReceiveProcessors.end();
420           ++it ) {
421         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
422             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
423             return false;
424         }
425     }
426     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
427           it != m_TransmitProcessors.end();
428           ++it ) {
429         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
430             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
431             return false;
432         }
433     }
434     // wait for the syncsource to start running.
435     // that will block the waitForPeriod call until everyone has started (theoretically)
436     int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started
437     while (!m_SyncSource->isRunning() && cnt) {
438         usleep(125);
439         cnt--;
440     }
441     if(cnt==0) {
442         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
443         return false;
444     }
445
446     // now align the received streams
447     if(!alignReceivedStreams()) {
448         debugError("Could not align streams\n");
449         return false;
450     }
451     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
452     return true;
453 }
454
455 bool
456 StreamProcessorManager::alignReceivedStreams()
457 {
458     #define NB_PERIODS_FOR_ALIGN_AVERAGE 20
459     #define NB_ALIGN_TRIES 20
460     debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n");
461     unsigned int nb_sync_runs;
462     unsigned int nb_rcv_sp = m_ReceiveProcessors.size();
463     int64_t diff_between_streams[nb_rcv_sp];
464     int64_t diff;
465
466     unsigned int i;
467
468     bool aligned = false;
469     int cnt = NB_ALIGN_TRIES;
470     while (!aligned && cnt--) {
471         nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE;
472         while(nb_sync_runs) {
473             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs);
474             waitForPeriod();
475
476             i = 0;
477             for ( i = 0; i < nb_rcv_sp; i++) {
478                 StreamProcessor *s = m_ReceiveProcessors.at(i);
479                 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod());
480                 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",
481                     m_SyncSource, s, diff);
482                 if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) {
483                     diff_between_streams[i] = diff;
484                 } else {
485                     diff_between_streams[i] += diff;
486                 }
487             }
488             if(!transferSilence()) {
489                 debugError("Could not transfer silence\n");
490                 return false;
491             }
492             nb_sync_runs--;
493         }
494         // calculate the average offsets
495         debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n");
496         int diff_between_streams_frames[nb_rcv_sp];
497         aligned = true;
498         for ( i = 0; i < nb_rcv_sp; i++) {
499             StreamProcessor *s = m_ReceiveProcessors.at(i);
500
501             diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE;
502             diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame());
503             debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",
504                 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]);
505
506             aligned &= (diff_between_streams_frames[i] == 0);
507
508             // position the stream
509             if(!s->shiftStream(diff_between_streams_frames[i])) {
510                 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]);
511                 return false;
512             }
513         }
514         if (!aligned) {
515             debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n");
516         }
517     }
518     if (cnt == 0) {
519         debugError("Align failed\n");
520         return false;
521     }
522     return true;
523 }
524
525 bool StreamProcessorManager::start() {
526     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
527     assert(m_isoManager);
528
529     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
530     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
531     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
532           it != m_ReceiveProcessors.end();
533           ++it )
534     {
535         if (!m_isoManager->registerStream(*it)) {
536             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
537             return false;
538         }
539     }
540     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
541     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
542           it != m_TransmitProcessors.end();
543           ++it )
544     {
545         if (!m_isoManager->registerStream(*it)) {
546             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
547             return false;
548         }
549     }
550
551     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
552     if (!m_isoManager->prepare()) {
553         debugFatal("Could not prepare isoManager\n");
554         return false;
555     }
556
557     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
558     if (!m_isoManager->startHandlers(-1)) {
559         debugFatal("Could not start handlers...\n");
560         return false;
561     }
562
563     // put all SP's into dry-running state
564     if (!startDryRunning()) {
565         debugFatal("Could not put SP's in dry-running state\n");
566         return false;
567     }
568
569     // start all SP's synchonized
570     if (!syncStartAll()) {
571         debugFatal("Could not syncStartAll...\n");
572         return false;
573     }
574
575     // dump the iso stream information when in verbose mode
576     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
577         m_isoManager->dumpInfo();
578     }
579
580     return true;
581 }
582
583 bool StreamProcessorManager::stop() {
584     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
585     assert(m_isoManager);
586
587     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
588
589     // switch SP's over to the dry-running state
590     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
591           it != m_ReceiveProcessors.end();
592           ++it ) {
593         if(!(*it)->scheduleStopRunning(-1)) {
594             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
595             return false;
596         }
597     }
598     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
599           it != m_TransmitProcessors.end();
600           ++it ) {
601         if(!(*it)->scheduleStopRunning(-1)) {
602             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
603             return false;
604         }
605     }
606     // wait for the SP's to get into the dry-running state
607     int cnt = 200;
608     bool ready = false;
609     while (!ready && cnt) {
610         ready = true;
611         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
612             it != m_ReceiveProcessors.end();
613             ++it ) {
614             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
615         }
616         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
617             it != m_TransmitProcessors.end();
618             ++it ) {
619             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
620         }
621         usleep(125);
622         cnt--;
623     }
624     if(cnt==0) {
625         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
626         return false;
627     }
628
629     // switch SP's over to the stopped state
630     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
631           it != m_ReceiveProcessors.end();
632           ++it ) {
633         if(!(*it)->scheduleStopDryRunning(-1)) {
634             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
635             return false;
636         }
637     }
638     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
639           it != m_TransmitProcessors.end();
640           ++it ) {
641         if(!(*it)->scheduleStopDryRunning(-1)) {
642             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
643             return false;
644         }
645     }
646     // wait for the SP's to get into the running state
647     cnt = 200;
648     ready = false;
649     while (!ready && cnt) {
650         ready = true;
651         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
652             it != m_ReceiveProcessors.end();
653             ++it ) {
654             ready &= (*it)->isStopped();
655         }
656         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
657             it != m_TransmitProcessors.end();
658             ++it ) {
659             ready &= (*it)->isStopped();
660         }
661         usleep(125);
662         cnt--;
663     }
664     if(cnt==0) {
665         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
666         return false;
667     }
668
669     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
670     if(!m_isoManager->stopHandlers()) {
671        debugFatal("Could not stop ISO handlers\n");
672        return false;
673     }
674
675     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
676     // now unregister all streams from iso manager
677     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
678     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
679           it != m_ReceiveProcessors.end();
680           ++it ) {
681         if (!m_isoManager->unregisterStream(*it)) {
682             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
683             return false;
684         }
685     }
686     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
687     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
688           it != m_TransmitProcessors.end();
689           ++it ) {
690         if (!m_isoManager->unregisterStream(*it)) {
691             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
692             return false;
693         }
694     }
695     return true;
696 }
697
698 /**
699  * Called upon Xrun events. This brings all StreamProcessors back
700  * into their starting state, and then carries on streaming. This should
701  * have the same effect as restarting the whole thing.
702  *
703  * @return true if successful, false otherwise
704  */
705 bool StreamProcessorManager::handleXrun() {
706
707     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
708
709     dumpInfo();
710
711     /*
712      * Reset means:
713      * 1) Disabling the SP's, so that they don't process any packets
714      *    note: the isomanager does keep on delivering/requesting them
715      * 2) Bringing all buffers & streamprocessors into a know state
716      *    - Clear all capture buffers
717      *    - Put nb_periods*period_size of null frames into the playback buffers
718      * 3) Re-enable the SP's
719      */
720
721     // put all SP's back into dry-running state
722     if (!startDryRunning()) {
723         debugFatal("Could not put SP's in dry-running state\n");
724         return false;
725     }
726
727     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
728     // start all SP's synchonized
729     if (!syncStartAll()) {
730         debugFatal("Could not syncStartAll...\n");
731         return false;
732     }
733
734     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
735
736     return true;
737 }
738
739 /**
740  * @brief Waits until the next period of samples is ready
741  *
742  * This function does not return until a full period of samples is (or should be)
743  * ready to be transferred.
744  *
745  * @return true if the period is ready, false if an xrun occurred
746  */
747 bool StreamProcessorManager::waitForPeriod() {
748     int time_till_next_period;
749     bool xrun_occurred = false;
750
751     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
752
753     assert(m_SyncSource);
754
755     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
756
757     while(time_till_next_period > 0) {
758         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
759
760         // wait for the period
761         usleep(time_till_next_period);
762
763         // check for underruns on the ISO side,
764         // those should make us bail out of the wait loop
765         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
766             it != m_ReceiveProcessors.end();
767             ++it ) {
768             // a xrun has occurred on the Iso side
769             xrun_occurred |= (*it)->xrunOccurred();
770         }
771         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
772             it != m_TransmitProcessors.end();
773             ++it ) {
774             // a xrun has occurred on the Iso side
775             xrun_occurred |= (*it)->xrunOccurred();
776         }
777         if(xrun_occurred) break;
778
779         // check if we were waked up too soon
780         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
781     }
782
783     // we save the 'ideal' time of the transfer at this point,
784     // because we can have interleaved read - process - write
785     // cycles making that we modify a receiving stream's buffer
786     // before we get to writing.
787     // NOTE: before waitForPeriod() is called again, both the transmit
788     //       and the receive processors should have done their transfer.
789     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
790     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
791         m_time_of_transfer);
792
793     // normally we can transfer frames at this time, but in some cases this is not true
794     // e.g. when there are not enough frames in the receive buffer.
795     // however this doesn't have to be a problem, since we can wait some more until we
796     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
797     // to transmit, or if the receive buffer overflows. These conditions are signaled by
798     // the iso threads
799     // check if xruns occurred on the Iso side.
800     // also check if xruns will occur should we transfer() now
801     #ifdef DEBUG
802     int waited = -1;
803     #endif
804     bool ready_for_transfer = false;
805     xrun_occurred = false;
806     while (!ready_for_transfer && !xrun_occurred) {
807         ready_for_transfer = true;
808         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
809             it != m_ReceiveProcessors.end();
810             ++it ) {
811             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
812             xrun_occurred |= (*it)->xrunOccurred();
813         }
814         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
815             it != m_TransmitProcessors.end();
816             ++it ) {
817             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
818             xrun_occurred |= (*it)->xrunOccurred();
819         }
820         usleep(125); // MAGIC: one cycle sleep...
821         #ifdef DEBUG
822         waited++;
823         #endif
824     } // we are either ready or an xrun occurred
825
826     #ifdef DEBUG
827     if(waited > 0) {
828         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
829     }
830     #endif
831
832     // this is to notify the client of the delay that we introduced by waiting
833     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
834     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
835
836 #ifdef DEBUG
837     int rcv_bf=0, xmt_bf=0;
838     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
839         it != m_ReceiveProcessors.end();
840         ++it ) {
841         rcv_bf = (*it)->getBufferFill();
842     }
843     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
844         it != m_TransmitProcessors.end();
845         ++it ) {
846         xmt_bf = (*it)->getBufferFill();
847     }
848     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
849         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
850
851     // check if xruns occurred on the Iso side.
852     // also check if xruns will occur should we transfer() now
853     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
854           it != m_ReceiveProcessors.end();
855           ++it ) {
856
857         if ((*it)->xrunOccurred()) {
858             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
859             (*it)->dumpInfo();
860         }
861         if (!((*it)->canClientTransferFrames(m_period))) {
862             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
863             (*it)->dumpInfo();
864         }
865     }
866     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
867           it != m_TransmitProcessors.end();
868           ++it ) {
869         if ((*it)->xrunOccurred()) {
870             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
871         }
872         if (!((*it)->canClientTransferFrames(m_period))) {
873             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
874         }
875     }
876 #endif
877
878     m_nbperiods++;
879     // now we can signal the client that we are (should be) ready
880     return !xrun_occurred;
881 }
882
883 /**
884  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
885  *
886  * Transfers one period of frames from the client side to the Iso side and vice versa.
887  *
888  * @return true if successful, false otherwise (indicates xrun).
889  */
890 bool StreamProcessorManager::transfer() {
891     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
892     bool retval=true;
893     retval &= transfer(StreamProcessor::ePT_Receive);
894     retval &= transfer(StreamProcessor::ePT_Transmit);
895     return retval;
896 }
897
898 /**
899  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
900  *
901  * Transfers one period of frames from the client side to the Iso side or vice versa.
902  *
903  * @param t The processor type to tranfer for (receive or transmit)
904  * @return true if successful, false otherwise (indicates xrun).
905  */
906 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
907     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
908         t, m_time_of_transfer,
909         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
910         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
911         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
912
913     bool retval = true;
914     // a static cast could make sure that there is no performance
915     // penalty for the virtual functions (to be checked)
916     if (t==StreamProcessor::ePT_Receive) {
917         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
918                 it != m_ReceiveProcessors.end();
919                 ++it ) {
920             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
921                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
922                             m_period, m_time_of_transfer,*it);
923                 retval &= false; // buffer underrun
924             }
925         }
926     } else {
927         // FIXME: in the SPM it would be nice to have system time instead of
928         //        1394 time
929         float rate = m_SyncSource->getTicksPerFrame();
930         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
931
932         // the data we are putting into the buffer is intended to be transmitted
933         // one ringbuffer size after it has been received
934         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
935
936         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
937                 it != m_TransmitProcessors.end();
938                 ++it ) {
939             // FIXME: in the SPM it would be nice to have system time instead of
940             //        1394 time
941             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
942                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
943                         m_period, transmit_timestamp, *it);
944                 retval &= false; // buffer underrun
945             }
946         }
947     }
948     return retval;
949 }
950
951 /**
952  * @brief Transfer one period of silence for both receive and transmit StreamProcessors
953  *
954  * Transfers one period of silence to the Iso side for transmit SP's
955  * or dump one period of frames for receive SP's
956  *
957  * @return true if successful, false otherwise (indicates xrun).
958  */
959 bool StreamProcessorManager::transferSilence() {
960     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n");
961     bool retval=true;
962     retval &= transferSilence(StreamProcessor::ePT_Receive);
963     retval &= transferSilence(StreamProcessor::ePT_Transmit);
964     return retval;
965 }
966
967 /**
968  * @brief Transfer one period of silence for either the receive or transmit StreamProcessors
969  *
970  * Transfers one period of silence to the Iso side for transmit SP's
971  * or dump one period of frames for receive SP's
972  *
973  * @param t The processor type to tranfer for (receive or transmit)
974  * @return true if successful, false otherwise (indicates xrun).
975  */
976 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) {
977     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
978         t, m_time_of_transfer,
979         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
980         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
981         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
982
983     bool retval = true;
984     // a static cast could make sure that there is no performance
985     // penalty for the virtual functions (to be checked)
986     if (t==StreamProcessor::ePT_Receive) {
987         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
988                 it != m_ReceiveProcessors.end();
989                 ++it ) {
990             if(!(*it)->dropFrames(m_period, m_time_of_transfer)) {
991                     debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n",
992                             m_period, m_time_of_transfer,*it);
993                 retval &= false; // buffer underrun
994             }
995         }
996     } else {
997         // FIXME: in the SPM it would be nice to have system time instead of
998         //        1394 time
999         float rate = m_SyncSource->getTicksPerFrame();
1000         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1001
1002         // the data we are putting into the buffer is intended to be transmitted
1003         // one ringbuffer size after it has been received
1004         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1005
1006         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1007                 it != m_TransmitProcessors.end();
1008                 ++it ) {
1009             // FIXME: in the SPM it would be nice to have system time instead of
1010             //        1394 time
1011             if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1012                 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1013                         m_period, transmit_timestamp, *it);
1014                 retval &= false; // buffer underrun
1015             }
1016         }
1017     }
1018     return retval;
1019 }
1020
1021 void StreamProcessorManager::dumpInfo() {
1022     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1023     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
1024     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
1025
1026     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
1027     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1028         it != m_ReceiveProcessors.end();
1029         ++it ) {
1030         (*it)->dumpInfo();
1031     }
1032
1033     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
1034     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1035         it != m_TransmitProcessors.end();
1036         ++it ) {
1037         (*it)->dumpInfo();
1038     }
1039
1040     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
1041     m_isoManager->dumpInfo();
1042     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
1043
1044 }
1045
1046 void StreamProcessorManager::setVerboseLevel(int l) {
1047     setDebugLevel(l);
1048
1049     if (m_isoManager) m_isoManager->setVerboseLevel(l);
1050
1051     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
1052     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1053         it != m_ReceiveProcessors.end();
1054         ++it ) {
1055         (*it)->setVerboseLevel(l);
1056     }
1057
1058     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
1059     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1060         it != m_TransmitProcessors.end();
1061         ++it ) {
1062         (*it)->setVerboseLevel(l);
1063     }
1064 }
1065
1066
1067 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
1068     int count=0;
1069
1070     if (direction == Port::E_Capture) {
1071         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1072             it != m_ReceiveProcessors.end();
1073             ++it ) {
1074             count += (*it)->getPortCount(type);
1075         }
1076     } else {
1077         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1078             it != m_TransmitProcessors.end();
1079             ++it ) {
1080             count += (*it)->getPortCount(type);
1081         }
1082     }
1083     return count;
1084 }
1085
1086 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
1087     int count=0;
1088
1089     if (direction == Port::E_Capture) {
1090         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1091             it != m_ReceiveProcessors.end();
1092             ++it ) {
1093             count += (*it)->getPortCount();
1094         }
1095     } else {
1096         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1097             it != m_TransmitProcessors.end();
1098             ++it ) {
1099             count += (*it)->getPortCount();
1100         }
1101     }
1102     return count;
1103 }
1104
1105 // TODO: implement a port map here, instead of the loop
1106
1107 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
1108     int count=0;
1109     int prevcount=0;
1110
1111     if (direction == Port::E_Capture) {
1112         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1113             it != m_ReceiveProcessors.end();
1114             ++it ) {
1115             count += (*it)->getPortCount();
1116             if (count > idx) {
1117                 return (*it)->getPortAtIdx(idx-prevcount);
1118             }
1119             prevcount=count;
1120         }
1121     } else {
1122         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1123             it != m_TransmitProcessors.end();
1124             ++it ) {
1125             count += (*it)->getPortCount();
1126             if (count > idx) {
1127                 return (*it)->getPortAtIdx(idx-prevcount);
1128             }
1129             prevcount=count;
1130         }
1131     }
1132     return NULL;
1133 }
1134
1135 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
1136     m_thread_realtime=rt;
1137     m_thread_priority=priority;
1138     return true;
1139 }
1140
1141
1142 } // end of namespace
Note: See TracBrowser for help on using the browser.