root/branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp

Revision 727, 54.4 kB (checked in by ppalmers, 15 years ago)

stream alignment implemented

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessor.h"
25 #include "../util/cycletimer.h"
26 #include "../StreamProcessorManager.h"
27
28 #include "libutil/Atomic.h"
29
30 #include <assert.h>
31 #include <math.h>
32
33 namespace Streaming {
34
35 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
36
37 StreamProcessor::StreamProcessor(enum eProcessorType type, int port)
38     : IsoStream((type==ePT_Receive ? IsoStream::eST_Receive : IsoStream::eST_Transmit), port)
39     , m_processor_type ( type )
40     , m_state( ePS_Created )
41     , m_next_state( ePS_Invalid )
42     , m_cycle_to_switch_state( 0 )
43     , m_scratch_buffer( NULL )
44     , m_scratch_buffer_size_bytes( 0 )
45     , m_manager( NULL )
46     , m_ticks_per_frame( 0 )
47     , m_last_cycle( -1 )
48     , m_sync_delay( 0 )
49     , m_in_xrun( false )
50     , m_last_timestamp(0)
51     , m_last_timestamp2(0)
52     , m_dropped(0)
53 {
54     // create the timestamped buffer and register ourselves as its client
55     m_data_buffer = new Util::TimestampedBuffer(this);
56 }
57
58 StreamProcessor::~StreamProcessor() {
59     if (m_data_buffer) delete m_data_buffer;
60     if (m_scratch_buffer) delete[] m_scratch_buffer;
61 }
62
63 uint64_t StreamProcessor::getTimeNow() {
64     return m_handler->getCycleTimerTicks();
65 }
66
67 int StreamProcessor::getMaxFrameLatency() {
68     if (getType() == ePT_Receive) {
69         return (int)(m_handler->getWakeupInterval() * TICKS_PER_CYCLE);
70     } else {
71         return (int)(m_handler->getWakeupInterval() * TICKS_PER_CYCLE);
72     }
73 }
74
75 /***********************************************
76  * Buffer management and manipulation          *
77  ***********************************************/
78 int StreamProcessor::getBufferFill() {
79     return m_data_buffer->getBufferFill();
80 }
81
82 int64_t
83 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
84 {
85     uint64_t time_at_period=getTimeAtPeriod();
86
87     // we delay the period signal with the sync delay
88     // this makes that the period signals lag a little compared to reality
89     // ISO buffering causes the packets to be received at max
90     // m_handler->getWakeupInterval() later than the time they were received.
91     // hence their payload is available this amount of time later. However, the
92     // period boundary is predicted based upon earlier samples, and therefore can
93     // pass before these packets are processed. Adding this extra term makes that
94     // the period boundary is signalled later
95     time_at_period = addTicks(time_at_period, m_manager->getSyncSource().getSyncDelay());
96
97     uint64_t cycle_timer=m_handler->getCycleTimerTicks();
98
99     // calculate the time until the next period
100     int32_t until_next=diffTicks(time_at_period,cycle_timer);
101
102     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
103         time_at_period, cycle_timer, until_next
104         );
105
106     // now convert to usecs
107     // don't use the mapping function because it only works
108     // for absolute times, not the relative time we are
109     // using here (which can also be negative).
110     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
111 }
112
113 uint64_t
114 StreamProcessor::getTimeAtPeriodUsecs()
115 {
116     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
117 }
118
119 uint64_t
120 StreamProcessor::getTimeAtPeriod()
121 {
122     if (getType() == ePT_Receive) {
123         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_manager->getPeriodSize());
124    
125         #ifdef DEBUG
126         ffado_timestamp_t ts;
127         signed int fc;
128         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
129    
130         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
131             next_period_boundary, ts, fc, getTicksPerFrame()
132             );
133         #endif
134         return (uint64_t)next_period_boundary;
135     } else {
136         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_manager->getNbBuffers()-1) * m_manager->getPeriodSize());
137    
138         #ifdef DEBUG
139         ffado_timestamp_t ts;
140         signed int fc;
141         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
142    
143         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
144             next_period_boundary, ts, fc, getTicksPerFrame()
145             );
146         #endif
147         return (uint64_t)next_period_boundary;
148     }
149 }
150
151 float
152 StreamProcessor::getTicksPerFrame()
153 {
154     assert(m_data_buffer != NULL);
155     return m_data_buffer->getRate();
156 }
157
158 bool
159 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
160 {
161     bool can_transfer;
162     unsigned int fc = m_data_buffer->getFrameCounter();
163     if (getType() == ePT_Receive) {
164         can_transfer = (fc >= nbframes);
165     } else {
166         // there has to be enough space to put the frames in
167         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
168         // or the buffer is transparent
169         can_transfer |= m_data_buffer->isTransparent();
170     }
171    
172     #ifdef DEBUG
173     if (!can_transfer) {
174         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
175             this, ePTToString(getType()), fc, nbframes);
176     }
177     #endif
178    
179     return can_transfer;
180 }
181
182 /***********************************************
183  * I/O API                                     *
184  ***********************************************/
185
186 // Packet transfer API
187 enum raw1394_iso_disposition
188 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
189                            unsigned char channel, unsigned char tag, unsigned char sy,
190                            unsigned int cycle, unsigned int dropped) {
191     if(m_last_cycle == -1) {
192         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
193     }
194
195     int dropped_cycles = 0;
196     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
197         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
198         if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles);
199         if (dropped_cycles > 0) {
200             debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle);
201             m_dropped += dropped_cycles;
202         }
203     }
204     m_last_cycle = cycle;
205
206     // bypass based upon state
207     if (m_state == ePS_Invalid) {
208         debugError("Should not have state %s\n", ePSToString(m_state) );
209         return RAW1394_ISO_ERROR;
210     }
211     if (m_state == ePS_Created) {
212         return RAW1394_ISO_DEFER;
213     }
214
215     // store the previous timestamp
216     m_last_timestamp2 = m_last_timestamp;
217
218     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
219     //       it happens on the first 'good' cycle for the wait condition
220     //       or on the first received cycle that is received afterwards (might be a problem)
221
222     // check whether we are waiting for a stream to be disabled
223     if(m_state == ePS_WaitingForStreamDisable) {
224         // we then check whether we have to switch on this cycle
225         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
226             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
227             m_next_state = ePS_DryRunning;
228             if (!updateState()) { // we are allowed to change the state directly
229                 debugError("Could not update state!\n");
230                 return RAW1394_ISO_ERROR;
231             }
232         } else {
233             // not time to disable yet
234         }
235         // the received data can be discarded while waiting for the stream
236         // to be disabled
237         return RAW1394_ISO_OK;
238     }
239
240     // check whether we are waiting for a stream to be enabled
241     else if(m_state == ePS_WaitingForStreamEnable) {
242         // we then check whether we have to switch on this cycle
243         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
244             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
245             m_next_state = ePS_Running;
246             if (!updateState()) { // we are allowed to change the state directly
247                 debugError("Could not update state!\n");
248                 return RAW1394_ISO_ERROR;
249             }
250         } else {
251             // not time to enable yet
252         }
253         // we are dryRunning hence data should be processed in any case
254     }
255
256     // check the packet header
257     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
258     if (result == eCRV_OK) {
259         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
260                 cycle, m_last_timestamp);
261         // update some accounting
262         m_last_good_cycle = cycle;
263         m_last_dropped = dropped_cycles;
264
265         // check whether we are waiting for a stream to startup
266         // this requires that the packet is good
267         if(m_state == ePS_WaitingForStream) {
268             // since we have a packet with an OK header,
269             // we can indicate that the stream started up
270
271             // we then check whether we have to switch on this cycle
272             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
273                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
274                 // hence go to the dryRunning state
275                 m_next_state = ePS_DryRunning;
276                 if (!updateState()) { // we are allowed to change the state directly
277                     debugError("Could not update state!\n");
278                     return RAW1394_ISO_ERROR;
279                 }
280             } else {
281                 // not time (yet) to switch state
282             }
283             // in both cases we don't want to process the data
284             return RAW1394_ISO_OK;
285         }
286
287         // check whether a state change has been requested
288         // note that only the wait state changes are synchronized with the cycles
289         else if(m_state != m_next_state) {
290             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
291                                              ePSToString(m_state), ePSToString(m_next_state));
292             // execute the requested change
293             if (!updateState()) { // we are allowed to change the state directly
294                 debugError("Could not update state!\n");
295                 return RAW1394_ISO_ERROR;
296             }
297         }
298
299         // handle dropped cycles
300         if(dropped_cycles) {
301             // they represent a discontinuity in the timestamps, and hence are
302             // to be dealt with
303             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
304             m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
305             if (m_state == ePS_Running) {
306                 // this is an xrun situation
307                 m_in_xrun = true;
308                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
309                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
310                 m_next_state = ePS_WaitingForStreamDisable;
311                 // execute the requested change
312                 if (!updateState()) { // we are allowed to change the state directly
313                     debugError("Could not update state!\n");
314                     return RAW1394_ISO_ERROR;
315                 }
316                 return RAW1394_ISO_DEFER;
317             }
318         }
319
320         // for all states that reach this we are allowed to
321         // do protocol specific data reception
322         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
323
324         // if an xrun occured, switch to the dryRunning state and
325         // allow for the xrun to be picked up
326         if (result2 == eCRV_XRun) {
327             m_in_xrun = true;
328             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
329             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
330             m_next_state = ePS_WaitingForStreamDisable;
331             // execute the requested change
332             if (!updateState()) { // we are allowed to change the state directly
333                 debugError("Could not update state!\n");
334                 return RAW1394_ISO_ERROR;
335             }
336             return RAW1394_ISO_DEFER;
337         } else if(result2 == eCRV_OK) {
338             // no problem here
339             return RAW1394_ISO_OK;
340         } else {
341             debugError("Invalid response\n");
342             return RAW1394_ISO_ERROR;
343         }
344     } else if(result == eCRV_Invalid) {
345         // apparently we don't have to do anything when the packets are not valid
346         return RAW1394_ISO_OK;
347     } else {
348         debugError("Invalid response\n");
349         return RAW1394_ISO_ERROR;
350     }
351     debugError("reached the unreachable\n");
352     return RAW1394_ISO_ERROR;
353 }
354
355 enum raw1394_iso_disposition
356 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
357                            unsigned char *tag, unsigned char *sy,
358                            int cycle, unsigned int dropped, unsigned int max_length) {
359     if (cycle<0) {
360         *tag = 0;
361         *sy = 0;
362         *length = 0;
363         return RAW1394_ISO_OK;
364     }
365
366     if(m_last_cycle == -1) {
367         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
368     }
369
370     int dropped_cycles = 0;
371     if (m_last_cycle != cycle && m_last_cycle != -1) {
372         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
373         if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles);
374         if (dropped_cycles > 0) {
375             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
376             m_dropped += dropped_cycles;
377         }
378     }
379     if (cycle >= 0) {
380         m_last_cycle = cycle;
381     }
382
383     // bypass based upon state
384     if (m_state == ePS_Invalid) {
385         debugError("Should not have state %s\n", ePSToString(m_state) );
386         return RAW1394_ISO_ERROR;
387     }
388     if (m_state == ePS_Created) {
389         *tag = 0;
390         *sy = 0;
391         *length = 0;
392         return RAW1394_ISO_DEFER;
393     }
394
395     // normal processing
396     // note that we can't use getCycleTimer directly here,
397     // because packets are queued in advance. This means that
398     // we the packet we are constructing will be sent out
399     // on 'cycle', not 'now'.
400     unsigned int ctr = m_handler->getCycleTimer();
401     int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
402
403     // the difference between the cycle this
404     // packet is intended for and 'now'
405     int cycle_diff = diffCycles(cycle, now_cycles);
406
407     #ifdef DEBUG
408     if(cycle_diff < 0) {
409         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
410             cycle, now_cycles);
411     }
412     #endif
413
414     // store the previous timestamp
415     m_last_timestamp2 = m_last_timestamp;
416
417     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
418     //       it happens on the first 'good' cycle for the wait condition
419     //       or on the first received cycle that is received afterwards (might be a problem)
420
421     // check whether we are waiting for a stream to be disabled
422     if(m_state == ePS_WaitingForStreamDisable) {
423         // we then check whether we have to switch on this cycle
424         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
425             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
426             m_next_state = ePS_DryRunning;
427             if (!updateState()) { // we are allowed to change the state directly
428                 debugError("Could not update state!\n");
429                 return RAW1394_ISO_ERROR;
430             }
431         } else {
432             // not time to disable yet
433         }
434     }
435     // check whether we are waiting for a stream to be enabled
436     else if(m_state == ePS_WaitingForStreamEnable) {
437         // we then check whether we have to switch on this cycle
438         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
439             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
440             m_next_state = ePS_Running;
441             if (!updateState()) { // we are allowed to change the state directly
442                 debugError("Could not update state!\n");
443                 return RAW1394_ISO_ERROR;
444             }
445         } else {
446             // not time to enable yet
447         }
448         // we are dryRunning hence data should be processed in any case
449     }
450     // check whether we are waiting for a stream to startup
451     else if(m_state == ePS_WaitingForStream) {
452         // as long as the cycle parameter is not in sync with
453         // the current time, the stream is considered not
454         // to be 'running'
455         // we then check whether we have to switch on this cycle
456         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
457             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
458             // hence go to the dryRunning state
459             m_next_state = ePS_DryRunning;
460             if (!updateState()) { // we are allowed to change the state directly
461                 debugError("Could not update state!\n");
462                 return RAW1394_ISO_ERROR;
463             }
464         } else {
465             // not time (yet) to switch state
466         }
467     }
468     else if(m_state == ePS_Running) {
469         // check the packet header
470         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
471         if (result == eCRV_Packet) {
472             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
473                     cycle, m_last_timestamp);
474             // update some accounting
475             m_last_good_cycle = cycle;
476             m_last_dropped = dropped_cycles;
477
478             // check whether a state change has been requested
479             // note that only the wait state changes are synchronized with the cycles
480             if(m_state != m_next_state) {
481                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
482                                                 ePSToString(m_state), ePSToString(m_next_state));
483                 // execute the requested change
484                 if (!updateState()) { // we are allowed to change the state directly
485                     debugError("Could not update state!\n");
486                     return RAW1394_ISO_ERROR;
487                 }
488             }
489
490             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
491             // if an xrun occured, switch to the dryRunning state and
492             // allow for the xrun to be picked up
493             if (result2 == eCRV_XRun) {
494                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
495                 m_in_xrun = true;
496                 m_cycle_to_switch_state = cycle+1; // switch in the next cycle
497                 m_next_state = ePS_WaitingForStreamDisable;
498                 // execute the requested change
499                 if (!updateState()) { // we are allowed to change the state directly
500                     debugError("Could not update state!\n");
501                     return RAW1394_ISO_ERROR;
502                 }
503                 goto send_empty_packet;
504             }
505             return RAW1394_ISO_OK;
506         } else if (result == eCRV_XRun) { // pick up the possible xruns
507             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
508             m_in_xrun = true;
509             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
510             m_next_state = ePS_WaitingForStreamDisable;
511             // execute the requested change
512             if (!updateState()) { // we are allowed to change the state directly
513                 debugError("Could not update state!\n");
514                 return RAW1394_ISO_ERROR;
515             }
516         } else if ((result == eCRV_EmptyPacket) || (result == eCRV_Again)) {
517             if(m_state != m_next_state) {
518                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
519                                                 ePSToString(m_state), ePSToString(m_next_state));
520                 // execute the requested change
521                 if (!updateState()) { // we are allowed to change the state directly
522                     debugError("Could not update state!\n");
523                     return RAW1394_ISO_ERROR;
524                 }
525             }
526             goto send_empty_packet;
527 //         } else if (result == eCRV_Again) {
528 //             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
529 //             if(m_state != m_next_state) {
530 //                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
531 //                                                 ePSToString(m_state), ePSToString(m_next_state));
532 //                 // execute the requested change
533 //                 if (!updateState()) { // we are allowed to change the state directly
534 //                     debugError("Could not update state!\n");
535 //                     return RAW1394_ISO_ERROR;
536 //                 }
537 //             }
538 //             // force some delay
539 //             usleep(125);
540 //             return RAW1394_ISO_AGAIN;
541         } else {
542             debugError("Invalid return value: %d\n", result);
543             return RAW1394_ISO_ERROR;
544         }
545     }
546     // we are not running, so send an empty packet
547     // we should generate a valid packet any time
548 send_empty_packet:
549     // note that only the wait state changes are synchronized with the cycles
550     if(m_state != m_next_state) {
551         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
552                                         ePSToString(m_state), ePSToString(m_next_state));
553         // execute the requested change
554         if (!updateState()) { // we are allowed to change the state directly
555             debugError("Could not update state!\n");
556             return RAW1394_ISO_ERROR;
557         }
558     }
559
560     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
561     generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
562     generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
563     return RAW1394_ISO_DEFER;
564 }
565
566
567 // Frame Transfer API
568 /**
569  * Transfer a block of frames from the event buffer to the port buffers
570  * @param nbframes number of frames to transfer
571  * @param ts the timestamp that the LAST frame in the block should have
572  * @return
573  */
574 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
575     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
576     assert( getType() == ePT_Receive );
577     if(isDryRunning()) return getFramesDry(nbframes, ts);
578     else return getFramesWet(nbframes, ts);
579 }
580
581 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
582 // FIXME: this should be done somewhere else
583 #ifdef DEBUG
584     uint64_t ts_expected;
585     signed int fc;
586     int32_t lag_ticks;
587     float lag_frames;
588
589     // in order to sync up multiple received streams, we should
590     // use the ts parameter. It specifies the time of the block's
591     // last sample.
592     float srate = m_manager->getSyncSource().getTicksPerFrame();
593     assert(srate != 0.0);
594     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
595
596     ffado_timestamp_t ts_head_tmp;
597     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
598     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
599
600     lag_ticks = diffTicks(ts, ts_expected);
601     lag_frames = (((float)lag_ticks) / srate);
602     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
603                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
604     if (lag_frames >= 1.0) {
605         // the stream lags
606         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
607                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
608     } else if (lag_frames <= -1.0) {
609         // the stream leads
610         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
611                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
612     }
613 #endif
614     // ask the buffer to process nbframes of frames
615     // using it's registered client's processReadBlock(),
616     // which should be ours
617     m_data_buffer->blockProcessReadFrames(nbframes);
618     return true;
619 }
620
621 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
622 {
623     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
624                  this, nbframes, ts);
625     // dry run on this side means that we put silence in all enabled ports
626     // since there is do data put into the ringbuffer in the dry-running state
627     return provideSilenceBlock(nbframes, 0);
628 }
629
630 bool
631 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
632 {
633     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
634     return m_data_buffer->dropFrames(nbframes);
635 }
636
637 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
638 {
639     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
640     assert( getType() == ePT_Transmit );
641     if(isDryRunning()) return putFramesDry(nbframes, ts);
642     else return putFramesWet(nbframes, ts);
643 }
644
645 bool
646 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
647 {
648     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
649     // transfer the data
650     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
651     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
652     return true; // FIXME: what about failure?
653 }
654
655 bool
656 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
657 {
658     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
659     // do nothing
660     return true;
661 }
662
663 bool
664 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
665 {
666     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
667
668     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
669     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
670
671     if (nbframes > scratch_buffer_size_frames) {
672         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
673                    nbframes, scratch_buffer_size_frames);
674     }
675
676     assert(m_scratch_buffer);
677     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
678         debugError("Could not prepare silent block\n");
679         return false;
680     }
681     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
682         debugError("Could not write silent block\n");
683         return false;
684     }
685     return true;
686 }
687
688 bool
689 StreamProcessor::shiftStream(int nbframes)
690 {
691     if(nbframes == 0) return true;
692     if(nbframes > 0) {
693         return m_data_buffer->dropFrames(nbframes);
694     } else {
695         bool result = true;
696         while(nbframes--) {
697             result &= m_data_buffer->writeDummyFrame();
698         }
699         return result;
700     }
701 }
702
703 /***********************************************
704  * State related API                           *
705  ***********************************************/
706 bool StreamProcessor::init()
707 {
708     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
709
710     // initialization can be done without requesting it
711     // from the packet loop
712     m_next_state = ePS_Created;
713     return true;
714 }
715
716 bool StreamProcessor::prepare()
717 {
718     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
719     if(!m_manager) {
720         debugFatal("Not attached to a manager!\n");
721         return false;
722     }
723
724     // make the scratch buffer one period of frames long
725     m_scratch_buffer_size_bytes = m_manager->getPeriodSize() * getEventsPerFrame() * getEventSize();
726     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
727     if(m_scratch_buffer) delete[] m_scratch_buffer;
728     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
729     if(m_scratch_buffer == NULL) {
730         debugFatal("Could not allocate scratch buffer\n");
731         return false;
732     }
733
734     if (!prepareChild()) {
735         debugFatal("Could not prepare child\n");
736         return false;
737     }
738
739     // initialization can be done without requesting it
740     // from the packet loop
741     m_next_state = ePS_Stopped;
742     return updateState();
743 }
744
745 bool
746 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
747 {
748     // first set the time, since in the packet loop we first check m_state == m_next_state before
749     // using the time
750     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
751     m_next_state = state;
752     return true;
753 }
754
755 bool
756 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
757 {
758     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
759     int cnt = timeout_ms;
760     while (m_state != state && cnt) {
761         usleep(1000);
762         cnt--;
763     }
764     if(cnt==0) {
765         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
766         return false;
767     }
768     return true;
769 }
770
771 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
772     uint64_t tx;
773     if (t < 0) {
774         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
775     } else {
776         tx = t;
777     }
778     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
779     uint64_t now = m_handler->getCycleTimerTicks();
780     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
781                           now,
782                           (unsigned int)TICKS_TO_SECS(now),
783                           (unsigned int)TICKS_TO_CYCLES(now),
784                           (unsigned int)TICKS_TO_OFFSET(now));
785     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
786                           tx,
787                           (unsigned int)TICKS_TO_SECS(tx),
788                           (unsigned int)TICKS_TO_CYCLES(tx),
789                           (unsigned int)TICKS_TO_OFFSET(tx));
790     if (m_state == ePS_Stopped) {
791         return scheduleStateTransition(ePS_WaitingForStream, tx);
792     } else if (m_state == ePS_Running) {
793         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
794     } else {
795         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
796         return false;
797     }
798 }
799
800 bool StreamProcessor::scheduleStartRunning(int64_t t) {
801     uint64_t tx;
802     if (t < 0) {
803         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
804     } else {
805         tx = t;
806     }
807     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
808     uint64_t now = m_handler->getCycleTimerTicks();
809     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
810                           now,
811                           (unsigned int)TICKS_TO_SECS(now),
812                           (unsigned int)TICKS_TO_CYCLES(now),
813                           (unsigned int)TICKS_TO_OFFSET(now));
814     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
815                           tx,
816                           (unsigned int)TICKS_TO_SECS(tx),
817                           (unsigned int)TICKS_TO_CYCLES(tx),
818                           (unsigned int)TICKS_TO_OFFSET(tx));
819     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
820 }
821
822 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
823     uint64_t tx;
824     if (t < 0) {
825         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
826     } else {
827         tx = t;
828     }
829     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
830     uint64_t now = m_handler->getCycleTimerTicks();
831     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
832                           now,
833                           (unsigned int)TICKS_TO_SECS(now),
834                           (unsigned int)TICKS_TO_CYCLES(now),
835                           (unsigned int)TICKS_TO_OFFSET(now));
836     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
837                           tx,
838                           (unsigned int)TICKS_TO_SECS(tx),
839                           (unsigned int)TICKS_TO_CYCLES(tx),
840                           (unsigned int)TICKS_TO_OFFSET(tx));
841     return scheduleStateTransition(ePS_Stopped, tx);
842 }
843
844 bool StreamProcessor::scheduleStopRunning(int64_t t) {
845     uint64_t tx;
846     if (t < 0) {
847         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
848     } else {
849         tx = t;
850     }
851     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
852     uint64_t now = m_handler->getCycleTimerTicks();
853     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
854                           now,
855                           (unsigned int)TICKS_TO_SECS(now),
856                           (unsigned int)TICKS_TO_CYCLES(now),
857                           (unsigned int)TICKS_TO_OFFSET(now));
858     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
859                           tx,
860                           (unsigned int)TICKS_TO_SECS(tx),
861                           (unsigned int)TICKS_TO_CYCLES(tx),
862                           (unsigned int)TICKS_TO_OFFSET(tx));
863     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
864 }
865
866 bool StreamProcessor::startDryRunning(int64_t t) {
867     if(!scheduleStartDryRunning(t)) {
868         debugError("Could not schedule transition\n");
869         return false;
870     }
871     if(!waitForState(ePS_DryRunning, 2000)) {
872         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
873         return false;
874     }
875     return true;
876 }
877
878 bool StreamProcessor::startRunning(int64_t t) {
879     if(!scheduleStartRunning(t)) {
880         debugError("Could not schedule transition\n");
881         return false;
882     }
883     if(!waitForState(ePS_Running, 2000)) {
884         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
885         return false;
886     }
887     return true;
888 }
889
890 bool StreamProcessor::stopDryRunning(int64_t t) {
891     if(!scheduleStopDryRunning(t)) {
892         debugError("Could not schedule transition\n");
893         return false;
894     }
895     if(!waitForState(ePS_Stopped, 2000)) {
896         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
897         return false;
898     }
899     return true;
900 }
901
902 bool StreamProcessor::stopRunning(int64_t t) {
903     if(!scheduleStopRunning(t)) {
904         debugError("Could not schedule transition\n");
905         return false;
906     }
907     if(!waitForState(ePS_DryRunning, 2000)) {
908         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
909         return false;
910     }
911     return true;
912 }
913
914
915 // internal state API
916
917 /**
918  * @brief Enter the ePS_Stopped state
919  * @return true if successful, false if not
920  *
921  * @pre none
922  *
923  * @post the buffer and the isostream are ready for use.
924  * @post all dynamic structures have been allocated successfully
925  * @post the buffer is transparent and empty, and all parameters are set
926  *       to the correct initial/nominal values.
927  *
928  */
929 bool
930 StreamProcessor::doStop()
931 {
932     float ticks_per_frame;
933     unsigned int ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize();
934
935     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
936     bool result = true;
937
938     switch(m_state) {
939         case ePS_Created:
940             assert(m_data_buffer);
941             // object just created
942             result = m_data_buffer->init();
943
944             // prepare the framerate estimate
945             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate());
946             m_ticks_per_frame = ticks_per_frame;
947             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
948
949             // initialize internal buffer
950             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
951
952             result &= m_data_buffer->setEventSize( getEventSize() );
953             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
954             if(getType() == ePT_Receive) {
955                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
956             } else {
957                 result &= m_data_buffer->setUpdatePeriod( m_manager->getPeriodSize() );
958             }
959             result &= m_data_buffer->setNominalRate(ticks_per_frame);
960             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
961             result &= m_data_buffer->prepare(); // FIXME: the name
962
963             // set the parameters of ports we can:
964             // we want the audio ports to be period buffered,
965             // and the midi ports to be packet buffered
966             for ( PortVectorIterator it = m_Ports.begin();
967                 it != m_Ports.end();
968                 ++it )
969             {
970                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
971                 if(!(*it)->setBufferSize(m_manager->getPeriodSize())) {
972                     debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize());
973                     return false;
974                 }
975                 switch ((*it)->getPortType()) {
976                     case Port::E_Audio:
977                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
978                             debugFatal("Could not set signal type to PeriodSignalling");
979                             return false;
980                         }
981                         // buffertype and datatype are dependant on the API
982                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
983                         // buffertype and datatype are dependant on the API
984                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
985                             debugFatal("Could not set buffer type");
986                             return false;
987                         }
988                         if(!(*it)->useExternalBuffer(true)) {
989                             debugFatal("Could not set external buffer usage");
990                             return false;
991                         }
992                         if(!(*it)->setDataType(Port::E_Float)) {
993                             debugFatal("Could not set data type");
994                             return false;
995                         }
996                         break;
997                     case Port::E_Midi:
998                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
999                             debugFatal("Could not set signal type to PacketSignalling");
1000                             return false;
1001                         }
1002                         // buffertype and datatype are dependant on the API
1003                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1004                         // buffertype and datatype are dependant on the API
1005                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1006                             debugFatal("Could not set buffer type");
1007                             return false;
1008                         }
1009                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1010                             debugFatal("Could not set data type");
1011                             return false;
1012                         }
1013                         break;
1014                     default:
1015                         debugWarning("Unsupported port type specified\n");
1016                         break;
1017                 }
1018             }
1019             // the API specific settings of the ports should already be set,
1020             // as this is called from the processorManager->prepare()
1021             // so we can init the ports
1022             result &= PortManager::initPorts();
1023
1024             break;
1025         case ePS_DryRunning:
1026             // what to do here?
1027             break;
1028         default:
1029             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1030             return false;
1031     }
1032
1033     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1034     // make the buffer transparent
1035     m_data_buffer->setTransparent(true);
1036
1037     // reset all ports
1038     result &= PortManager::preparePorts();
1039
1040     m_state = ePS_Stopped;
1041     #ifdef DEBUG
1042     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1043         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1044         dumpInfo();
1045     }
1046     #endif
1047     return result;
1048 }
1049
1050 /**
1051  * @brief Enter the ePS_WaitingForStream state
1052  * @return true if successful, false if not
1053  *
1054  * @pre all dynamic data structures are allocated successfully
1055  *
1056  * @post
1057  *
1058  */
1059 bool
1060 StreamProcessor::doWaitForRunningStream()
1061 {
1062     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1063     switch(m_state) {
1064         case ePS_Stopped:
1065             // we have to start waiting for an incoming stream
1066             // this basically means nothing, the state change will
1067             // be picked up by the packet iterator
1068             break;
1069         default:
1070             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1071             return false;
1072     }
1073     m_state = ePS_WaitingForStream;
1074     #ifdef DEBUG
1075     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1076         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1077         dumpInfo();
1078     }
1079     #endif
1080     return true;
1081 }
1082
1083 /**
1084  * @brief Enter the ePS_DryRunning state
1085  * @return true if successful, false if not
1086  *
1087  * @pre
1088  *
1089  * @post
1090  *
1091  */
1092 bool
1093 StreamProcessor::doDryRunning()
1094 {
1095     bool result = true;
1096     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1097     switch(m_state) {
1098         case ePS_WaitingForStream:
1099             // a running stream has been detected
1100             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1101             if (getType() == ePT_Receive) {
1102                 // this to ensure that there is no discontinuity when starting to
1103                 // update the DLL based upon the received packets
1104                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1105             } else {
1106                 // FIXME: PC=master mode will have to do something here I guess...
1107             }
1108             break;
1109         case ePS_WaitingForStreamDisable:
1110             result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1111             m_data_buffer->setTransparent(true);
1112             break;
1113         default:
1114             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1115             return false;
1116     }
1117     m_state = ePS_DryRunning;
1118     #ifdef DEBUG
1119     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1120         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1121         dumpInfo();
1122     }
1123     #endif
1124     return result;
1125 }
1126
1127 /**
1128  * @brief Enter the ePS_WaitingForStreamEnable state
1129  * @return true if successful, false if not
1130  *
1131  * @pre
1132  *
1133  * @post
1134  *
1135  */
1136 bool
1137 StreamProcessor::doWaitForStreamEnable()
1138 {
1139     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1140     unsigned int ringbuffer_size_frames;
1141     switch(m_state) {
1142         case ePS_DryRunning:
1143             // we have to start waiting for an incoming stream
1144             // this basically means nothing, the state change will
1145             // be picked up by the packet iterator
1146
1147             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1148                 debugError("Could not reset data buffer\n");
1149                 return false;
1150             }
1151             if (getType() == ePT_Transmit) {
1152                 ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize();
1153                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1154                 // prefill the buffer
1155                 if(!transferSilence(ringbuffer_size_frames)) {
1156                     debugFatal("Could not prefill transmit stream\n");
1157                     return false;
1158                 }
1159             }
1160
1161             break;
1162         default:
1163             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1164             return false;
1165     }
1166     m_state = ePS_WaitingForStreamEnable;
1167     #ifdef DEBUG
1168     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1169         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1170         dumpInfo();
1171     }
1172     #endif
1173     return true;
1174 }
1175
1176 /**
1177  * @brief Enter the ePS_Running state
1178  * @return true if successful, false if not
1179  *
1180  * @pre
1181  *
1182  * @post
1183  *
1184  */
1185 bool
1186 StreamProcessor::doRunning()
1187 {
1188     bool result = true;
1189     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1190     switch(m_state) {
1191         case ePS_WaitingForStreamEnable:
1192             // a running stream has been detected
1193             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1194                                              this, m_last_cycle);
1195             m_in_xrun = false;
1196             m_data_buffer->setTransparent(false);
1197             break;
1198         default:
1199             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1200             return false;
1201     }
1202     m_state = ePS_Running;
1203     #ifdef DEBUG
1204     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1205         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1206         dumpInfo();
1207     }
1208     #endif
1209     return result;
1210 }
1211
1212 /**
1213  * @brief Enter the ePS_WaitingForStreamDisable state
1214  * @return true if successful, false if not
1215  *
1216  * @pre
1217  *
1218  * @post
1219  *
1220  */
1221 bool
1222 StreamProcessor::doWaitForStreamDisable()
1223 {
1224     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1225     switch(m_state) {
1226         case ePS_Running:
1227             // the thread will do the transition
1228             break;
1229         default:
1230             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1231             return false;
1232     }
1233     m_state = ePS_WaitingForStreamDisable;
1234     #ifdef DEBUG
1235     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1236         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1237         dumpInfo();
1238     }
1239     #endif
1240     return true;
1241 }
1242
1243 /**
1244  * @brief Updates the state machine and calls the necessary transition functions
1245  * @return true if successful, false if not
1246  */
1247 bool StreamProcessor::updateState() {
1248     bool result = false;
1249     // copy the current state locally since it could change value,
1250     // and that's something we don't want to happen inbetween tests
1251     // if m_next_state changes during this routine, we know for sure
1252     // that the previous state change was at least attempted correctly.
1253     enum eProcessorState next_state = m_next_state;
1254
1255     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1256         ePSToString(m_state), ePSToString(next_state));
1257
1258     if (m_state == next_state) {
1259         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1260         return true;
1261     }
1262
1263     // after creation, only initialization is allowed
1264     if (m_state == ePS_Created) {
1265         if(next_state != ePS_Stopped) {
1266             goto updateState_exit_with_error;
1267         }
1268         // do init here
1269         result = doStop();
1270         if (result) return true;
1271         else goto updateState_exit_change_failed;
1272     }
1273
1274     // after initialization, only WaitingForRunningStream is allowed
1275     if (m_state == ePS_Stopped) {
1276         if(next_state != ePS_WaitingForStream) {
1277             goto updateState_exit_with_error;
1278         }
1279         result = doWaitForRunningStream();
1280         if (result) return true;
1281         else goto updateState_exit_change_failed;
1282     }
1283
1284     // after WaitingForStream, only ePS_DryRunning is allowed
1285     // this means that the stream started running
1286     if (m_state == ePS_WaitingForStream) {
1287         if(next_state != ePS_DryRunning) {
1288             goto updateState_exit_with_error;
1289         }
1290         result = doDryRunning();
1291         if (result) return true;
1292         else goto updateState_exit_change_failed;
1293     }
1294
1295     // from ePS_DryRunning we can go to:
1296     //   - ePS_Stopped if something went wrong during DryRunning
1297     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1298     if (m_state == ePS_DryRunning) {
1299         if((next_state != ePS_Stopped) &&
1300            (next_state != ePS_WaitingForStreamEnable)) {
1301             goto updateState_exit_with_error;
1302         }
1303         if (next_state == ePS_Stopped) {
1304             result = doStop();
1305         } else {
1306             result = doWaitForStreamEnable();
1307         }
1308         if (result) return true;
1309         else goto updateState_exit_change_failed;
1310     }
1311
1312     // from ePS_WaitingForStreamEnable we can go to:
1313     //   - ePS_DryRunning if something went wrong while waiting
1314     //   - ePS_Running if the stream enabled correctly
1315     if (m_state == ePS_WaitingForStreamEnable) {
1316         if((next_state != ePS_DryRunning) &&
1317            (next_state != ePS_Running)) {
1318             goto updateState_exit_with_error;
1319         }
1320         if (next_state == ePS_Stopped) {
1321             result = doDryRunning();
1322         } else {
1323             result = doRunning();
1324         }
1325         if (result) return true;
1326         else goto updateState_exit_change_failed;
1327     }
1328
1329     // from ePS_Running we can only start waiting for a disabled stream
1330     if (m_state == ePS_Running) {
1331         if(next_state != ePS_WaitingForStreamDisable) {
1332             goto updateState_exit_with_error;
1333         }
1334         result = doWaitForStreamDisable();
1335         if (result) return true;
1336         else goto updateState_exit_change_failed;
1337     }
1338
1339     // from ePS_WaitingForStreamDisable we can go to DryRunning
1340     if (m_state == ePS_WaitingForStreamDisable) {
1341         if(next_state != ePS_DryRunning) {
1342             goto updateState_exit_with_error;
1343         }
1344         result = doDryRunning();
1345         if (result) return true;
1346         else goto updateState_exit_change_failed;
1347     }
1348
1349     // if we arrive here there is an error
1350 updateState_exit_with_error:
1351     debugError("Invalid state transition: %s => %s\n",
1352         ePSToString(m_state), ePSToString(next_state));
1353     return false;
1354 updateState_exit_change_failed:
1355     debugError("State transition failed: %s => %s\n",
1356         ePSToString(m_state), ePSToString(next_state));
1357     return false;
1358 }
1359
1360 /***********************************************
1361  * Helper routines                             *
1362  ***********************************************/
1363 bool
1364 StreamProcessor::transferSilence(unsigned int nframes)
1365 {
1366     bool retval;
1367     signed int fc;
1368     ffado_timestamp_t ts_tail_tmp;
1369
1370     // prepare a buffer of silence
1371     char *dummybuffer = (char *)calloc(sizeof(quadlet_t), nframes * getEventsPerFrame());
1372     transmitSilenceBlock(dummybuffer, nframes, 0);
1373
1374     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1375     if (fc != 0) {
1376         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1377     }
1378
1379     // add the silence data to the ringbuffer
1380     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1381         retval = true;
1382     } else {
1383         debugWarning("Could not write to event buffer\n");
1384         retval = false;
1385     }
1386     free(dummybuffer);
1387     return retval;
1388 }
1389
1390 /**
1391  * @brief convert a eProcessorState to a string
1392  * @param s the state
1393  * @return a char * describing the state
1394  */
1395 const char *
1396 StreamProcessor::ePSToString(enum eProcessorState s) {
1397     switch (s) {
1398         case ePS_Invalid: return "ePS_Invalid";
1399         case ePS_Created: return "ePS_Created";
1400         case ePS_Stopped: return "ePS_Stopped";
1401         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1402         case ePS_DryRunning: return "ePS_DryRunning";
1403         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1404         case ePS_Running: return "ePS_Running";
1405         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1406         default: return "error: unknown state";
1407     }
1408 }
1409
1410 /**
1411  * @brief convert a eProcessorType to a string
1412  * @param t the type
1413  * @return a char * describing the state
1414  */
1415 const char *
1416 StreamProcessor::ePTToString(enum eProcessorType t) {
1417     switch (t) {
1418         case ePT_Receive: return "Receive";
1419         case ePT_Transmit: return "Transmit";
1420         default: return "error: unknown type";
1421     }
1422 }
1423
1424 /***********************************************
1425  * Debug                                       *
1426  ***********************************************/
1427 void
1428 StreamProcessor::dumpInfo()
1429 {
1430     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p information\n", this);
1431     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n");
1432
1433     IsoStream::dumpInfo();
1434     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
1435     if (m_handler) {
1436         uint64_t now = m_handler->getCycleTimerTicks();
1437         debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1438                           now,
1439                           (unsigned int)TICKS_TO_SECS(now),
1440                           (unsigned int)TICKS_TO_CYCLES(now),
1441                           (unsigned int)TICKS_TO_OFFSET(now));
1442     }
1443     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %s\n", (m_in_xrun ? "True":"False"));
1444     debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n", ePSToString(m_state));
1445     debugOutputShort( DEBUG_LEVEL_NORMAL, "   Next state           : %s\n", ePSToString(m_next_state));
1446     debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1447     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1448     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_manager->getNominalRate());
1449     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n",
1450         24576000.0/m_manager->getSyncSource().m_data_buffer->getRate(),
1451         24576000.0/m_data_buffer->getRate()
1452         );
1453
1454     m_data_buffer->dumpInfo();
1455 }
1456
1457 void
1458 StreamProcessor::setVerboseLevel(int l) {
1459     setDebugLevel(l);
1460     IsoStream::setVerboseLevel(l);
1461     PortManager::setVerboseLevel(l);
1462     m_data_buffer->setVerboseLevel(l);
1463 }
1464
1465 } // end of namespace
Note: See TracBrowser for help on using the browser.