root/branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp

Revision 733, 56.9 kB (checked in by ppalmers, 13 years ago)

adapt motu code to new SP base class (compiles, needs real testing)

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software Foundation;
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21  * MA 02110-1301 USA
22  */
23
24 #include "StreamProcessor.h"
25 #include "../util/cycletimer.h"
26 #include "../StreamProcessorManager.h"
27
28 #include "libutil/Atomic.h"
29
30 #include <assert.h>
31 #include <math.h>
32
33 namespace Streaming {
34
// Debug-module definition for the StreamProcessor class. The last argument
// selects DEBUG_LEVEL_VERBOSE (presumably the initial verbosity of this
// module -- confirm against the IMPL_DEBUG_MODULE macro definition).
IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
36
37 StreamProcessor::StreamProcessor(enum eProcessorType type, int port)
38     : IsoStream((type==ePT_Receive ? IsoStream::eST_Receive : IsoStream::eST_Transmit), port)
39     , m_processor_type ( type )
40     , m_state( ePS_Created )
41     , m_next_state( ePS_Invalid )
42     , m_cycle_to_switch_state( 0 )
43     , m_scratch_buffer( NULL )
44     , m_scratch_buffer_size_bytes( 0 )
45     , m_manager( NULL )
46     , m_ticks_per_frame( 0 )
47     , m_last_cycle( -1 )
48     , m_sync_delay( 0 )
49     , m_in_xrun( false )
50     , m_last_timestamp(0)
51     , m_last_timestamp2(0)
52     , m_dropped(0)
53 {
54     // create the timestamped buffer and register ourselves as its client
55     m_data_buffer = new Util::TimestampedBuffer(this);
56 }
57
58 StreamProcessor::~StreamProcessor() {
59     if (m_data_buffer) delete m_data_buffer;
60     if (m_scratch_buffer) delete[] m_scratch_buffer;
61 }
62
63 uint64_t StreamProcessor::getTimeNow() {
64     return m_handler->getCycleTimerTicks();
65 }
66
67 int StreamProcessor::getMaxFrameLatency() {
68     if (getType() == ePT_Receive) {
69         return (int)(m_handler->getWakeupInterval() * TICKS_PER_CYCLE);
70     } else {
71         return (int)(m_handler->getWakeupInterval() * TICKS_PER_CYCLE);
72     }
73 }
74
75 unsigned int
76 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
77 {
78     unsigned int nominal_frames_per_second = m_manager->getNominalRate();
79     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
80     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
81     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
82     return nominal_packets;
83 }
84
85 /***********************************************
86  * Buffer management and manipulation          *
87  ***********************************************/
88 int StreamProcessor::getBufferFill() {
89     return m_data_buffer->getBufferFill();
90 }
91
92 int64_t
93 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
94 {
95     uint64_t time_at_period=getTimeAtPeriod();
96
97     // we delay the period signal with the sync delay
98     // this makes that the period signals lag a little compared to reality
99     // ISO buffering causes the packets to be received at max
100     // m_handler->getWakeupInterval() later than the time they were received.
101     // hence their payload is available this amount of time later. However, the
102     // period boundary is predicted based upon earlier samples, and therefore can
103     // pass before these packets are processed. Adding this extra term makes that
104     // the period boundary is signalled later
105     time_at_period = addTicks(time_at_period, m_manager->getSyncSource().getSyncDelay());
106
107     uint64_t cycle_timer=m_handler->getCycleTimerTicks();
108
109     // calculate the time until the next period
110     int32_t until_next=diffTicks(time_at_period,cycle_timer);
111
112     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
113         time_at_period, cycle_timer, until_next
114         );
115
116     // now convert to usecs
117     // don't use the mapping function because it only works
118     // for absolute times, not the relative time we are
119     // using here (which can also be negative).
120     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
121 }
122
123 void
124 StreamProcessor::setSyncDelay(int d) {
125     debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d);
126     m_sync_delay = d;
127 }
128
129 uint64_t
130 StreamProcessor::getTimeAtPeriodUsecs()
131 {
132     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
133 }
134
135 uint64_t
136 StreamProcessor::getTimeAtPeriod()
137 {
138     if (getType() == ePT_Receive) {
139         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_manager->getPeriodSize());
140    
141         #ifdef DEBUG
142         ffado_timestamp_t ts;
143         signed int fc;
144         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
145    
146         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
147             next_period_boundary, ts, fc, getTicksPerFrame()
148             );
149         #endif
150         return (uint64_t)next_period_boundary;
151     } else {
152         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_manager->getNbBuffers()-1) * m_manager->getPeriodSize());
153    
154         #ifdef DEBUG
155         ffado_timestamp_t ts;
156         signed int fc;
157         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
158    
159         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
160             next_period_boundary, ts, fc, getTicksPerFrame()
161             );
162         #endif
163         return (uint64_t)next_period_boundary;
164     }
165 }
166
167 float
168 StreamProcessor::getTicksPerFrame()
169 {
170     assert(m_data_buffer != NULL);
171     return m_data_buffer->getRate();
172 }
173
174 bool
175 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
176 {
177     bool can_transfer;
178     unsigned int fc = m_data_buffer->getFrameCounter();
179     if (getType() == ePT_Receive) {
180         can_transfer = (fc >= nbframes);
181     } else {
182         // there has to be enough space to put the frames in
183         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
184         // or the buffer is transparent
185         can_transfer |= m_data_buffer->isTransparent();
186     }
187    
188     #ifdef DEBUG
189     if (!can_transfer) {
190         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
191             this, ePTToString(getType()), fc, nbframes);
192     }
193     #endif
194    
195     return can_transfer;
196 }
197
198 /***********************************************
199  * I/O API                                     *
200  ***********************************************/
201
202 // Packet transfer API
/**
 * @brief Handle one received iso packet (receive-side packet entry point).
 *
 * Performs dropped-cycle accounting, drives the cycle-synchronized state
 * changes of the processor state machine and hands the packet to the
 * protocol specific processPacketHeader() / processPacketData() methods.
 *
 * @param data    packet payload
 * @param length  payload length
 * @param channel iso channel, forwarded to the child methods
 * @param tag     iso tag field, forwarded to the child methods
 * @param sy      iso sy field, forwarded to the child methods
 * @param cycle   cycle the packet belongs to
 * @param dropped not read by this function; the number of dropped cycles is
 *                derived from the difference with the previous cycle instead
 * @return RAW1394_ISO_ERROR when the state is ePS_Invalid, when a state
 *         update fails or when a child method returns an unexpected value;
 *         RAW1394_ISO_DEFER in the ePS_Created state and when an xrun is
 *         detected; RAW1394_ISO_OK otherwise
 */
enum raw1394_iso_disposition
StreamProcessor::putPacket(unsigned char *data, unsigned int length,
                           unsigned char channel, unsigned char tag, unsigned char sy,
                           unsigned int cycle, unsigned int dropped) {
    // m_last_cycle is still at its initial value: this is the first packet
    if(m_last_cycle == -1) {
        debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
    }

    // derive the number of cycles skipped since the previous call
    int dropped_cycles = 0;
    if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
        dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
        if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles);
        if (dropped_cycles > 0) {
            debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle);
            m_dropped += dropped_cycles;
        }
    }
    m_last_cycle = cycle;

    // bypass based upon state
    if (m_state == ePS_Invalid) {
        debugError("Should not have state %s\n", ePSToString(m_state) );
        return RAW1394_ISO_ERROR;
    }
    if (m_state == ePS_Created) {
        return RAW1394_ISO_DEFER;
    }

    // store the previous timestamp
    m_last_timestamp2 = m_last_timestamp;

    // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
    //       it happens on the first 'good' cycle for the wait condition
    //       or on the first received cycle that is received afterwards (might be a problem)

    // check whether we are waiting for a stream to be disabled
    if(m_state == ePS_WaitingForStreamDisable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
            m_next_state = ePS_DryRunning;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to disable yet
        }
        // the received data can be discarded while waiting for the stream
        // to be disabled
        return RAW1394_ISO_OK;
    }

    // check whether we are waiting for a stream to be enabled
    else if(m_state == ePS_WaitingForStreamEnable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
            m_next_state = ePS_Running;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to enable yet
        }
        // we are dryRunning hence data should be processed in any case
    }

    // check the packet header
    enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
    if (result == eCRV_OK) {
        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
                cycle, m_last_timestamp);
        // update some accounting
        m_last_good_cycle = cycle;
        m_last_dropped = dropped_cycles;

        // check whether we are waiting for a stream to startup
        // this requires that the packet is good
        if(m_state == ePS_WaitingForStream) {
            // since we have a packet with an OK header,
            // we can indicate that the stream started up

            // we then check whether we have to switch on this cycle
            if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
                // hence go to the dryRunning state
                m_next_state = ePS_DryRunning;
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
            } else {
                // not time (yet) to switch state
            }
            // in both cases we don't want to process the data
            return RAW1394_ISO_OK;
        }

        // check whether a state change has been requested
        // note that only the wait state changes are synchronized with the cycles
        else if(m_state != m_next_state) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                             ePSToString(m_state), ePSToString(m_next_state));
            // execute the requested change
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        }

        // handle dropped cycles
        if(dropped_cycles) {
            // they represent a discontinuity in the timestamps, and hence are
            // to be dealt with
            debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
            m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
            if (m_state == ePS_Running) {
                // this is an xrun situation
                m_in_xrun = true;
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
                m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
                m_next_state = ePS_WaitingForStreamDisable;
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
                return RAW1394_ISO_DEFER;
            }
        }

        // for all states that reach this we are allowed to
        // do protocol specific data reception
        enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);

        // if an xrun occurred, switch to the dryRunning state and
        // allow for the xrun to be picked up
        if (result2 == eCRV_XRun) {
            m_in_xrun = true;
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
            m_cycle_to_switch_state = cycle+1; // switch in the next cycle
            m_next_state = ePS_WaitingForStreamDisable;
            // execute the requested change
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
            return RAW1394_ISO_DEFER;
        } else if(result2 == eCRV_OK) {
            // no problem here
            return RAW1394_ISO_OK;
        } else {
            debugError("Invalid response\n");
            return RAW1394_ISO_ERROR;
        }
    } else if(result == eCRV_Invalid) {
        // apparently we don't have to do anything when the packets are not valid
        return RAW1394_ISO_OK;
    } else {
        debugError("Invalid response\n");
        return RAW1394_ISO_ERROR;
    }
    // every branch above returns; this is only a safety net
    debugError("reached the unreachable\n");
    return RAW1394_ISO_ERROR;
}
370
/**
 * @brief Produce one iso packet for transmission (transmit-side entry point).
 *
 * Performs dropped-cycle accounting, drives the cycle-synchronized state
 * changes of the processor state machine and, when in the ePS_Running
 * state, asks the protocol specific generatePacketHeader() /
 * generatePacketData() methods for a packet. In all other cases that
 * continue past the early returns a silent packet is generated.
 *
 * @param data       buffer the packet payload is written to
 * @param length     receives the payload length
 * @param tag        receives the iso tag field
 * @param sy         receives the iso sy field
 * @param cycle      cycle the packet is meant for; a negative value yields
 *                   a zero-length packet without any further processing
 * @param dropped    only used in a warning message
 * @param max_length forwarded to the packet generation methods
 * @return RAW1394_ISO_ERROR when the state is ePS_Invalid, when a state
 *         update fails or when generatePacketHeader() returns an unexpected
 *         value; RAW1394_ISO_OK for a negative @p cycle and for a data
 *         packet generated without xrun; RAW1394_ISO_DEFER in the
 *         ePS_Created state and whenever a silent packet is sent
 */
enum raw1394_iso_disposition
StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
                           unsigned char *tag, unsigned char *sy,
                           int cycle, unsigned int dropped, unsigned int max_length) {
    // no valid cycle: hand back a zero-length packet
    if (cycle<0) {
        *tag = 0;
        *sy = 0;
        *length = 0;
        return RAW1394_ISO_OK;
    }

    // m_last_cycle is still at its initial value: this is the first call
    if(m_last_cycle == -1) {
        debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
    }

    // derive the number of cycles skipped since the previous call
    int dropped_cycles = 0;
    if (m_last_cycle != cycle && m_last_cycle != -1) {
        dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
        if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles);
        if (dropped_cycles > 0) {
            debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
            m_dropped += dropped_cycles;
        }
    }
    // always true at this point: negative cycles returned early above
    if (cycle >= 0) {
        m_last_cycle = cycle;
    }

    // bypass based upon state
    if (m_state == ePS_Invalid) {
        debugError("Should not have state %s\n", ePSToString(m_state) );
        return RAW1394_ISO_ERROR;
    }
    if (m_state == ePS_Created) {
        *tag = 0;
        *sy = 0;
        *length = 0;
        return RAW1394_ISO_DEFER;
    }

    // normal processing
    // note that we can't use getCycleTimer directly here,
    // because packets are queued in advance. This means that
    // the packet we are constructing will be sent out
    // on 'cycle', not 'now'.
    unsigned int ctr = m_handler->getCycleTimer();
    int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);

    // the difference between the cycle this
    // packet is intended for and 'now'
    int cycle_diff = diffCycles(cycle, now_cycles);

    #ifdef DEBUG
    if(cycle_diff < 0) {
        debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
            cycle, now_cycles);
    }
    #endif

    // store the previous timestamp
    m_last_timestamp2 = m_last_timestamp;

    // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
    //       it happens on the first 'good' cycle for the wait condition
    //       or on the first received cycle that is received afterwards (might be a problem)

    // check whether we are waiting for a stream to be disabled
    if(m_state == ePS_WaitingForStreamDisable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
            m_next_state = ePS_DryRunning;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to disable yet
        }
    }
    // check whether we are waiting for a stream to be enabled
    else if(m_state == ePS_WaitingForStreamEnable) {
        // we then check whether we have to switch on this cycle
        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
            m_next_state = ePS_Running;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time to enable yet
        }
        // we are dryRunning hence data should be processed in any case
    }
    // check whether we are waiting for a stream to startup
    else if(m_state == ePS_WaitingForStream) {
        // as long as the cycle parameter is not in sync with
        // the current time, the stream is considered not
        // to be 'running'
        // we then check whether we have to switch on this cycle
        if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
            // hence go to the dryRunning state
            m_next_state = ePS_DryRunning;
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
        } else {
            // not time (yet) to switch state
        }
    }
    else if(m_state == ePS_Running) {
        // check the packet header
        enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
        if (result == eCRV_Packet) {
            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
                    cycle, m_last_timestamp);
            // update some accounting
            m_last_good_cycle = cycle;
            m_last_dropped = dropped_cycles;

            // check whether a state change has been requested
            // note that only the wait state changes are synchronized with the cycles
            if(m_state != m_next_state) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                                ePSToString(m_state), ePSToString(m_next_state));
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
            }

            enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
            // if an xrun occurred, switch to the dryRunning state and
            // allow for the xrun to be picked up
            if (result2 == eCRV_XRun) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
                m_in_xrun = true;
                m_cycle_to_switch_state = cycle+1; // switch in the next cycle
                m_next_state = ePS_WaitingForStreamDisable;
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
                goto send_empty_packet;
            }
            return RAW1394_ISO_OK;
        } else if (result == eCRV_XRun) { // pick up the possible xruns
            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
            m_in_xrun = true;
            m_cycle_to_switch_state = cycle+1; // switch in the next cycle
            m_next_state = ePS_WaitingForStreamDisable;
            // execute the requested change
            if (!updateState()) { // we are allowed to change the state directly
                debugError("Could not update state!\n");
                return RAW1394_ISO_ERROR;
            }
            // no return here: control falls through to send_empty_packet below
        } else if ((result == eCRV_EmptyPacket) || (result == eCRV_Again)) {
            if(m_state != m_next_state) {
                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                                ePSToString(m_state), ePSToString(m_next_state));
                // execute the requested change
                if (!updateState()) { // we are allowed to change the state directly
                    debugError("Could not update state!\n");
                    return RAW1394_ISO_ERROR;
                }
            }
            goto send_empty_packet;
//         } else if (result == eCRV_Again) {
//             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
//             if(m_state != m_next_state) {
//                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
//                                                 ePSToString(m_state), ePSToString(m_next_state));
//                 // execute the requested change
//                 if (!updateState()) { // we are allowed to change the state directly
//                     debugError("Could not update state!\n");
//                     return RAW1394_ISO_ERROR;
//                 }
//             }
            // force some delay
//             usleep(125);
//             return RAW1394_ISO_AGAIN;
        } else {
            debugError("Invalid return value: %d\n", result);
            return RAW1394_ISO_ERROR;
        }
    }
    // we are not running, so send an empty packet
    // we should generate a valid packet any time
send_empty_packet:
    // note that only the wait state changes are synchronized with the cycles
    if(m_state != m_next_state) {
        debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
                                        ePSToString(m_state), ePSToString(m_next_state));
        // execute the requested change
        if (!updateState()) { // we are allowed to change the state directly
            debugError("Could not update state!\n");
            return RAW1394_ISO_ERROR;
        }
    }

    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
    generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
    generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
    return RAW1394_ISO_DEFER;
}
581
582
583 // Frame Transfer API
584 /**
585  * Transfer a block of frames from the event buffer to the port buffers
586  * @param nbframes number of frames to transfer
587  * @param ts the timestamp that the LAST frame in the block should have
588  * @return
589  */
590 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
591     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
592     assert( getType() == ePT_Receive );
593     if(isDryRunning()) return getFramesDry(nbframes, ts);
594     else return getFramesWet(nbframes, ts);
595 }
596
597 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
598 // FIXME: this should be done somewhere else
599 #ifdef DEBUG
600     uint64_t ts_expected;
601     signed int fc;
602     int32_t lag_ticks;
603     float lag_frames;
604
605     // in order to sync up multiple received streams, we should
606     // use the ts parameter. It specifies the time of the block's
607     // last sample.
608     float srate = m_manager->getSyncSource().getTicksPerFrame();
609     assert(srate != 0.0);
610     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
611
612     ffado_timestamp_t ts_head_tmp;
613     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
614     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
615
616     lag_ticks = diffTicks(ts, ts_expected);
617     lag_frames = (((float)lag_ticks) / srate);
618     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
619                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
620     if (lag_frames >= 1.0) {
621         // the stream lags
622         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
623                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
624     } else if (lag_frames <= -1.0) {
625         // the stream leads
626         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
627                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
628     }
629 #endif
630     // ask the buffer to process nbframes of frames
631     // using it's registered client's processReadBlock(),
632     // which should be ours
633     m_data_buffer->blockProcessReadFrames(nbframes);
634     return true;
635 }
636
637 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
638 {
639     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
640                  this, nbframes, ts);
641     // dry run on this side means that we put silence in all enabled ports
642     // since there is do data put into the ringbuffer in the dry-running state
643     return provideSilenceBlock(nbframes, 0);
644 }
645
646 bool
647 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
648 {
649     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
650     return m_data_buffer->dropFrames(nbframes);
651 }
652
653 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
654 {
655     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
656     assert( getType() == ePT_Transmit );
657     if(isDryRunning()) return putFramesDry(nbframes, ts);
658     else return putFramesWet(nbframes, ts);
659 }
660
661 bool
662 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
663 {
664     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
665     // transfer the data
666     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
667     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
668     return true; // FIXME: what about failure?
669 }
670
671 bool
672 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
673 {
674     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
675     // do nothing
676     return true;
677 }
678
679 bool
680 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
681 {
682     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
683
684     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
685     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
686
687     if (nbframes > scratch_buffer_size_frames) {
688         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
689                    nbframes, scratch_buffer_size_frames);
690     }
691
692     assert(m_scratch_buffer);
693     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
694         debugError("Could not prepare silent block\n");
695         return false;
696     }
697     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
698         debugError("Could not write silent block\n");
699         return false;
700     }
701     return true;
702 }
703
704 bool
705 StreamProcessor::shiftStream(int nbframes)
706 {
707     if(nbframes == 0) return true;
708     if(nbframes > 0) {
709         return m_data_buffer->dropFrames(nbframes);
710     } else {
711         bool result = true;
712         while(nbframes++) {
713             result &= m_data_buffer->writeDummyFrame();
714         }
715         return result;
716     }
717 }
718
719 /**
720  * @brief write silence events to the stream ringbuffers.
721  */
722 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
723 {
724     bool no_problem=true;
725     for ( PortVectorIterator it = m_PeriodPorts.begin();
726           it != m_PeriodPorts.end();
727           ++it ) {
728         if((*it)->isDisabled()) {continue;};
729
730         //FIXME: make this into a static_cast when not DEBUG?
731         Port *port=dynamic_cast<Port *>(*it);
732
733         switch(port->getPortType()) {
734
735         case Port::E_Audio:
736             if(provideSilenceToPort(static_cast<AudioPort *>(*it), offset, nevents)) {
737                 debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
738                 no_problem=false;
739             }
740             break;
741         // midi is a packet based port, don't process
742         //    case MotuPortInfo::E_Midi:
743         //        break;
744
745         default: // ignore
746             break;
747         }
748     }
749     return no_problem;
750 }
751
752 int
753 StreamProcessor::provideSilenceToPort(
754                        AudioPort *p, unsigned int offset, unsigned int nevents)
755 {
756     unsigned int j=0;
757     switch(p->getDataType()) {
758         default:
759         case Port::E_Int24:
760             {
761                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
762                 assert(nevents + offset <= p->getBufferSize());
763                 buffer+=offset;
764
765                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
766                     *(buffer)=0;
767                     buffer++;
768                 }
769             }
770             break;
771         case Port::E_Float:
772             {
773                 float *buffer=(float *)(p->getBufferAddress());
774                 assert(nevents + offset <= p->getBufferSize());
775                 buffer+=offset;
776
777                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
778                     *buffer = 0.0;
779                     buffer++;
780                 }
781             }
782             break;
783     }
784     return 0;
785 }
786
787 /***********************************************
788  * State related API                           *
789  ***********************************************/
790 bool StreamProcessor::init()
791 {
792     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
793
794     // initialization can be done without requesting it
795     // from the packet loop
796     m_next_state = ePS_Created;
797     return true;
798 }
799
800 bool StreamProcessor::prepare()
801 {
802     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
803     if(!m_manager) {
804         debugFatal("Not attached to a manager!\n");
805         return false;
806     }
807
808     // make the scratch buffer one period of frames long
809     m_scratch_buffer_size_bytes = m_manager->getPeriodSize() * getEventsPerFrame() * getEventSize();
810     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
811     if(m_scratch_buffer) delete[] m_scratch_buffer;
812     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
813     if(m_scratch_buffer == NULL) {
814         debugFatal("Could not allocate scratch buffer\n");
815         return false;
816     }
817
818     if (!prepareChild()) {
819         debugFatal("Could not prepare child\n");
820         return false;
821     }
822
823     // initialization can be done without requesting it
824     // from the packet loop
825     m_next_state = ePS_Stopped;
826     return updateState();
827 }
828
829 bool
830 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
831 {
832     // first set the time, since in the packet loop we first check m_state == m_next_state before
833     // using the time
834     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
835     m_next_state = state;
836     return true;
837 }
838
839 bool
840 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
841 {
842     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
843     int cnt = timeout_ms;
844     while (m_state != state && cnt) {
845         usleep(1000);
846         cnt--;
847     }
848     if(cnt==0) {
849         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
850         return false;
851     }
852     return true;
853 }
854
855 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
856     uint64_t tx;
857     if (t < 0) {
858         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
859     } else {
860         tx = t;
861     }
862     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
863     uint64_t now = m_handler->getCycleTimerTicks();
864     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
865                           now,
866                           (unsigned int)TICKS_TO_SECS(now),
867                           (unsigned int)TICKS_TO_CYCLES(now),
868                           (unsigned int)TICKS_TO_OFFSET(now));
869     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
870                           tx,
871                           (unsigned int)TICKS_TO_SECS(tx),
872                           (unsigned int)TICKS_TO_CYCLES(tx),
873                           (unsigned int)TICKS_TO_OFFSET(tx));
874     if (m_state == ePS_Stopped) {
875         return scheduleStateTransition(ePS_WaitingForStream, tx);
876     } else if (m_state == ePS_Running) {
877         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
878     } else {
879         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
880         return false;
881     }
882 }
883
884 bool StreamProcessor::scheduleStartRunning(int64_t t) {
885     uint64_t tx;
886     if (t < 0) {
887         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
888     } else {
889         tx = t;
890     }
891     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
892     uint64_t now = m_handler->getCycleTimerTicks();
893     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
894                           now,
895                           (unsigned int)TICKS_TO_SECS(now),
896                           (unsigned int)TICKS_TO_CYCLES(now),
897                           (unsigned int)TICKS_TO_OFFSET(now));
898     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
899                           tx,
900                           (unsigned int)TICKS_TO_SECS(tx),
901                           (unsigned int)TICKS_TO_CYCLES(tx),
902                           (unsigned int)TICKS_TO_OFFSET(tx));
903     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
904 }
905
906 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
907     uint64_t tx;
908     if (t < 0) {
909         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
910     } else {
911         tx = t;
912     }
913     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
914     uint64_t now = m_handler->getCycleTimerTicks();
915     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
916                           now,
917                           (unsigned int)TICKS_TO_SECS(now),
918                           (unsigned int)TICKS_TO_CYCLES(now),
919                           (unsigned int)TICKS_TO_OFFSET(now));
920     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
921                           tx,
922                           (unsigned int)TICKS_TO_SECS(tx),
923                           (unsigned int)TICKS_TO_CYCLES(tx),
924                           (unsigned int)TICKS_TO_OFFSET(tx));
925     return scheduleStateTransition(ePS_Stopped, tx);
926 }
927
928 bool StreamProcessor::scheduleStopRunning(int64_t t) {
929     uint64_t tx;
930     if (t < 0) {
931         tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
932     } else {
933         tx = t;
934     }
935     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
936     uint64_t now = m_handler->getCycleTimerTicks();
937     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
938                           now,
939                           (unsigned int)TICKS_TO_SECS(now),
940                           (unsigned int)TICKS_TO_CYCLES(now),
941                           (unsigned int)TICKS_TO_OFFSET(now));
942     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
943                           tx,
944                           (unsigned int)TICKS_TO_SECS(tx),
945                           (unsigned int)TICKS_TO_CYCLES(tx),
946                           (unsigned int)TICKS_TO_OFFSET(tx));
947     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
948 }
949
950 bool StreamProcessor::startDryRunning(int64_t t) {
951     if(!scheduleStartDryRunning(t)) {
952         debugError("Could not schedule transition\n");
953         return false;
954     }
955     if(!waitForState(ePS_DryRunning, 2000)) {
956         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
957         return false;
958     }
959     return true;
960 }
961
962 bool StreamProcessor::startRunning(int64_t t) {
963     if(!scheduleStartRunning(t)) {
964         debugError("Could not schedule transition\n");
965         return false;
966     }
967     if(!waitForState(ePS_Running, 2000)) {
968         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
969         return false;
970     }
971     return true;
972 }
973
974 bool StreamProcessor::stopDryRunning(int64_t t) {
975     if(!scheduleStopDryRunning(t)) {
976         debugError("Could not schedule transition\n");
977         return false;
978     }
979     if(!waitForState(ePS_Stopped, 2000)) {
980         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
981         return false;
982     }
983     return true;
984 }
985
986 bool StreamProcessor::stopRunning(int64_t t) {
987     if(!scheduleStopRunning(t)) {
988         debugError("Could not schedule transition\n");
989         return false;
990     }
991     if(!waitForState(ePS_DryRunning, 2000)) {
992         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
993         return false;
994     }
995     return true;
996 }
997
998
999 // internal state API
1000
1001 /**
1002  * @brief Enter the ePS_Stopped state
1003  * @return true if successful, false if not
1004  *
1005  * @pre none
1006  *
1007  * @post the buffer and the isostream are ready for use.
1008  * @post all dynamic structures have been allocated successfully
1009  * @post the buffer is transparent and empty, and all parameters are set
1010  *       to the correct initial/nominal values.
1011  *
1012  */
1013 bool
1014 StreamProcessor::doStop()
1015 {
1016     float ticks_per_frame;
1017     unsigned int ringbuffer_size_frames = (m_manager->getNbBuffers() + 1) * m_manager->getPeriodSize();
1018
1019     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1020     bool result = true;
1021
1022     switch(m_state) {
1023         case ePS_Created:
1024             assert(m_data_buffer);
1025             // object just created
1026             result = m_data_buffer->init();
1027
1028             // prepare the framerate estimate
1029             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate());
1030             m_ticks_per_frame = ticks_per_frame;
1031             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1032
1033             // initialize internal buffer
1034             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1035
1036             result &= m_data_buffer->setEventSize( getEventSize() );
1037             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1038             if(getType() == ePT_Receive) {
1039                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1040             } else {
1041                 result &= m_data_buffer->setUpdatePeriod( m_manager->getPeriodSize() );
1042             }
1043             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1044             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1045             result &= m_data_buffer->prepare(); // FIXME: the name
1046
1047             // set the parameters of ports we can:
1048             // we want the audio ports to be period buffered,
1049             // and the midi ports to be packet buffered
1050             for ( PortVectorIterator it = m_Ports.begin();
1051                 it != m_Ports.end();
1052                 ++it )
1053             {
1054                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1055                 if(!(*it)->setBufferSize(m_manager->getPeriodSize())) {
1056                     debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize());
1057                     return false;
1058                 }
1059                 switch ((*it)->getPortType()) {
1060                     case Port::E_Audio:
1061                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
1062                             debugFatal("Could not set signal type to PeriodSignalling");
1063                             return false;
1064                         }
1065                         // buffertype and datatype are dependant on the API
1066                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
1067                         // buffertype and datatype are dependant on the API
1068                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
1069                             debugFatal("Could not set buffer type");
1070                             return false;
1071                         }
1072                         if(!(*it)->useExternalBuffer(true)) {
1073                             debugFatal("Could not set external buffer usage");
1074                             return false;
1075                         }
1076                         if(!(*it)->setDataType(Port::E_Float)) {
1077                             debugFatal("Could not set data type");
1078                             return false;
1079                         }
1080                         break;
1081                     case Port::E_Midi:
1082                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1083                             debugFatal("Could not set signal type to PacketSignalling");
1084                             return false;
1085                         }
1086                         // buffertype and datatype are dependant on the API
1087                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1088                         // buffertype and datatype are dependant on the API
1089                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1090                             debugFatal("Could not set buffer type");
1091                             return false;
1092                         }
1093                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1094                             debugFatal("Could not set data type");
1095                             return false;
1096                         }
1097                         break;
1098                     default:
1099                         debugWarning("Unsupported port type specified\n");
1100                         break;
1101                 }
1102             }
1103             // the API specific settings of the ports should already be set,
1104             // as this is called from the processorManager->prepare()
1105             // so we can init the ports
1106             result &= PortManager::initPorts();
1107
1108             break;
1109         case ePS_DryRunning:
1110             // what to do here?
1111             break;
1112         default:
1113             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1114             return false;
1115     }
1116
1117     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1118     // make the buffer transparent
1119     m_data_buffer->setTransparent(true);
1120
1121     // reset all ports
1122     result &= PortManager::preparePorts();
1123
1124     m_state = ePS_Stopped;
1125     #ifdef DEBUG
1126     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1127         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1128         dumpInfo();
1129     }
1130     #endif
1131     return result;
1132 }
1133
1134 /**
1135  * @brief Enter the ePS_WaitingForStream state
1136  * @return true if successful, false if not
1137  *
1138  * @pre all dynamic data structures are allocated successfully
1139  *
1140  * @post
1141  *
1142  */
1143 bool
1144 StreamProcessor::doWaitForRunningStream()
1145 {
1146     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1147     switch(m_state) {
1148         case ePS_Stopped:
1149             // we have to start waiting for an incoming stream
1150             // this basically means nothing, the state change will
1151             // be picked up by the packet iterator
1152             break;
1153         default:
1154             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1155             return false;
1156     }
1157     m_state = ePS_WaitingForStream;
1158     #ifdef DEBUG
1159     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1160         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1161         dumpInfo();
1162     }
1163     #endif
1164     return true;
1165 }
1166
1167 /**
1168  * @brief Enter the ePS_DryRunning state
1169  * @return true if successful, false if not
1170  *
1171  * @pre
1172  *
1173  * @post
1174  *
1175  */
1176 bool
1177 StreamProcessor::doDryRunning()
1178 {
1179     bool result = true;
1180     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1181     switch(m_state) {
1182         case ePS_WaitingForStream:
1183             // a running stream has been detected
1184             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1185             if (getType() == ePT_Receive) {
1186                 // this to ensure that there is no discontinuity when starting to
1187                 // update the DLL based upon the received packets
1188                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1189             } else {
1190                 // FIXME: PC=master mode will have to do something here I guess...
1191             }
1192             break;
1193         case ePS_WaitingForStreamDisable:
1194             result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1195             m_data_buffer->setTransparent(true);
1196             break;
1197         default:
1198             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1199             return false;
1200     }
1201     m_state = ePS_DryRunning;
1202     #ifdef DEBUG
1203     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1204         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1205         dumpInfo();
1206     }
1207     #endif
1208     return result;
1209 }
1210
1211 /**
1212  * @brief Enter the ePS_WaitingForStreamEnable state
1213  * @return true if successful, false if not
1214  *
1215  * @pre
1216  *
1217  * @post
1218  *
1219  */
1220 bool
1221 StreamProcessor::doWaitForStreamEnable()
1222 {
1223     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1224     unsigned int ringbuffer_size_frames;
1225     switch(m_state) {
1226         case ePS_DryRunning:
1227             // we have to start waiting for an incoming stream
1228             // this basically means nothing, the state change will
1229             // be picked up by the packet iterator
1230
1231             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1232                 debugError("Could not reset data buffer\n");
1233                 return false;
1234             }
1235             if (getType() == ePT_Transmit) {
1236                 ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize();
1237                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1238                 // prefill the buffer
1239                 if(!transferSilence(ringbuffer_size_frames)) {
1240                     debugFatal("Could not prefill transmit stream\n");
1241                     return false;
1242                 }
1243             }
1244
1245             break;
1246         default:
1247             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1248             return false;
1249     }
1250     m_state = ePS_WaitingForStreamEnable;
1251     #ifdef DEBUG
1252     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1253         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1254         dumpInfo();
1255     }
1256     #endif
1257     return true;
1258 }
1259
1260 /**
1261  * @brief Enter the ePS_Running state
1262  * @return true if successful, false if not
1263  *
1264  * @pre
1265  *
1266  * @post
1267  *
1268  */
1269 bool
1270 StreamProcessor::doRunning()
1271 {
1272     bool result = true;
1273     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1274     switch(m_state) {
1275         case ePS_WaitingForStreamEnable:
1276             // a running stream has been detected
1277             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1278                                              this, m_last_cycle);
1279             m_in_xrun = false;
1280             m_data_buffer->setTransparent(false);
1281             break;
1282         default:
1283             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1284             return false;
1285     }
1286     m_state = ePS_Running;
1287     #ifdef DEBUG
1288     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1289         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1290         dumpInfo();
1291     }
1292     #endif
1293     return result;
1294 }
1295
1296 /**
1297  * @brief Enter the ePS_WaitingForStreamDisable state
1298  * @return true if successful, false if not
1299  *
1300  * @pre
1301  *
1302  * @post
1303  *
1304  */
1305 bool
1306 StreamProcessor::doWaitForStreamDisable()
1307 {
1308     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1309     switch(m_state) {
1310         case ePS_Running:
1311             // the thread will do the transition
1312             break;
1313         default:
1314             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1315             return false;
1316     }
1317     m_state = ePS_WaitingForStreamDisable;
1318     #ifdef DEBUG
1319     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1320         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1321         dumpInfo();
1322     }
1323     #endif
1324     return true;
1325 }
1326
1327 /**
1328  * @brief Updates the state machine and calls the necessary transition functions
1329  * @return true if successful, false if not
1330  */
1331 bool StreamProcessor::updateState() {
1332     bool result = false;
1333     // copy the current state locally since it could change value,
1334     // and that's something we don't want to happen inbetween tests
1335     // if m_next_state changes during this routine, we know for sure
1336     // that the previous state change was at least attempted correctly.
1337     enum eProcessorState next_state = m_next_state;
1338
1339     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1340         ePSToString(m_state), ePSToString(next_state));
1341
1342     if (m_state == next_state) {
1343         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1344         return true;
1345     }
1346
1347     // after creation, only initialization is allowed
1348     if (m_state == ePS_Created) {
1349         if(next_state != ePS_Stopped) {
1350             goto updateState_exit_with_error;
1351         }
1352         // do init here
1353         result = doStop();
1354         if (result) return true;
1355         else goto updateState_exit_change_failed;
1356     }
1357
1358     // after initialization, only WaitingForRunningStream is allowed
1359     if (m_state == ePS_Stopped) {
1360         if(next_state != ePS_WaitingForStream) {
1361             goto updateState_exit_with_error;
1362         }
1363         result = doWaitForRunningStream();
1364         if (result) return true;
1365         else goto updateState_exit_change_failed;
1366     }
1367
1368     // after WaitingForStream, only ePS_DryRunning is allowed
1369     // this means that the stream started running
1370     if (m_state == ePS_WaitingForStream) {
1371         if(next_state != ePS_DryRunning) {
1372             goto updateState_exit_with_error;
1373         }
1374         result = doDryRunning();
1375         if (result) return true;
1376         else goto updateState_exit_change_failed;
1377     }
1378
1379     // from ePS_DryRunning we can go to:
1380     //   - ePS_Stopped if something went wrong during DryRunning
1381     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1382     if (m_state == ePS_DryRunning) {
1383         if((next_state != ePS_Stopped) &&
1384            (next_state != ePS_WaitingForStreamEnable)) {
1385             goto updateState_exit_with_error;
1386         }
1387         if (next_state == ePS_Stopped) {
1388             result = doStop();
1389         } else {
1390             result = doWaitForStreamEnable();
1391         }
1392         if (result) return true;
1393         else goto updateState_exit_change_failed;
1394     }
1395
1396     // from ePS_WaitingForStreamEnable we can go to:
1397     //   - ePS_DryRunning if something went wrong while waiting
1398     //   - ePS_Running if the stream enabled correctly
1399     if (m_state == ePS_WaitingForStreamEnable) {
1400         if((next_state != ePS_DryRunning) &&
1401            (next_state != ePS_Running)) {
1402             goto updateState_exit_with_error;
1403         }
1404         if (next_state == ePS_Stopped) {
1405             result = doDryRunning();
1406         } else {
1407             result = doRunning();
1408         }
1409         if (result) return true;
1410         else goto updateState_exit_change_failed;
1411     }
1412
1413     // from ePS_Running we can only start waiting for a disabled stream
1414     if (m_state == ePS_Running) {
1415         if(next_state != ePS_WaitingForStreamDisable) {
1416             goto updateState_exit_with_error;
1417         }
1418         result = doWaitForStreamDisable();
1419         if (result) return true;
1420         else goto updateState_exit_change_failed;
1421     }
1422
1423     // from ePS_WaitingForStreamDisable we can go to DryRunning
1424     if (m_state == ePS_WaitingForStreamDisable) {
1425         if(next_state != ePS_DryRunning) {
1426             goto updateState_exit_with_error;
1427         }
1428         result = doDryRunning();
1429         if (result) return true;
1430         else goto updateState_exit_change_failed;
1431     }
1432
1433     // if we arrive here there is an error
1434 updateState_exit_with_error:
1435     debugError("Invalid state transition: %s => %s\n",
1436         ePSToString(m_state), ePSToString(next_state));
1437     return false;
1438 updateState_exit_change_failed:
1439     debugError("State transition failed: %s => %s\n",
1440         ePSToString(m_state), ePSToString(next_state));
1441     return false;
1442 }
1443
1444 /***********************************************
1445  * Helper routines                             *
1446  ***********************************************/
1447 bool
1448 StreamProcessor::transferSilence(unsigned int nframes)
1449 {
1450     bool retval;
1451     signed int fc;
1452     ffado_timestamp_t ts_tail_tmp;
1453
1454     // prepare a buffer of silence
1455     char *dummybuffer = (char *)calloc(sizeof(quadlet_t), nframes * getEventsPerFrame());
1456     transmitSilenceBlock(dummybuffer, nframes, 0);
1457
1458     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1459     if (fc != 0) {
1460         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1461     }
1462
1463     // add the silence data to the ringbuffer
1464     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1465         retval = true;
1466     } else {
1467         debugWarning("Could not write to event buffer\n");
1468         retval = false;
1469     }
1470     free(dummybuffer);
1471     return retval;
1472 }
1473
1474 /**
1475  * @brief convert a eProcessorState to a string
1476  * @param s the state
1477  * @return a char * describing the state
1478  */
1479 const char *
1480 StreamProcessor::ePSToString(enum eProcessorState s) {
1481     switch (s) {
1482         case ePS_Invalid: return "ePS_Invalid";
1483         case ePS_Created: return "ePS_Created";
1484         case ePS_Stopped: return "ePS_Stopped";
1485         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1486         case ePS_DryRunning: return "ePS_DryRunning";
1487         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1488         case ePS_Running: return "ePS_Running";
1489         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1490         default: return "error: unknown state";
1491     }
1492 }
1493
1494 /**
1495  * @brief convert a eProcessorType to a string
1496  * @param t the type
1497  * @return a char * describing the state
1498  */
1499 const char *
1500 StreamProcessor::ePTToString(enum eProcessorType t) {
1501     switch (t) {
1502         case ePT_Receive: return "Receive";
1503         case ePT_Transmit: return "Transmit";
1504         default: return "error: unknown type";
1505     }
1506 }
1507
1508 /***********************************************
1509  * Debug                                       *
1510  ***********************************************/
1511 void
1512 StreamProcessor::dumpInfo()
1513 {
1514     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p information\n", this);
1515     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n");
1516
1517     IsoStream::dumpInfo();
1518     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
1519     if (m_handler) {
1520         uint64_t now = m_handler->getCycleTimerTicks();
1521         debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1522                           now,
1523                           (unsigned int)TICKS_TO_SECS(now),
1524                           (unsigned int)TICKS_TO_CYCLES(now),
1525                           (unsigned int)TICKS_TO_OFFSET(now));
1526     }
1527     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %s\n", (m_in_xrun ? "True":"False"));
1528     debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n", ePSToString(m_state));
1529     debugOutputShort( DEBUG_LEVEL_NORMAL, "   Next state           : %s\n", ePSToString(m_next_state));
1530     debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1531     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1532     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_manager->getNominalRate());
1533     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n",
1534         24576000.0/m_manager->getSyncSource().m_data_buffer->getRate(),
1535         24576000.0/m_data_buffer->getRate()
1536         );
1537
1538     m_data_buffer->dumpInfo();
1539 }
1540
1541 void
1542 StreamProcessor::setVerboseLevel(int l) {
1543     setDebugLevel(l);
1544     IsoStream::setVerboseLevel(l);
1545     PortManager::setVerboseLevel(l);
1546     m_data_buffer->setVerboseLevel(l);
1547 }
1548
1549 } // end of namespace
Note: See TracBrowser for help on using the browser.