root/branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp

Revision 729, 54.5 kB (checked in by ppalmers, 14 years ago)

some more transmit tweaks

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessor.h"
25 #include "../util/cycletimer.h"
26 #include "../StreamProcessorManager.h"
27
28 #include "libutil/Atomic.h"
29
30 #include <assert.h>
31 #include <math.h>
32
33 namespace Streaming {
34
35 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
36
37 StreamProcessor::StreamProcessor(enum eProcessorType type, int port)
38     : IsoStream((type==ePT_Receive ? IsoStream::eST_Receive : IsoStream::eST_Transmit), port)
39     , m_processor_type ( type )
40     , m_state( ePS_Created )
41     , m_next_state( ePS_Invalid )
42     , m_cycle_to_switch_state( 0 )
43     , m_scratch_buffer( NULL )
44     , m_scratch_buffer_size_bytes( 0 )
45     , m_manager( NULL )
46     , m_ticks_per_frame( 0 )
47     , m_last_cycle( -1 )
48     , m_sync_delay( 0 )
49     , m_in_xrun( false )
50     , m_last_timestamp(0)
51     , m_last_timestamp2(0)
52     , m_dropped(0)
53 {
54     // create the timestamped buffer and register ourselves as its client
55     m_data_buffer = new Util::TimestampedBuffer(this);
56 }
57
58 StreamProcessor::~StreamProcessor() {
59     if (m_data_buffer) delete m_data_buffer;
60     if (m_scratch_buffer) delete[] m_scratch_buffer;
61 }
62
63 uint64_t StreamProcessor::getTimeNow() {
64     return m_handler->getCycleTimerTicks();
65 }
66
67 int StreamProcessor::getMaxFrameLatency() {
68     if (getType() == ePT_Receive) {
69         return (int)(m_handler->getWakeupInterval() * TICKS_PER_CYCLE);
70     } else {
71         return (int)(m_handler->getWakeupInterval() * TICKS_PER_CYCLE);
72     }
73 }
74
75 /***********************************************
76  * Buffer management and manipulation          *
77  ***********************************************/
78 int StreamProcessor::getBufferFill() {
79     return m_data_buffer->getBufferFill();
80 }
81
82 int64_t
83 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
84 {
85     uint64_t time_at_period=getTimeAtPeriod();
86
87     // we delay the period signal with the sync delay
88     // this makes that the period signals lag a little compared to reality
89     // ISO buffering causes the packets to be received at max
90     // m_handler->getWakeupInterval() later than the time they were received.
91     // hence their payload is available this amount of time later. However, the
92     // period boundary is predicted based upon earlier samples, and therefore can
93     // pass before these packets are processed. Adding this extra term makes that
94     // the period boundary is signalled later
95     time_at_period = addTicks(time_at_period, m_manager->getSyncSource().getSyncDelay());
96
97     uint64_t cycle_timer=m_handler->getCycleTimerTicks();
98
99     // calculate the time until the next period
100     int32_t until_next=diffTicks(time_at_period,cycle_timer);
101
102     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
103         time_at_period, cycle_timer, until_next
104         );
105
106     // now convert to usecs
107     // don't use the mapping function because it only works
108     // for absolute times, not the relative time we are
109     // using here (which can also be negative).
110     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
111 }
112
113 void
114 StreamProcessor::setSyncDelay(int d) {
115     debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d);
116     m_sync_delay = d;
117 }
118
119 uint64_t
120 StreamProcessor::getTimeAtPeriodUsecs()
121 {
122     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
123 }
124
125 uint64_t
126 StreamProcessor::getTimeAtPeriod()
127 {
128     if (getType() == ePT_Receive) {
129         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_manager->getPeriodSize());
130    
131         #ifdef DEBUG
132         ffado_timestamp_t ts;
133         signed int fc;
134         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
135    
136         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
137             next_period_boundary, ts, fc, getTicksPerFrame()
138             );
139         #endif
140         return (uint64_t)next_period_boundary;
141     } else {
142         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_manager->getNbBuffers()-1) * m_manager->getPeriodSize());
143    
144         #ifdef DEBUG
145         ffado_timestamp_t ts;
146         signed int fc;
147         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
148    
149         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
150             next_period_boundary, ts, fc, getTicksPerFrame()
151             );
152         #endif
153         return (uint64_t)next_period_boundary;
154     }
155 }
156
157 float
158 StreamProcessor::getTicksPerFrame()
159 {
160     assert(m_data_buffer != NULL);
161     return m_data_buffer->getRate();
162 }
163
164 bool
165 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
166 {
167     bool can_transfer;
168     unsigned int fc = m_data_buffer->getFrameCounter();
169     if (getType() == ePT_Receive) {
170         can_transfer = (fc >= nbframes);
171     } else {
172         // there has to be enough space to put the frames in
173         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
174         // or the buffer is transparent
175         can_transfer |= m_data_buffer->isTransparent();
176     }
177    
178     #ifdef DEBUG
179     if (!can_transfer) {
180         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
181             this, ePTToString(getType()), fc, nbframes);
182     }
183     #endif
184    
185     return can_transfer;
186 }
187
188 /***********************************************
189  * I/O API                                     *
190  ***********************************************/
191
192 // Packet transfer API
/**
 * Receive-side packet handler: processes one incoming ISO packet.
 *
 * The function
 *  - derives the number of dropped cycles from the distance between @p cycle
 *    and the previously seen cycle,
 *  - performs the cycle-synchronized state switches for the "waiting" states,
 *  - lets the protocol implementation validate the header through
 *    processPacketHeader() and, when that succeeds, consume the payload
 *    through processPacketData().
 *
 * @param data    packet payload, forwarded to the process* hooks
 * @param length  payload length, forwarded to the process* hooks
 * @param channel forwarded to the process* hooks
 * @param tag     forwarded to the process* hooks
 * @param sy      forwarded to the process* hooks
 * @param cycle   cycle number this packet was received on
 * @param dropped not used by this function; the dropped count is recomputed
 *                from @p cycle
 * @return RAW1394_ISO_OK when the packet was handled (or deliberately ignored),
 *         RAW1394_ISO_DEFER in the Created state or after an xrun triggered a
 *         state switch, RAW1394_ISO_ERROR on an invalid state, a failed
 *         updateState() or an unexpected hook return value
 */
enum raw1394_iso_disposition
StreamProcessor::putPacket(unsigned char *data, unsigned int length,
                           unsigned char channel, unsigned char tag, unsigned char sy,
                           unsigned int cycle, unsigned int dropped) {
    // m_last_cycle == -1 marks "no packet seen yet"
    if(m_last_cycle == -1) {
        debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
    }

    // consecutive packets are one cycle apart; anything more counts as dropped
    int dropped_cycles = 0;
    if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
        dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
        if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles);
        if (dropped_cycles > 0) {
            debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle);
            m_dropped += dropped_cycles;
        }
    }
    m_last_cycle = cycle;

    // bypass based upon state
    if (m_state == ePS_Invalid) {
        debugError("Should not have state %s\n", ePSToString(m_state) );
        return RAW1394_ISO_ERROR;
    }
    if (m_state == ePS_Created) {
        return RAW1394_ISO_DEFER;
    }

    // store the previous timestamp
    m_last_timestamp2 = m_last_timestamp;

    // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
    //       it happens on the first 'good' cycle for the wait condition
    //       or on the first received cycle that is received afterwards (might be a problem)

    // check whether we are waiting for a stream to be disabled
    if(m_state == ePS_WaitingForStreamDisable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
            m_next_state = ePS_DryRunning;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to disable yet
        }
        // the received data can be discarded while waiting for the stream
        // to be disabled
        return RAW1394_ISO_OK;
    }

    // check whether we are waiting for a stream to be enabled
    else if(m_state == ePS_WaitingForStreamEnable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
            m_next_state = ePS_Running;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to enable yet
        }
        // we are dryRunning hence data should be processed in any case
    }

    // check the packet header
    enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
    if (result == eCRV_OK) {
        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
                cycle, m_last_timestamp);
        // update some accounting
        m_last_good_cycle = cycle;
        m_last_dropped = dropped_cycles;

        // check whether we are waiting for a stream to startup
        // this requires that the packet is good
        if(m_state == ePS_WaitingForStream) {
            // since we have a packet with an OK header,
            // we can indicate that the stream started up

            // we then check whether we have to switch on this cycle
            if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
                // hence go to the dryRunning state
                m_next_state = ePS_DryRunning;
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
            } else {
                // not time (yet) to switch state
            }
            // in both cases we don't want to process the data
            return RAW1394_ISO_OK;
        }

        // check whether a state change has been requested
        // note that only the wait state changes are synchronized with the cycles
        else if(m_state != m_next_state) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                             ePSToString(m_state), ePSToString(m_next_state));
            // execute the requested change
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        }

        // handle dropped cycles
        if(dropped_cycles) {
            // they represent a discontinuity in the timestamps, and hence are
            // to be dealt with
            debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
            m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
            // NOTE(review): the packet is only skipped when Running (early
            // return below); in other states processing continues
            if (m_state == ePS_Running) {
                // this is an xrun situation
                m_in_xrun = true;
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
                m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
                m_next_state = ePS_WaitingForStreamDisable;
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
                return RAW1394_ISO_DEFER;
            }
        }

        // for all states that reach this we are allowed to
        // do protocol specific data reception
        enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);

        // if an xrun occured, switch to the dryRunning state and
        // allow for the xrun to be picked up
        if (result2 == eCRV_XRun) {
            m_in_xrun = true;
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
            m_cycle_to_switch_state = cycle+1; // switch in the next cycle
            m_next_state = ePS_WaitingForStreamDisable;
            // execute the requested change
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
            return RAW1394_ISO_DEFER;
        } else if(result2 == eCRV_OK) {
            // no problem here
            return RAW1394_ISO_OK;
        } else {
            debugError("Invalid response\n");
            return RAW1394_ISO_ERROR;
        }
    } else if(result == eCRV_Invalid) {
        // apparently we don't have to do anything when the packets are not valid
        return RAW1394_ISO_OK;
    } else {
        debugError("Invalid response\n");
        return RAW1394_ISO_ERROR;
    }
    // every branch above returns, so this point is never reached
    debugError("reached the unreachable\n");
    return RAW1394_ISO_ERROR;
}
360
/**
 * Transmit-side packet handler: fills in the ISO packet to be sent on @p cycle.
 *
 * The function
 *  - returns an empty packet right away for a negative @p cycle,
 *  - derives the number of dropped cycles from the distance between @p cycle
 *    and the previously seen cycle,
 *  - performs the cycle-synchronized state switches for the "waiting" states,
 *  - in the Running state asks the protocol implementation for a packet
 *    through generatePacketHeader() / generatePacketData(),
 *  - in every other case (including xruns and "empty"/"again" header results)
 *    produces a silent packet through generateSilentPacketHeader() /
 *    generateSilentPacketData().
 *
 * @param data       packet buffer, forwarded to the generate* hooks
 * @param length     out: packet length; set to 0 for the early empty-packet returns
 * @param tag        out: packet tag; set to 0 for the early empty-packet returns
 * @param sy         out: packet sy field; set to 0 for the early empty-packet returns
 * @param cycle      cycle the packet will be sent on (negative: no valid cycle)
 * @param dropped    only used in a debug warning; the dropped count is recomputed
 *                   from @p cycle
 * @param max_length forwarded to the generate* hooks
 * @return RAW1394_ISO_OK for a regular data packet or a negative @p cycle,
 *         RAW1394_ISO_DEFER when a silent/empty packet was produced,
 *         RAW1394_ISO_ERROR on an invalid state, a failed updateState() or an
 *         unexpected hook return value
 */
enum raw1394_iso_disposition
StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
                           unsigned char *tag, unsigned char *sy,
                           int cycle, unsigned int dropped, unsigned int max_length) {
    // a negative cycle yields an empty packet without touching any state
    if (cycle<0) {
        *tag = 0;
        *sy = 0;
        *length = 0;
        return RAW1394_ISO_OK;
    }

    // m_last_cycle == -1 marks "no packet requested yet"
    if(m_last_cycle == -1) {
        debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
    }

    // consecutive requests are one cycle apart; anything more counts as dropped
    int dropped_cycles = 0;
    if (m_last_cycle != cycle && m_last_cycle != -1) {
        dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
        if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles);
        if (dropped_cycles > 0) {
            debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
            m_dropped += dropped_cycles;
        }
    }
    // always true here: negative cycles already returned above
    if (cycle >= 0) {
        m_last_cycle = cycle;
    }

    // bypass based upon state
    if (m_state == ePS_Invalid) {
        debugError("Should not have state %s\n", ePSToString(m_state) );
        return RAW1394_ISO_ERROR;
    }
    if (m_state == ePS_Created) {
        *tag = 0;
        *sy = 0;
        *length = 0;
        return RAW1394_ISO_DEFER;
    }

    // normal processing
    // note that we can't use getCycleTimer directly here,
    // because packets are queued in advance. This means that
    // we the packet we are constructing will be sent out
    // on 'cycle', not 'now'.
    unsigned int ctr = m_handler->getCycleTimer();
    int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);

    // the difference between the cycle this
    // packet is intended for and 'now'
    int cycle_diff = diffCycles(cycle, now_cycles);

    #ifdef DEBUG
    if(cycle_diff < 0) {
        debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
            cycle, now_cycles);
    }
    #endif

    // store the previous timestamp
    m_last_timestamp2 = m_last_timestamp;

    // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
    //       it happens on the first 'good' cycle for the wait condition
    //       or on the first received cycle that is received afterwards (might be a problem)

    // check whether we are waiting for a stream to be disabled
    if(m_state == ePS_WaitingForStreamDisable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
            m_next_state = ePS_DryRunning;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to disable yet
        }
    }
    // check whether we are waiting for a stream to be enabled
    else if(m_state == ePS_WaitingForStreamEnable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
            m_next_state = ePS_Running;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to enable yet
        }
        // we are dryRunning hence data should be processed in any case
    }
    // check whether we are waiting for a stream to startup
    else if(m_state == ePS_WaitingForStream) {
        // as long as the cycle parameter is not in sync with
        // the current time, the stream is considered not
        // to be 'running'
        // we then check whether we have to switch on this cycle
        if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
            // hence go to the dryRunning state
            m_next_state = ePS_DryRunning;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time (yet) to switch state
        }
    }
    // this chain tests the state as it was on entry: a switch to Running made
    // above only takes effect for the next packet
    else if(m_state == ePS_Running) {
        // check the packet header
        enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
        if (result == eCRV_Packet) {
            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
                    cycle, m_last_timestamp);
            // update some accounting
            m_last_good_cycle = cycle;
            m_last_dropped = dropped_cycles;

            // check whether a state change has been requested
            // note that only the wait state changes are synchronized with the cycles
            if(m_state != m_next_state) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                                ePSToString(m_state), ePSToString(m_next_state));
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
            }

            enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
            // if an xrun occured, switch to the dryRunning state and
            // allow for the xrun to be picked up
            if (result2 == eCRV_XRun) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
                m_in_xrun = true;
                m_cycle_to_switch_state = cycle+1; // switch in the next cycle
                m_next_state = ePS_WaitingForStreamDisable;
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
                goto send_empty_packet;
            }
            // any result other than an xrun is treated as a valid data packet
            return RAW1394_ISO_OK;
        } else if (result == eCRV_XRun) { // pick up the possible xruns
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
            m_in_xrun = true;
            m_cycle_to_switch_state = cycle+1; // switch in the next cycle
            m_next_state = ePS_WaitingForStreamDisable;
            // execute the requested change
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
            // no goto needed: control falls out of the if-chain into
            // send_empty_packet below
        } else if ((result == eCRV_EmptyPacket) || (result == eCRV_Again)) {
            if(m_state != m_next_state) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                                ePSToString(m_state), ePSToString(m_next_state));
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
            }
            goto send_empty_packet;
//         } else if (result == eCRV_Again) {
//             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
//             if(m_state != m_next_state) {
//                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
//                                                 ePSToString(m_state), ePSToString(m_next_state));
//                 // execute the requested change
//                 if (!updateState()) { // we are allowed to change the state directly
//                     debugError("Could not update state!\n");
//                     return RAW1394_ISO_ERROR;
//                 }
//             }
            // force some delay
//             usleep(125);
//             return RAW1394_ISO_AGAIN;
        } else {
            debugError("Invalid return value: %d\n", result);
            return RAW1394_ISO_ERROR;
        }
    }
    // we are not running, so send an empty packet
    // we should generate a valid packet any time
send_empty_packet:
    // note that only the wait state changes are synchronized with the cycles
    if(m_state != m_next_state) {
        debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                        ePSToString(m_state), ePSToString(m_next_state));
        // execute the requested change
        if (!updateState()) { // we are allowed to change the state directly
            debugError("Could not update state!\n");
            return RAW1394_ISO_ERROR;
        }
    }

    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
    // return values of the silent-packet hooks are not checked
    generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
    generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
    return RAW1394_ISO_DEFER;
}
571
572
573 // Frame Transfer API
574 /**
575  * Transfer a block of frames from the event buffer to the port buffers
576  * @param nbframes number of frames to transfer
577  * @param ts the timestamp that the LAST frame in the block should have
578  * @return
579  */
580 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
581     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
582     assert( getType() == ePT_Receive );
583     if(isDryRunning()) return getFramesDry(nbframes, ts);
584     else return getFramesWet(nbframes, ts);
585 }
586
587 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
588 // FIXME: this should be done somewhere else
589 #ifdef DEBUG
590     uint64_t ts_expected;
591     signed int fc;
592     int32_t lag_ticks;
593     float lag_frames;
594
595     // in order to sync up multiple received streams, we should
596     // use the ts parameter. It specifies the time of the block's
597     // last sample.
598     float srate = m_manager->getSyncSource().getTicksPerFrame();
599     assert(srate != 0.0);
600     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
601
602     ffado_timestamp_t ts_head_tmp;
603     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
604     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
605
606     lag_ticks = diffTicks(ts, ts_expected);
607     lag_frames = (((float)lag_ticks) / srate);
608     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
609                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
610     if (lag_frames >= 1.0) {
611         // the stream lags
612         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
613                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
614     } else if (lag_frames <= -1.0) {
615         // the stream leads
616         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
617                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
618     }
619 #endif
620     // ask the buffer to process nbframes of frames
621     // using it's registered client's processReadBlock(),
622     // which should be ours
623     m_data_buffer->blockProcessReadFrames(nbframes);
624     return true;
625 }
626
627 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
628 {
629     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
630                  this, nbframes, ts);
631     // dry run on this side means that we put silence in all enabled ports
632     // since there is do data put into the ringbuffer in the dry-running state
633     return provideSilenceBlock(nbframes, 0);
634 }
635
636 bool
637 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
638 {
639     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
640     return m_data_buffer->dropFrames(nbframes);
641 }
642
643 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
644 {
645     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
646     assert( getType() == ePT_Transmit );
647     if(isDryRunning()) return putFramesDry(nbframes, ts);
648     else return putFramesWet(nbframes, ts);
649 }
650
651 bool
652 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
653 {
654     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
655     // transfer the data
656     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
657     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
658     return true; // FIXME: what about failure?
659 }
660
661 bool
662 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
663 {
664     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
665     // do nothing
666     return true;
667 }
668
669 bool
670 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
671 {
672     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
673
674     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
675     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
676
677     if (nbframes > scratch_buffer_size_frames) {
678         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
679                    nbframes, scratch_buffer_size_frames);
680     }
681
682     assert(m_scratch_buffer);
683     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
684         debugError("Could not prepare silent block\n");
685         return false;
686     }
687     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
688         debugError("Could not write silent block\n");
689         return false;
690     }
691     return true;
692 }
693
694 bool
695 StreamProcessor::shiftStream(int nbframes)
696 {
697     if(nbframes == 0) return true;
698     if(nbframes > 0) {
699         return m_data_buffer->dropFrames(nbframes);
700     } else {
701         bool result = true;
702         while(nbframes++) {
703             result &= m_data_buffer->writeDummyFrame();
704         }
705         return result;
706     }
707 }
708
709 /***********************************************
710  * State related API                           *
711  ***********************************************/
712 bool StreamProcessor::init()
713 {
714     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
715
716     // initialization can be done without requesting it
717     // from the packet loop
718     m_next_state = ePS_Created;
719     return true;
720 }
721
722 bool StreamProcessor::prepare()
723 {
724     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
725     if(!m_manager) {
726         debugFatal("Not attached to a manager!\n");
727         return false;
728     }
729
730     // make the scratch buffer one period of frames long
731     m_scratch_buffer_size_bytes = m_manager->getPeriodSize() * getEventsPerFrame() * getEventSize();
732     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
733     if(m_scratch_buffer) delete[] m_scratch_buffer;
734     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
735     if(m_scratch_buffer == NULL) {
736         debugFatal("Could not allocate scratch buffer\n");
737         return false;
738     }
739
740     if (!prepareChild()) {
741         debugFatal("Could not prepare child\n");
742         return false;
743     }
744
745     // initialization can be done without requesting it
746     // from the packet loop
747     m_next_state = ePS_Stopped;
748     return updateState();
749 }
750
751 bool
752 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
753 {
754     // first set the time, since in the packet loop we first check m_state == m_next_state before
755     // using the time
756     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
757     m_next_state = state;
758     return true;
759 }
760
761 bool
762 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
763 {
764     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
765     int cnt = timeout_ms;
766     while (m_state != state && cnt) {
767         usleep(1000);
768         cnt--;
769     }
770     if(cnt==0) {
771         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
772         return false;
773     }
774     return true;
775 }
776
777 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
778     uint64_t tx;
779     if (t < 0) {
780         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
781     } else {
782         tx = t;
783     }
784     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
785     uint64_t now = m_handler->getCycleTimerTicks();
786     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
787                           now,
788                           (unsigned int)TICKS_TO_SECS(now),
789                           (unsigned int)TICKS_TO_CYCLES(now),
790                           (unsigned int)TICKS_TO_OFFSET(now));
791     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
792                           tx,
793                           (unsigned int)TICKS_TO_SECS(tx),
794                           (unsigned int)TICKS_TO_CYCLES(tx),
795                           (unsigned int)TICKS_TO_OFFSET(tx));
796     if (m_state == ePS_Stopped) {
797         return scheduleStateTransition(ePS_WaitingForStream, tx);
798     } else if (m_state == ePS_Running) {
799         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
800     } else {
801         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
802         return false;
803     }
804 }
805
806 bool StreamProcessor::scheduleStartRunning(int64_t t) {
807     uint64_t tx;
808     if (t < 0) {
809         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
810     } else {
811         tx = t;
812     }
813     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
814     uint64_t now = m_handler->getCycleTimerTicks();
815     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
816                           now,
817                           (unsigned int)TICKS_TO_SECS(now),
818                           (unsigned int)TICKS_TO_CYCLES(now),
819                           (unsigned int)TICKS_TO_OFFSET(now));
820     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
821                           tx,
822                           (unsigned int)TICKS_TO_SECS(tx),
823                           (unsigned int)TICKS_TO_CYCLES(tx),
824                           (unsigned int)TICKS_TO_OFFSET(tx));
825     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
826 }
827
828 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
829     uint64_t tx;
830     if (t < 0) {
831         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
832     } else {
833         tx = t;
834     }
835     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
836     uint64_t now = m_handler->getCycleTimerTicks();
837     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
838                           now,
839                           (unsigned int)TICKS_TO_SECS(now),
840                           (unsigned int)TICKS_TO_CYCLES(now),
841                           (unsigned int)TICKS_TO_OFFSET(now));
842     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
843                           tx,
844                           (unsigned int)TICKS_TO_SECS(tx),
845                           (unsigned int)TICKS_TO_CYCLES(tx),
846                           (unsigned int)TICKS_TO_OFFSET(tx));
847     return scheduleStateTransition(ePS_Stopped, tx);
848 }
849
850 bool StreamProcessor::scheduleStopRunning(int64_t t) {
851     uint64_t tx;
852     if (t < 0) {
853         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
854     } else {
855         tx = t;
856     }
857     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
858     uint64_t now = m_handler->getCycleTimerTicks();
859     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
860                           now,
861                           (unsigned int)TICKS_TO_SECS(now),
862                           (unsigned int)TICKS_TO_CYCLES(now),
863                           (unsigned int)TICKS_TO_OFFSET(now));
864     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
865                           tx,
866                           (unsigned int)TICKS_TO_SECS(tx),
867                           (unsigned int)TICKS_TO_CYCLES(tx),
868                           (unsigned int)TICKS_TO_OFFSET(tx));
869     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
870 }
871
872 bool StreamProcessor::startDryRunning(int64_t t) {
873     if(!scheduleStartDryRunning(t)) {
874         debugError("Could not schedule transition\n");
875         return false;
876     }
877     if(!waitForState(ePS_DryRunning, 2000)) {
878         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
879         return false;
880     }
881     return true;
882 }
883
884 bool StreamProcessor::startRunning(int64_t t) {
885     if(!scheduleStartRunning(t)) {
886         debugError("Could not schedule transition\n");
887         return false;
888     }
889     if(!waitForState(ePS_Running, 2000)) {
890         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
891         return false;
892     }
893     return true;
894 }
895
896 bool StreamProcessor::stopDryRunning(int64_t t) {
897     if(!scheduleStopDryRunning(t)) {
898         debugError("Could not schedule transition\n");
899         return false;
900     }
901     if(!waitForState(ePS_Stopped, 2000)) {
902         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
903         return false;
904     }
905     return true;
906 }
907
908 bool StreamProcessor::stopRunning(int64_t t) {
909     if(!scheduleStopRunning(t)) {
910         debugError("Could not schedule transition\n");
911         return false;
912     }
913     if(!waitForState(ePS_DryRunning, 2000)) {
914         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
915         return false;
916     }
917     return true;
918 }
919
920
921 // internal state API
922
923 /**
924  * @brief Enter the ePS_Stopped state
925  * @return true if successful, false if not
926  *
927  * @pre none
928  *
929  * @post the buffer and the isostream are ready for use.
930  * @post all dynamic structures have been allocated successfully
931  * @post the buffer is transparent and empty, and all parameters are set
932  *       to the correct initial/nominal values.
933  *
934  */
935 bool
936 StreamProcessor::doStop()
937 {
938     float ticks_per_frame;
939     unsigned int ringbuffer_size_frames = (m_manager->getNbBuffers() + 1) * m_manager->getPeriodSize();
940
941     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
942     bool result = true;
943
944     switch(m_state) {
945         case ePS_Created:
946             assert(m_data_buffer);
947             // object just created
948             result = m_data_buffer->init();
949
950             // prepare the framerate estimate
951             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate());
952             m_ticks_per_frame = ticks_per_frame;
953             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
954
955             // initialize internal buffer
956             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
957
958             result &= m_data_buffer->setEventSize( getEventSize() );
959             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
960             if(getType() == ePT_Receive) {
961                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
962             } else {
963                 result &= m_data_buffer->setUpdatePeriod( m_manager->getPeriodSize() );
964             }
965             result &= m_data_buffer->setNominalRate(ticks_per_frame);
966             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
967             result &= m_data_buffer->prepare(); // FIXME: the name
968
969             // set the parameters of ports we can:
970             // we want the audio ports to be period buffered,
971             // and the midi ports to be packet buffered
972             for ( PortVectorIterator it = m_Ports.begin();
973                 it != m_Ports.end();
974                 ++it )
975             {
976                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
977                 if(!(*it)->setBufferSize(m_manager->getPeriodSize())) {
978                     debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize());
979                     return false;
980                 }
981                 switch ((*it)->getPortType()) {
982                     case Port::E_Audio:
983                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
984                             debugFatal("Could not set signal type to PeriodSignalling");
985                             return false;
986                         }
987                         // buffertype and datatype are dependant on the API
988                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
989                         // buffertype and datatype are dependant on the API
990                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
991                             debugFatal("Could not set buffer type");
992                             return false;
993                         }
994                         if(!(*it)->useExternalBuffer(true)) {
995                             debugFatal("Could not set external buffer usage");
996                             return false;
997                         }
998                         if(!(*it)->setDataType(Port::E_Float)) {
999                             debugFatal("Could not set data type");
1000                             return false;
1001                         }
1002                         break;
1003                     case Port::E_Midi:
1004                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1005                             debugFatal("Could not set signal type to PacketSignalling");
1006                             return false;
1007                         }
1008                         // buffertype and datatype are dependant on the API
1009                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1010                         // buffertype and datatype are dependant on the API
1011                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1012                             debugFatal("Could not set buffer type");
1013                             return false;
1014                         }
1015                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1016                             debugFatal("Could not set data type");
1017                             return false;
1018                         }
1019                         break;
1020                     default:
1021                         debugWarning("Unsupported port type specified\n");
1022                         break;
1023                 }
1024             }
1025             // the API specific settings of the ports should already be set,
1026             // as this is called from the processorManager->prepare()
1027             // so we can init the ports
1028             result &= PortManager::initPorts();
1029
1030             break;
1031         case ePS_DryRunning:
1032             // what to do here?
1033             break;
1034         default:
1035             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1036             return false;
1037     }
1038
1039     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1040     // make the buffer transparent
1041     m_data_buffer->setTransparent(true);
1042
1043     // reset all ports
1044     result &= PortManager::preparePorts();
1045
1046     m_state = ePS_Stopped;
1047     #ifdef DEBUG
1048     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1049         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1050         dumpInfo();
1051     }
1052     #endif
1053     return result;
1054 }
1055
1056 /**
1057  * @brief Enter the ePS_WaitingForStream state
1058  * @return true if successful, false if not
1059  *
1060  * @pre all dynamic data structures are allocated successfully
1061  *
1062  * @post
1063  *
1064  */
1065 bool
1066 StreamProcessor::doWaitForRunningStream()
1067 {
1068     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1069     switch(m_state) {
1070         case ePS_Stopped:
1071             // we have to start waiting for an incoming stream
1072             // this basically means nothing, the state change will
1073             // be picked up by the packet iterator
1074             break;
1075         default:
1076             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1077             return false;
1078     }
1079     m_state = ePS_WaitingForStream;
1080     #ifdef DEBUG
1081     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1082         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1083         dumpInfo();
1084     }
1085     #endif
1086     return true;
1087 }
1088
1089 /**
1090  * @brief Enter the ePS_DryRunning state
1091  * @return true if successful, false if not
1092  *
1093  * @pre
1094  *
1095  * @post
1096  *
1097  */
1098 bool
1099 StreamProcessor::doDryRunning()
1100 {
1101     bool result = true;
1102     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1103     switch(m_state) {
1104         case ePS_WaitingForStream:
1105             // a running stream has been detected
1106             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1107             if (getType() == ePT_Receive) {
1108                 // this to ensure that there is no discontinuity when starting to
1109                 // update the DLL based upon the received packets
1110                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1111             } else {
1112                 // FIXME: PC=master mode will have to do something here I guess...
1113             }
1114             break;
1115         case ePS_WaitingForStreamDisable:
1116             result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1117             m_data_buffer->setTransparent(true);
1118             break;
1119         default:
1120             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1121             return false;
1122     }
1123     m_state = ePS_DryRunning;
1124     #ifdef DEBUG
1125     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1126         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1127         dumpInfo();
1128     }
1129     #endif
1130     return result;
1131 }
1132
1133 /**
1134  * @brief Enter the ePS_WaitingForStreamEnable state
1135  * @return true if successful, false if not
1136  *
1137  * @pre
1138  *
1139  * @post
1140  *
1141  */
1142 bool
1143 StreamProcessor::doWaitForStreamEnable()
1144 {
1145     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1146     unsigned int ringbuffer_size_frames;
1147     switch(m_state) {
1148         case ePS_DryRunning:
1149             // we have to start waiting for an incoming stream
1150             // this basically means nothing, the state change will
1151             // be picked up by the packet iterator
1152
1153             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1154                 debugError("Could not reset data buffer\n");
1155                 return false;
1156             }
1157             if (getType() == ePT_Transmit) {
1158                 ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize();
1159                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1160                 // prefill the buffer
1161                 if(!transferSilence(ringbuffer_size_frames)) {
1162                     debugFatal("Could not prefill transmit stream\n");
1163                     return false;
1164                 }
1165             }
1166
1167             break;
1168         default:
1169             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1170             return false;
1171     }
1172     m_state = ePS_WaitingForStreamEnable;
1173     #ifdef DEBUG
1174     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1175         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1176         dumpInfo();
1177     }
1178     #endif
1179     return true;
1180 }
1181
1182 /**
1183  * @brief Enter the ePS_Running state
1184  * @return true if successful, false if not
1185  *
1186  * @pre
1187  *
1188  * @post
1189  *
1190  */
1191 bool
1192 StreamProcessor::doRunning()
1193 {
1194     bool result = true;
1195     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1196     switch(m_state) {
1197         case ePS_WaitingForStreamEnable:
1198             // a running stream has been detected
1199             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1200                                              this, m_last_cycle);
1201             m_in_xrun = false;
1202             m_data_buffer->setTransparent(false);
1203             break;
1204         default:
1205             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1206             return false;
1207     }
1208     m_state = ePS_Running;
1209     #ifdef DEBUG
1210     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1211         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1212         dumpInfo();
1213     }
1214     #endif
1215     return result;
1216 }
1217
1218 /**
1219  * @brief Enter the ePS_WaitingForStreamDisable state
1220  * @return true if successful, false if not
1221  *
1222  * @pre
1223  *
1224  * @post
1225  *
1226  */
1227 bool
1228 StreamProcessor::doWaitForStreamDisable()
1229 {
1230     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1231     switch(m_state) {
1232         case ePS_Running:
1233             // the thread will do the transition
1234             break;
1235         default:
1236             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1237             return false;
1238     }
1239     m_state = ePS_WaitingForStreamDisable;
1240     #ifdef DEBUG
1241     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1242         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1243         dumpInfo();
1244     }
1245     #endif
1246     return true;
1247 }
1248
1249 /**
1250  * @brief Updates the state machine and calls the necessary transition functions
1251  * @return true if successful, false if not
1252  */
1253 bool StreamProcessor::updateState() {
1254     bool result = false;
1255     // copy the current state locally since it could change value,
1256     // and that's something we don't want to happen inbetween tests
1257     // if m_next_state changes during this routine, we know for sure
1258     // that the previous state change was at least attempted correctly.
1259     enum eProcessorState next_state = m_next_state;
1260
1261     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1262         ePSToString(m_state), ePSToString(next_state));
1263
1264     if (m_state == next_state) {
1265         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1266         return true;
1267     }
1268
1269     // after creation, only initialization is allowed
1270     if (m_state == ePS_Created) {
1271         if(next_state != ePS_Stopped) {
1272             goto updateState_exit_with_error;
1273         }
1274         // do init here
1275         result = doStop();
1276         if (result) return true;
1277         else goto updateState_exit_change_failed;
1278     }
1279
1280     // after initialization, only WaitingForRunningStream is allowed
1281     if (m_state == ePS_Stopped) {
1282         if(next_state != ePS_WaitingForStream) {
1283             goto updateState_exit_with_error;
1284         }
1285         result = doWaitForRunningStream();
1286         if (result) return true;
1287         else goto updateState_exit_change_failed;
1288     }
1289
1290     // after WaitingForStream, only ePS_DryRunning is allowed
1291     // this means that the stream started running
1292     if (m_state == ePS_WaitingForStream) {
1293         if(next_state != ePS_DryRunning) {
1294             goto updateState_exit_with_error;
1295         }
1296         result = doDryRunning();
1297         if (result) return true;
1298         else goto updateState_exit_change_failed;
1299     }
1300
1301     // from ePS_DryRunning we can go to:
1302     //   - ePS_Stopped if something went wrong during DryRunning
1303     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1304     if (m_state == ePS_DryRunning) {
1305         if((next_state != ePS_Stopped) &&
1306            (next_state != ePS_WaitingForStreamEnable)) {
1307             goto updateState_exit_with_error;
1308         }
1309         if (next_state == ePS_Stopped) {
1310             result = doStop();
1311         } else {
1312             result = doWaitForStreamEnable();
1313         }
1314         if (result) return true;
1315         else goto updateState_exit_change_failed;
1316     }
1317
1318     // from ePS_WaitingForStreamEnable we can go to:
1319     //   - ePS_DryRunning if something went wrong while waiting
1320     //   - ePS_Running if the stream enabled correctly
1321     if (m_state == ePS_WaitingForStreamEnable) {
1322         if((next_state != ePS_DryRunning) &&
1323            (next_state != ePS_Running)) {
1324             goto updateState_exit_with_error;
1325         }
1326         if (next_state == ePS_Stopped) {
1327             result = doDryRunning();
1328         } else {
1329             result = doRunning();
1330         }
1331         if (result) return true;
1332         else goto updateState_exit_change_failed;
1333     }
1334
1335     // from ePS_Running we can only start waiting for a disabled stream
1336     if (m_state == ePS_Running) {
1337         if(next_state != ePS_WaitingForStreamDisable) {
1338             goto updateState_exit_with_error;
1339         }
1340         result = doWaitForStreamDisable();
1341         if (result) return true;
1342         else goto updateState_exit_change_failed;
1343     }
1344
1345     // from ePS_WaitingForStreamDisable we can go to DryRunning
1346     if (m_state == ePS_WaitingForStreamDisable) {
1347         if(next_state != ePS_DryRunning) {
1348             goto updateState_exit_with_error;
1349         }
1350         result = doDryRunning();
1351         if (result) return true;
1352         else goto updateState_exit_change_failed;
1353     }
1354
1355     // if we arrive here there is an error
1356 updateState_exit_with_error:
1357     debugError("Invalid state transition: %s => %s\n",
1358         ePSToString(m_state), ePSToString(next_state));
1359     return false;
1360 updateState_exit_change_failed:
1361     debugError("State transition failed: %s => %s\n",
1362         ePSToString(m_state), ePSToString(next_state));
1363     return false;
1364 }
1365
1366 /***********************************************
1367  * Helper routines                             *
1368  ***********************************************/
1369 bool
1370 StreamProcessor::transferSilence(unsigned int nframes)
1371 {
1372     bool retval;
1373     signed int fc;
1374     ffado_timestamp_t ts_tail_tmp;
1375
1376     // prepare a buffer of silence
1377     char *dummybuffer = (char *)calloc(sizeof(quadlet_t), nframes * getEventsPerFrame());
1378     transmitSilenceBlock(dummybuffer, nframes, 0);
1379
1380     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1381     if (fc != 0) {
1382         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1383     }
1384
1385     // add the silence data to the ringbuffer
1386     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1387         retval = true;
1388     } else {
1389         debugWarning("Could not write to event buffer\n");
1390         retval = false;
1391     }
1392     free(dummybuffer);
1393     return retval;
1394 }
1395
1396 /**
1397  * @brief convert a eProcessorState to a string
1398  * @param s the state
1399  * @return a char * describing the state
1400  */
1401 const char *
1402 StreamProcessor::ePSToString(enum eProcessorState s) {
1403     switch (s) {
1404         case ePS_Invalid: return "ePS_Invalid";
1405         case ePS_Created: return "ePS_Created";
1406         case ePS_Stopped: return "ePS_Stopped";
1407         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1408         case ePS_DryRunning: return "ePS_DryRunning";
1409         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1410         case ePS_Running: return "ePS_Running";
1411         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1412         default: return "error: unknown state";
1413     }
1414 }
1415
1416 /**
1417  * @brief convert a eProcessorType to a string
1418  * @param t the type
1419  * @return a char * describing the state
1420  */
1421 const char *
1422 StreamProcessor::ePTToString(enum eProcessorType t) {
1423     switch (t) {
1424         case ePT_Receive: return "Receive";
1425         case ePT_Transmit: return "Transmit";
1426         default: return "error: unknown type";
1427     }
1428 }
1429
1430 /***********************************************
1431  * Debug                                       *
1432  ***********************************************/
1433 void
1434 StreamProcessor::dumpInfo()
1435 {
1436     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p information\n", this);
1437     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n");
1438
1439     IsoStream::dumpInfo();
1440     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
1441     if (m_handler) {
1442         uint64_t now = m_handler->getCycleTimerTicks();
1443         debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1444                           now,
1445                           (unsigned int)TICKS_TO_SECS(now),
1446                           (unsigned int)TICKS_TO_CYCLES(now),
1447                           (unsigned int)TICKS_TO_OFFSET(now));
1448     }
1449     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %s\n", (m_in_xrun ? "True":"False"));
1450     debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n", ePSToString(m_state));
1451     debugOutputShort( DEBUG_LEVEL_NORMAL, "   Next state           : %s\n", ePSToString(m_next_state));
1452     debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1453     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1454     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_manager->getNominalRate());
1455     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n",
1456         24576000.0/m_manager->getSyncSource().m_data_buffer->getRate(),
1457         24576000.0/m_data_buffer->getRate()
1458         );
1459
1460     m_data_buffer->dumpInfo();
1461 }
1462
1463 void
1464 StreamProcessor::setVerboseLevel(int l) {
1465     setDebugLevel(l);
1466     IsoStream::setVerboseLevel(l);
1467     PortManager::setVerboseLevel(l);
1468     m_data_buffer->setVerboseLevel(l);
1469 }
1470
1471 } // end of namespace
Note: See TracBrowser for help on using the browser.