root/branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

Revision 722, 34.4 kB (checked in by ppalmers, 13 years ago)

more rewrite of streaming

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessorManager.h"
25 #include "generic/StreamProcessor.h"
26 #include "generic/Port.h"
27 #include "util/cycletimer.h"
28
29 #include <errno.h>
30 #include <assert.h>
31
32 #define RUNNING_TIMEOUT_MSEC 4000
33 #define PREPARE_TIMEOUT_MSEC 4000
34 #define ENABLE_TIMEOUT_MSEC 4000
35
36 // allows to add some processing margin. This shifts the time
37 // at which the buffer is transfer()'ed, making things somewhat
38 // more robust. It should be noted though that shifting the transfer
39 // time to a later time instant also causes the xmit buffer fill to be
40 // lower on average.
41 #define FFADO_SIGNAL_DELAY_TICKS 3072
42
43 namespace Streaming {
44
45 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
46
47 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers)
48     : m_is_slave( false )
49     , m_SyncSource(NULL)
50     , m_nb_buffers(nb_buffers)
51     , m_period(period)
52     , m_nominal_framerate ( framerate )
53     , m_xruns(0)
54     , m_isoManager(0)
55     , m_nbperiods(0)
56 {
57     addOption(Util::OptionContainer::Option("slaveMode",false));
58 }
59
60 StreamProcessorManager::~StreamProcessorManager() {
61     if (m_isoManager) delete m_isoManager;
62 }
63
64 /**
65  * Registers \ref processor with this manager.
66  *
67  * also registers it with the isohandlermanager
68  *
69  * be sure to call isohandlermanager->init() first!
70  * and be sure that the processors are also ->init()'ed
71  *
72  * @param processor
73  * @return true if successfull
74  */
75 bool StreamProcessorManager::registerProcessor(StreamProcessor *processor)
76 {
77     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p)\n",processor);
78     assert(processor);
79     assert(m_isoManager);
80
81     if (processor->getType() == StreamProcessor::ePT_Receive) {
82         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
83
84         m_ReceiveProcessors.push_back(processor);
85         processor->setManager(this);
86         return true;
87     }
88
89     if (processor->getType() == StreamProcessor::ePT_Transmit) {
90         processor->setVerboseLevel(getDebugLevel()); // inherit debug level
91
92         m_TransmitProcessors.push_back(processor);
93         processor->setManager(this);
94         return true;
95     }
96
97     debugFatal("Unsupported processor type!\n");
98     return false;
99 }
100
101 bool StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
102 {
103     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p)\n",processor);
104     assert(processor);
105
106     if (processor->getType()==StreamProcessor::ePT_Receive) {
107
108         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
109               it != m_ReceiveProcessors.end();
110               ++it )
111         {
112             if ( *it == processor ) {
113                 m_ReceiveProcessors.erase(it);
114                 processor->clearManager();
115                 if(!m_isoManager->unregisterStream(processor)) {
116                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n");
117                     return false;
118                 }
119                 return true;
120             }
121         }
122     }
123
124     if (processor->getType()==StreamProcessor::ePT_Transmit) {
125         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
126               it != m_TransmitProcessors.end();
127               ++it )
128         {
129             if ( *it == processor ) {
130                 m_TransmitProcessors.erase(it);
131                 processor->clearManager();
132                 if(!m_isoManager->unregisterStream(processor)) {
133                     debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n");
134                     return false;
135                 }
136                 return true;
137             }
138         }
139     }
140
141     debugFatal("Processor (%p) not found!\n",processor);
142     return false; //not found
143 }
144
145 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) {
146     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s);
147     m_SyncSource=s;
148     return true;
149 }
150
151 bool StreamProcessorManager::init()
152 {
153     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
154     m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1);
155     if(!m_isoManager) {
156         debugFatal("Could not create IsoHandlerManager\n");
157         return false;
158     }
159     m_isoManager->setVerboseLevel(getDebugLevel());
160     m_isoManager->setTransmitBufferNbPeriods(getNbBuffers() - 1);
161
162     if(!m_isoManager->init()) {
163         debugFatal("Could not initialize IsoHandlerManager\n");
164         return false;
165     }
166
167     m_xrun_happened=false;
168     return true;
169 }
170
171 bool StreamProcessorManager::prepare() {
172
173     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
174
175     m_is_slave=false;
176     if(!getOption("slaveMode", m_is_slave)) {
177         debugWarning("Could not retrieve slaveMode parameter, defaulting to false\n");
178     }
179
180     // if no sync source is set, select one here
181     if(m_SyncSource == NULL) {
182        debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n");
183     }
184
185     // FIXME: put into separate method
186     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
187           it != m_ReceiveProcessors.end();
188           ++it )
189     {
190         if(m_SyncSource == NULL) {
191             debugWarning(" => Sync Source is %p.\n", *it);
192             m_SyncSource = *it;
193         }
194     }
195     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
196           it != m_TransmitProcessors.end();
197           ++it )
198     {
199         if(m_SyncSource == NULL) {
200             debugWarning(" => Sync Source is %p.\n", *it);
201             m_SyncSource = *it;
202         }
203     }
204
205     // now do the actual preparation of the SP's
206     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");
207     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
208         it != m_ReceiveProcessors.end();
209         ++it ) {
210
211         if(!(*it)->setOption("slaveMode", m_is_slave)) {
212             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
213         }
214
215         if(!(*it)->prepare()) {
216             debugFatal(  " could not prepare (%p)...\n",(*it));
217             return false;
218         }
219     }
220     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n");
221     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
222         it != m_TransmitProcessors.end();
223         ++it ) {
224         if(!(*it)->setOption("slaveMode", m_is_slave)) {
225             debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it));
226         }
227         if(!(*it)->prepare()) {
228             debugFatal( " could not prepare (%p)...\n",(*it));
229             return false;
230         }
231     }
232
233     // if there are no stream processors registered,
234     // fail
235     if (m_ReceiveProcessors.size() + m_TransmitProcessors.size() == 0) {
236         debugFatal("No stream processors registered, can't do anything usefull\n");
237         return false;
238     }
239     return true;
240 }
241
242 bool StreamProcessorManager::startDryRunning() {
243     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start dry-running...\n");
244     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
245             it != m_ReceiveProcessors.end();
246             ++it ) {
247         if (!(*it)->isDryRunning()) {
248             if(!(*it)->startDryRunning(-1)) {
249                 debugError("Could not put SP %p into the dry-running state\n", *it);
250                 return false;
251             }
252         }
253     }
254     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
255             it != m_TransmitProcessors.end();
256             ++it ) {
257         if (!(*it)->isDryRunning()) {
258             if(!(*it)->startDryRunning(-1)) {
259                 debugError("Could not put SP %p into the dry-running state\n", *it);
260                 return false;
261             }
262         }
263     }
264     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n");
265     return true;
266 }
267
268 bool StreamProcessorManager::syncStartAll() {
269     // figure out when to get the SP's running.
270     // the xmit SP's should also know the base timestamp
271     // streams should be aligned here
272
273     // now find out how long we have to delay the wait operation such that
274     // the received frames will all be presented to the SP
275     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
276     int max_of_min_delay = 0;
277     int min_delay = 0;
278     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
279             it != m_ReceiveProcessors.end();
280             ++it ) {
281         min_delay = (*it)->getMaxFrameLatency();
282         if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
283     }
284
285     // add some processing margin. This only shifts the time
286     // at which the buffer is transfer()'ed. This makes things somewhat
287     // more robust. It should be noted though that shifting the transfer
288     // time to a later time instant also causes the xmit buffer fill to be
289     // lower on average.
290     max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS;
291     debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",
292         max_of_min_delay,
293         (unsigned int)TICKS_TO_SECS(max_of_min_delay),
294         (unsigned int)TICKS_TO_CYCLES(max_of_min_delay),
295         (unsigned int)TICKS_TO_OFFSET(max_of_min_delay));
296     m_SyncSource->setSyncDelay(max_of_min_delay);
297
298     //STEP X: when we implement such a function, we can wait for a signal from the devices that they
299     //        have aquired lock
300     //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n");
301     //sleep(2); // FIXME: be smarter here
302
303     // make sure that we are dry-running long enough for the
304     // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)?
305     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n");
306     int nb_sync_runs=20;
307     int64_t time_till_next_period;
308     while(nb_sync_runs--) { // or while not sync-ed?
309         // check if we were waked up too soon
310         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
311         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
312         if(time_till_next_period > 0) {
313             // wait for the period
314             usleep(time_till_next_period);
315         }
316     }
317
318     debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
319     // FIXME: in the SPM it would be nice to have system time instead of
320     //        1394 time
321
322     // we now should have decent sync info on the sync source
323     // determine a point in time where the system should start
324     // figure out where we are now
325     uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod();
326     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",
327         time_of_first_sample,
328         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
329         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
330         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
331
332     #define CYCLES_FOR_STARTUP 200
333     // start wet-running in CYCLES_FOR_STARTUP cycles
334     // this is the time window we have to setup all SP's such that they
335     // can start wet-running correctly.
336     time_of_first_sample = addTicks(time_of_first_sample,
337                                     CYCLES_FOR_STARTUP * TICKS_PER_CYCLE);
338
339     debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
340         time_of_first_sample,
341         (unsigned int)TICKS_TO_SECS(time_of_first_sample),
342         (unsigned int)TICKS_TO_CYCLES(time_of_first_sample),
343         (unsigned int)TICKS_TO_OFFSET(time_of_first_sample));
344
345     // we should start wet-running the transmit SP's some cycles in advance
346     // such that we know it is wet-running when it should output its first sample
347     #define PRESTART_CYCLES_FOR_XMIT 20
348     uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
349                                                  PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE);
350
351     #define PRESTART_CYCLES_FOR_RECV 0
352     uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
353                                                  PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE);
354     debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",
355         time_to_start_xmit,
356         (unsigned int)TICKS_TO_SECS(time_to_start_xmit),
357         (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit),
358         (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit));
359     debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",
360         time_to_start_recv,
361         (unsigned int)TICKS_TO_SECS(time_to_start_recv),
362         (unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
363         (unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
364
365     // at this point the buffer head timestamp of the transmit buffers can be set
366     // this is the presentation time of the first sample in the buffer
367     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
368           it != m_TransmitProcessors.end();
369           ++it ) {
370         (*it)->setBufferHeadTimestamp(time_of_first_sample);
371     }
372
373     // STEP X: switch SP's over to the running state
374     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
375           it != m_ReceiveProcessors.end();
376           ++it ) {
377         if(!(*it)->scheduleStartRunning(time_to_start_recv)) {
378             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv);
379             return false;
380         }
381     }
382     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
383           it != m_TransmitProcessors.end();
384           ++it ) {
385         if(!(*it)->scheduleStartRunning(time_to_start_xmit)) {
386             debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit);
387             return false;
388         }
389     }
390     // wait for the syncsource to start running.
391     // that will block the waitForPeriod call until everyone has started (theoretically)
392     int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started
393     while (!m_SyncSource->isRunning() && cnt) {
394         usleep(125);
395         cnt--;
396     }
397     if(cnt==0) {
398         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n");
399         return false;
400     }
401     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n");
402     return true;
403 }
404
405 bool StreamProcessorManager::start() {
406     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n");
407     assert(m_isoManager);
408
409     debugOutput( DEBUG_LEVEL_VERBOSE, "Creating handlers for the StreamProcessors...\n");
410     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
411     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
412           it != m_ReceiveProcessors.end();
413           ++it )
414     {
415         if (!m_isoManager->registerStream(*it)) {
416             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it);
417             return false;
418         }
419     }
420     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
421     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
422           it != m_TransmitProcessors.end();
423           ++it )
424     {
425         if (!m_isoManager->registerStream(*it)) {
426             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it);
427             return false;
428         }
429     }
430
431     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n");
432     if (!m_isoManager->prepare()) {
433         debugFatal("Could not prepare isoManager\n");
434         return false;
435     }
436
437     debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n");
438     if (!m_isoManager->startHandlers(-1)) {
439         debugFatal("Could not start handlers...\n");
440         return false;
441     }
442
443     // put all SP's into dry-running state
444     if (!startDryRunning()) {
445         debugFatal("Could not put SP's in dry-running state\n");
446         return false;
447     }
448
449     // start all SP's synchonized
450     if (!syncStartAll()) {
451         debugFatal("Could not syncStartAll...\n");
452         return false;
453     }
454
455     // dump the iso stream information when in verbose mode
456     if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {
457         m_isoManager->dumpInfo();
458     }
459
460     return true;
461 }
462
463 bool StreamProcessorManager::stop() {
464     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping...\n");
465     assert(m_isoManager);
466
467     debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n");
468
469     // switch SP's over to the dry-running state
470     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
471           it != m_ReceiveProcessors.end();
472           ++it ) {
473         if(!(*it)->scheduleStopRunning(-1)) {
474             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
475             return false;
476         }
477     }
478     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
479           it != m_TransmitProcessors.end();
480           ++it ) {
481         if(!(*it)->scheduleStopRunning(-1)) {
482             debugError("%p->scheduleStopRunning(-1) failed\n", *it);
483             return false;
484         }
485     }
486     // wait for the SP's to get into the dry-running state
487     int cnt = 200;
488     bool ready = false;
489     while (!ready && cnt) {
490         ready = true;
491         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
492             it != m_ReceiveProcessors.end();
493             ++it ) {
494             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
495         }
496         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
497             it != m_TransmitProcessors.end();
498             ++it ) {
499             ready &= ((*it)->isDryRunning() || (*it)->isStopped());
500         }
501         usleep(125);
502         cnt--;
503     }
504     if(cnt==0) {
505         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n");
506         return false;
507     }
508
509     // switch SP's over to the stopped state
510     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
511           it != m_ReceiveProcessors.end();
512           ++it ) {
513         if(!(*it)->scheduleStopDryRunning(-1)) {
514             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
515             return false;
516         }
517     }
518     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
519           it != m_TransmitProcessors.end();
520           ++it ) {
521         if(!(*it)->scheduleStopDryRunning(-1)) {
522             debugError("%p->scheduleStopDryRunning(-1) failed\n", *it);
523             return false;
524         }
525     }
526     // wait for the SP's to get into the running state
527     cnt = 200;
528     ready = false;
529     while (!ready && cnt) {
530         ready = true;
531         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
532             it != m_ReceiveProcessors.end();
533             ++it ) {
534             ready &= (*it)->isStopped();
535         }
536         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
537             it != m_TransmitProcessors.end();
538             ++it ) {
539             ready &= (*it)->isStopped();
540         }
541         usleep(125);
542         cnt--;
543     }
544     if(cnt==0) {
545         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n");
546         return false;
547     }
548
549     debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n");
550     if(!m_isoManager->stopHandlers()) {
551        debugFatal("Could not stop ISO handlers\n");
552        return false;
553     }
554
555     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processors from handlers...\n");
556     // now unregister all streams from iso manager
557     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
558     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
559           it != m_ReceiveProcessors.end();
560           ++it ) {
561         if (!m_isoManager->unregisterStream(*it)) {
562             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it);
563             return false;
564         }
565     }
566     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
567     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
568           it != m_TransmitProcessors.end();
569           ++it ) {
570         if (!m_isoManager->unregisterStream(*it)) {
571             debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it);
572             return false;
573         }
574     }
575     return true;
576 }
577
578 /**
579  * Called upon Xrun events. This brings all StreamProcessors back
580  * into their starting state, and then carries on streaming. This should
581  * have the same effect as restarting the whole thing.
582  *
583  * @return true if successful, false otherwise
584  */
585 bool StreamProcessorManager::handleXrun() {
586
587     debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n");
588
589     dumpInfo();
590
591     /*
592      * Reset means:
593      * 1) Disabling the SP's, so that they don't process any packets
594      *    note: the isomanager does keep on delivering/requesting them
595      * 2) Bringing all buffers & streamprocessors into a know state
596      *    - Clear all capture buffers
597      *    - Put nb_periods*period_size of null frames into the playback buffers
598      * 3) Re-enable the SP's
599      */
600
601     // put all SP's back into dry-running state
602     if (!startDryRunning()) {
603         debugFatal("Could not put SP's in dry-running state\n");
604         return false;
605     }
606
607     debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n");
608     // start all SP's synchonized
609     if (!syncStartAll()) {
610         debugFatal("Could not syncStartAll...\n");
611         return false;
612     }
613
614     debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n");
615
616     return true;
617 }
618
619 /**
620  * @brief Waits until the next period of samples is ready
621  *
622  * This function does not return until a full period of samples is (or should be)
623  * ready to be transferred.
624  *
625  * @return true if the period is ready, false if an xrun occurred
626  */
627 bool StreamProcessorManager::waitForPeriod() {
628     int time_till_next_period;
629     bool xrun_occurred = false;
630
631     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
632
633     assert(m_SyncSource);
634
635     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
636
637     while(time_till_next_period > 0) {
638         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
639
640         // wait for the period
641         usleep(time_till_next_period);
642
643         // check for underruns on the ISO side,
644         // those should make us bail out of the wait loop
645         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
646             it != m_ReceiveProcessors.end();
647             ++it ) {
648             // a xrun has occurred on the Iso side
649             xrun_occurred |= (*it)->xrunOccurred();
650         }
651         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
652             it != m_TransmitProcessors.end();
653             ++it ) {
654             // a xrun has occurred on the Iso side
655             xrun_occurred |= (*it)->xrunOccurred();
656         }
657         if(xrun_occurred) break;
658
659         // check if we were waked up too soon
660         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
661     }
662
663     // we save the 'ideal' time of the transfer at this point,
664     // because we can have interleaved read - process - write
665     // cycles making that we modify a receiving stream's buffer
666     // before we get to writing.
667     // NOTE: before waitForPeriod() is called again, both the transmit
668     //       and the receive processors should have done their transfer.
669     m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
670     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n",
671         m_time_of_transfer);
672
673     // normally we can transfer frames at this time, but in some cases this is not true
674     // e.g. when there are not enough frames in the receive buffer.
675     // however this doesn't have to be a problem, since we can wait some more until we
676     // have enough frames. There is only a problem once the ISO xmit doesn't have packets
677     // to transmit, or if the receive buffer overflows. These conditions are signaled by
678     // the iso threads
679     // check if xruns occurred on the Iso side.
680     // also check if xruns will occur should we transfer() now
681     #ifdef DEBUG
682     int waited = -1;
683     #endif
684     bool ready_for_transfer = false;
685     xrun_occurred = false;
686     while (!ready_for_transfer && !xrun_occurred) {
687         ready_for_transfer = true;
688         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
689             it != m_ReceiveProcessors.end();
690             ++it ) {
691             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
692             xrun_occurred |= (*it)->xrunOccurred();
693         }
694         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
695             it != m_TransmitProcessors.end();
696             ++it ) {
697             ready_for_transfer &= ((*it)->canClientTransferFrames(m_period));
698             xrun_occurred |= (*it)->xrunOccurred();
699         }
700         usleep(125); // MAGIC: one cycle sleep...
701         #ifdef DEBUG
702         waited++;
703         #endif
704     } // we are either ready or an xrun occurred
705
706     #ifdef DEBUG
707     if(waited > 0) {
708         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);
709     }
710     #endif
711
712     // this is to notify the client of the delay that we introduced by waiting
713     m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
714     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs);
715
716 #ifdef DEBUG
717     int rcv_bf=0, xmt_bf=0;
718     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
719         it != m_ReceiveProcessors.end();
720         ++it ) {
721         rcv_bf = (*it)->getBufferFill();
722     }
723     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
724         it != m_TransmitProcessors.end();
725         ++it ) {
726         xmt_bf = (*it)->getBufferFill();
727     }
728     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n",
729         m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf);
730
731     // check if xruns occurred on the Iso side.
732     // also check if xruns will occur should we transfer() now
733     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
734           it != m_ReceiveProcessors.end();
735           ++it ) {
736
737         if ((*it)->xrunOccurred()) {
738             debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it);
739             (*it)->dumpInfo();
740         }
741         if (!((*it)->canClientTransferFrames(m_period))) {
742             debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it);
743             (*it)->dumpInfo();
744         }
745     }
746     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
747           it != m_TransmitProcessors.end();
748           ++it ) {
749         if ((*it)->xrunOccurred()) {
750             debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it);
751         }
752         if (!((*it)->canClientTransferFrames(m_period))) {
753             debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it);
754         }
755     }
756 #endif
757
758     m_nbperiods++;
759     // now we can signal the client that we are (should be) ready
760     return !xrun_occurred;
761 }
762
763 /**
764  * @brief Transfer one period of frames for both receive and transmit StreamProcessors
765  *
766  * Transfers one period of frames from the client side to the Iso side and vice versa.
767  *
768  * @return true if successful, false otherwise (indicates xrun).
769  */
770 bool StreamProcessorManager::transfer() {
771
772     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
773     bool retval=true;
774     retval &= transfer(StreamProcessor::ePT_Receive);
775     retval &= transfer(StreamProcessor::ePT_Transmit);
776     return retval;
777 }
778
779 /**
780  * @brief Transfer one period of frames for either the receive or transmit StreamProcessors
781  *
782  * Transfers one period of frames from the client side to the Iso side or vice versa.
783  *
784  * @param t The processor type to tranfer for (receive or transmit)
785  * @return true if successful, false otherwise (indicates xrun).
786  */
787
788 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) {
789     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",
790         t, m_time_of_transfer,
791         (unsigned int)TICKS_TO_SECS(m_time_of_transfer),
792         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer),
793         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer));
794
795     bool retval = true;
796     // a static cast could make sure that there is no performance
797     // penalty for the virtual functions (to be checked)
798     if (t==StreamProcessor::ePT_Receive) {
799         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
800                 it != m_ReceiveProcessors.end();
801                 ++it ) {
802             if(!(*it)->getFrames(m_period, m_time_of_transfer)) {
803                     debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n",
804                             m_period, m_time_of_transfer,*it);
805                 retval &= false; // buffer underrun
806             }
807         }
808     } else {
809         // FIXME: in the SPM it would be nice to have system time instead of
810         //        1394 time
811         float rate = m_SyncSource->getTicksPerFrame();
812         int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
813
814         // the data we are putting into the buffer is intended to be transmitted
815         // one ringbuffer size after it has been received
816         int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
817
818         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
819                 it != m_TransmitProcessors.end();
820                 ++it ) {
821             // FIXME: in the SPM it would be nice to have system time instead of
822             //        1394 time
823             if(!(*it)->putFrames(m_period, transmit_timestamp)) {
824                 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
825                         m_period, transmit_timestamp, *it);
826                 retval &= false; // buffer underrun
827             }
828         }
829     }
830     return retval;
831 }
832
833 void StreamProcessorManager::dumpInfo() {
834     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
835     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
836     debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods);
837
838     debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
839     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
840         it != m_ReceiveProcessors.end();
841         ++it ) {
842         (*it)->dumpInfo();
843     }
844
845     debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
846     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
847         it != m_TransmitProcessors.end();
848         ++it ) {
849         (*it)->dumpInfo();
850     }
851
852     debugOutputShort( DEBUG_LEVEL_NORMAL, "Iso handler info:\n");
853     m_isoManager->dumpInfo();
854     debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n");
855
856 }
857
858 void StreamProcessorManager::setVerboseLevel(int l) {
859     setDebugLevel(l);
860
861     if (m_isoManager) m_isoManager->setVerboseLevel(l);
862
863     debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n");
864     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
865         it != m_ReceiveProcessors.end();
866         ++it ) {
867         (*it)->setVerboseLevel(l);
868     }
869
870     debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n");
871     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
872         it != m_TransmitProcessors.end();
873         ++it ) {
874         (*it)->setVerboseLevel(l);
875     }
876 }
877
878
879 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
880     int count=0;
881
882     if (direction == Port::E_Capture) {
883         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
884             it != m_ReceiveProcessors.end();
885             ++it ) {
886             count += (*it)->getPortCount(type);
887         }
888     } else {
889         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
890             it != m_TransmitProcessors.end();
891             ++it ) {
892             count += (*it)->getPortCount(type);
893         }
894     }
895     return count;
896 }
897
898 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
899     int count=0;
900
901     if (direction == Port::E_Capture) {
902         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
903             it != m_ReceiveProcessors.end();
904             ++it ) {
905             count += (*it)->getPortCount();
906         }
907     } else {
908         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
909             it != m_TransmitProcessors.end();
910             ++it ) {
911             count += (*it)->getPortCount();
912         }
913     }
914     return count;
915 }
916
917 // TODO: implement a port map here, instead of the loop
918
919 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
920     int count=0;
921     int prevcount=0;
922
923     if (direction == Port::E_Capture) {
924         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
925             it != m_ReceiveProcessors.end();
926             ++it ) {
927             count += (*it)->getPortCount();
928             if (count > idx) {
929                 return (*it)->getPortAtIdx(idx-prevcount);
930             }
931             prevcount=count;
932         }
933     } else {
934         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
935             it != m_TransmitProcessors.end();
936             ++it ) {
937             count += (*it)->getPortCount();
938             if (count > idx) {
939                 return (*it)->getPortAtIdx(idx-prevcount);
940             }
941             prevcount=count;
942         }
943     }
944     return NULL;
945 }
946
947 bool StreamProcessorManager::setThreadParameters(bool rt, int priority) {
948     m_thread_realtime=rt;
949     m_thread_priority=priority;
950     return true;
951 }
952
953
954 } // end of namespace
Note: See TracBrowser for help on using the browser.