root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 803, 65.2 kB (checked in by ppalmers, 13 years ago)

more reliable streaming. hackish, but a start for a better implementation

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessor.h"
27 #include "../StreamProcessorManager.h"
28
29 #include "devicemanager.h"
30
31 #include "libieee1394/ieee1394service.h"
32 #include "libieee1394/IsoHandlerManager.h"
33 #include "libieee1394/cycletimer.h"
34
35 #include "libutil/Time.h"
36
37 #include "libutil/Atomic.h"
38
39 #include <assert.h>
40 #include <math.h>
41
42 namespace Streaming {
43
44 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
45
46 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
47     : m_processor_type ( type )
48     , m_state( ePS_Created )
49     , m_next_state( ePS_Invalid )
50     , m_cycle_to_switch_state( 0 )
51     , m_Parent( parent )
52     , m_1394service( parent.get1394Service() ) // local cache
53     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
54     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
55     , m_channel( -1 )
56     , m_dropped(0)
57     , m_last_timestamp(0)
58     , m_last_timestamp2(0)
59     , m_scratch_buffer( NULL )
60     , m_scratch_buffer_size_bytes( 0 )
61     , m_ticks_per_frame( 0 )
62     , m_last_cycle( -1 )
63     , m_sync_delay( 0 )
64     , m_in_xrun( false )
65 {
66     // create the timestamped buffer and register ourselves as its client
67     m_data_buffer = new Util::TimestampedBuffer(this);
68 }
69
70 StreamProcessor::~StreamProcessor() {
71     m_StreamProcessorManager.unregisterProcessor(this);
72     if(!m_IsoHandlerManager.unregisterStream(this)) {
73         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
74     }
75
76     if (m_data_buffer) delete m_data_buffer;
77     if (m_scratch_buffer) delete[] m_scratch_buffer;
78 }
79
80 uint64_t StreamProcessor::getTimeNow() {
81     return m_1394service.getCycleTimerTicks();
82 }
83
84 int StreamProcessor::getMaxFrameLatency() {
85     if (getType() == ePT_Receive) {
86         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
87     } else {
88         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
89     }
90 }
91
92 unsigned int
93 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
94 {
95     unsigned int nominal_frames_per_second
96                     = m_StreamProcessorManager.getNominalRate();
97     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
98     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
99     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
100     return nominal_packets;
101 }
102
103 unsigned int
104 StreamProcessor::getPacketsPerPeriod()
105 {
106     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
107 }
108
109 unsigned int
110 StreamProcessor::getNbPacketsIsoXmitBuffer()
111 {
112 #if ISOHANDLER_PER_HANDLER_THREAD
113     // if we use one thread per packet, we can put every frame into the ISO buffer
114     // the waitForClient in IsoHandler will take care of the fact that the frames are
115     // not present in time
116     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
117     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
118     return packets_to_prebuffer;
119 #else
120     // the target is to have all of the transmit buffer (at period transfer) as ISO packets
121     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames
122     // in the transmit buffer (the others are still to be put into the xmit frame buffer)
123     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
124     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
125    
126     // however we have to take into account the fact that there is some sync delay
127     // we assume that the SPM has indicated
128     // HACK: this counts on the fact that the latency for this stream will be the same as the
129     //       latency for the receive sync source
130     unsigned int est_sync_delay = getPacketsPerPeriod() / MINIMUM_INTERRUPTS_PER_PERIOD;
131     est_sync_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS / TICKS_PER_CYCLE;
132     packets_to_prebuffer -= est_sync_delay;
133     debugOutput(DEBUG_LEVEL_VERBOSE, " correct for sync delay (%d): %u\n",
134                                      est_sync_delay,
135                                      packets_to_prebuffer);
136    
137     // only queue a part of the theoretical max in order not to have too much 'not ready' cycles
138     packets_to_prebuffer = (packets_to_prebuffer * MAX_ISO_XMIT_BUFFER_FILL_PCT * 1000) / 100000;
139     debugOutput(DEBUG_LEVEL_VERBOSE, " reduce to %d%%: %u\n",
140                                      MAX_ISO_XMIT_BUFFER_FILL_PCT, packets_to_prebuffer);
141    
142     return packets_to_prebuffer;
143 #endif
144 }
145
146 /***********************************************
147  * Buffer management and manipulation          *
148  ***********************************************/
149 void StreamProcessor::flush() {
150     m_IsoHandlerManager.flushHandlerForStream(this);
151 }
152
153 int StreamProcessor::getBufferFill() {
154     return m_data_buffer->getBufferFill();
155 }
156
157 int64_t
158 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
159 {
160     uint64_t time_at_period=getTimeAtPeriod();
161
162     // we delay the period signal with the sync delay
163     // this makes that the period signals lag a little compared to reality
164     // ISO buffering causes the packets to be received at max
165     // m_handler->getWakeupInterval() later than the time they were received.
166     // hence their payload is available this amount of time later. However, the
167     // period boundary is predicted based upon earlier samples, and therefore can
168     // pass before these packets are processed. Adding this extra term makes that
169     // the period boundary is signalled later
170     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
171
172     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
173
174     // calculate the time until the next period
175     int32_t until_next=diffTicks(time_at_period,cycle_timer);
176
177     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
178         time_at_period, cycle_timer, until_next
179         );
180
181     // now convert to usecs
182     // don't use the mapping function because it only works
183     // for absolute times, not the relative time we are
184     // using here (which can also be negative).
185     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
186 }
187
188 void
189 StreamProcessor::setSyncDelay(int d) {
190     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d);
191     m_sync_delay = d;
192 }
193
194 uint64_t
195 StreamProcessor::getTimeAtPeriodUsecs()
196 {
197     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
198 }
199
200 uint64_t
201 StreamProcessor::getTimeAtPeriod()
202 {
203     if (getType() == ePT_Receive) {
204         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
205    
206         #ifdef DEBUG
207         ffado_timestamp_t ts;
208         signed int fc;
209         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
210    
211         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
212             next_period_boundary, ts, fc, getTicksPerFrame()
213             );
214         #endif
215         return (uint64_t)next_period_boundary;
216     } else {
217         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
218    
219         #ifdef DEBUG
220         ffado_timestamp_t ts;
221         signed int fc;
222         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
223    
224         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
225             next_period_boundary, ts, fc, getTicksPerFrame()
226             );
227         #endif
228         return (uint64_t)next_period_boundary;
229     }
230 }
231
232 float
233 StreamProcessor::getTicksPerFrame()
234 {
235     assert(m_data_buffer != NULL);
236     return m_data_buffer->getRate();
237 }
238
239 bool
240 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
241 {
242     bool can_transfer;
243     unsigned int fc = m_data_buffer->getFrameCounter();
244     if (getType() == ePT_Receive) {
245         can_transfer = (fc >= nbframes);
246     } else {
247         // there has to be enough space to put the frames in
248         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
249         // or the buffer is transparent
250         can_transfer |= m_data_buffer->isTransparent();
251     }
252    
253     #ifdef DEBUG
254     if (!can_transfer) {
255         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
256             this, ePTToString(getType()), fc, nbframes);
257     }
258     #endif
259    
260     return can_transfer;
261 }
262
263 /***********************************************
264  * I/O API                                     *
265  ***********************************************/
266
267 // Packet transfer API
268 enum raw1394_iso_disposition
269 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
270                            unsigned char channel, unsigned char tag, unsigned char sy,
271                            unsigned int cycle, unsigned int dropped) {
272     if(m_last_cycle == -1) {
273         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
274     }
275
276     int dropped_cycles = 0;
277     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
278         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
279         if (dropped_cycles < 0) {
280             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
281                          this, dropped_cycles, cycle, m_last_cycle, dropped);
282         }
283         if (dropped_cycles > 0) {
284             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n",
285                 this, dropped_cycles, cycle, dropped, cycle, m_last_cycle);
286             m_dropped += dropped_cycles;
287             m_in_xrun = true;
288             //flushDebugOutput();
289             //assert(0);
290         }
291     }
292     m_last_cycle = cycle;
293
294     // bypass based upon state
295     if (m_state == ePS_Invalid) {
296         debugError("Should not have state %s\n", ePSToString(m_state) );
297         return RAW1394_ISO_ERROR;
298     }
299     if (m_state == ePS_Created) {
300         return RAW1394_ISO_DEFER;
301     }
302
303     // store the previous timestamp
304     m_last_timestamp2 = m_last_timestamp;
305
306     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
307     //       it happens on the first 'good' cycle for the wait condition
308     //       or on the first received cycle that is received afterwards (might be a problem)
309
310     // check whether we are waiting for a stream to be disabled
311     if(m_state == ePS_WaitingForStreamDisable) {
312         // we then check whether we have to switch on this cycle
313         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
314             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
315             m_next_state = ePS_DryRunning;
316             if (!updateState()) { // we are allowed to change the state directly
317                 debugError("Could not update state!\n");
318                 return RAW1394_ISO_ERROR;
319             }
320         } else {
321             // not time to disable yet
322         }
323         // the received data can be discarded while waiting for the stream
324         // to be disabled
325         return RAW1394_ISO_OK;
326     }
327
328     // check whether we are waiting for a stream to be enabled
329     else if(m_state == ePS_WaitingForStreamEnable) {
330         // we then check whether we have to switch on this cycle
331         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
332             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
333             m_next_state = ePS_Running;
334             if (!updateState()) { // we are allowed to change the state directly
335                 debugError("Could not update state!\n");
336                 return RAW1394_ISO_ERROR;
337             }
338         } else {
339             // not time to enable yet
340         }
341         // we are dryRunning hence data should be processed in any case
342     }
343
344     // check the packet header
345     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
346     if (result == eCRV_OK) {
347         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
348                 cycle, m_last_timestamp);
349         // update some accounting
350         m_last_good_cycle = cycle;
351         m_last_dropped = dropped_cycles;
352
353         // check whether we are waiting for a stream to startup
354         // this requires that the packet is good
355         if(m_state == ePS_WaitingForStream) {
356             // since we have a packet with an OK header,
357             // we can indicate that the stream started up
358
359             // we then check whether we have to switch on this cycle
360             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
361                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
362                 // hence go to the dryRunning state
363                 m_next_state = ePS_DryRunning;
364                 if (!updateState()) { // we are allowed to change the state directly
365                     debugError("Could not update state!\n");
366                     return RAW1394_ISO_ERROR;
367                 }
368             } else {
369                 // not time (yet) to switch state
370             }
371             // in both cases we don't want to process the data
372             return RAW1394_ISO_OK;
373         }
374
375         // check whether a state change has been requested
376         // note that only the wait state changes are synchronized with the cycles
377         else if(m_state != m_next_state) {
378             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
379                                              ePSToString(m_state), ePSToString(m_next_state));
380             // execute the requested change
381             if (!updateState()) { // we are allowed to change the state directly
382                 debugError("Could not update state!\n");
383                 return RAW1394_ISO_ERROR;
384             }
385         }
386
387         // handle dropped cycles
388         if(dropped_cycles) {
389             // they represent a discontinuity in the timestamps, and hence are
390             // to be dealt with
391             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
392             m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
393             if (m_state == ePS_Running) {
394                 // this is an xrun situation
395                 m_in_xrun = true;
396                 debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
397                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
398                 m_next_state = ePS_WaitingForStreamDisable;
399                 // execute the requested change
400                 if (!updateState()) { // we are allowed to change the state directly
401                     debugError("Could not update state!\n");
402                     return RAW1394_ISO_ERROR;
403                 }
404                 return RAW1394_ISO_DEFER;
405             }
406         }
407
408         // for all states that reach this we are allowed to
409         // do protocol specific data reception
410         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
411
412         // if an xrun occured, switch to the dryRunning state and
413         // allow for the xrun to be picked up
414         if (result2 == eCRV_XRun) {
415             debugWarning("processPacketData xrun\n");
416             m_in_xrun = true;
417             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
418             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
419             m_next_state = ePS_WaitingForStreamDisable;
420             // execute the requested change
421             if (!updateState()) { // we are allowed to change the state directly
422                 debugError("Could not update state!\n");
423                 return RAW1394_ISO_ERROR;
424             }
425             return RAW1394_ISO_DEFER;
426         } else if(result2 == eCRV_OK) {
427             // no problem here
428             return RAW1394_ISO_OK;
429         } else {
430             debugError("Invalid response\n");
431             return RAW1394_ISO_ERROR;
432         }
433     } else if(result == eCRV_Invalid) {
434         // apparently we don't have to do anything when the packets are not valid
435         return RAW1394_ISO_OK;
436     } else {
437         debugError("Invalid response\n");
438         return RAW1394_ISO_ERROR;
439     }
440     debugError("reached the unreachable\n");
441     return RAW1394_ISO_ERROR;
442 }
443
444 enum raw1394_iso_disposition
445 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
446                            unsigned char *tag, unsigned char *sy,
447                            int cycle, unsigned int dropped, unsigned int max_length) {
448     if (cycle<0) {
449         *tag = 0;
450         *sy = 0;
451         *length = 0;
452         return RAW1394_ISO_OK;
453     }
454
455     unsigned int ctr;
456     int now_cycles;
457     int cycle_diff;
458
459     if(m_last_cycle == -1) {
460         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
461     }
462
463     int dropped_cycles = 0;
464     if (m_last_cycle != cycle && m_last_cycle != -1) {
465         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
466         if (dropped_cycles < 0) {
467             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
468                          this, dropped_cycles, cycle, m_last_cycle, dropped);
469         }
470         if (dropped_cycles > 0) {
471             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
472             m_dropped += dropped_cycles;
473             // HACK: this should not be necessary, since the header generation functions should trigger the xrun.
474             //       but apparently there are some issues with the 1394 stack
475             m_in_xrun = true;
476             if(m_state == ePS_Running) {
477                 debugShowBackLogLines(200);
478                 debugWarning("dropped packets xrun\n");
479                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packets xrun\n");
480                 m_next_state = ePS_WaitingForStreamDisable;
481                 // execute the requested change
482                 if (!updateState()) { // we are allowed to change the state directly
483                     debugError("Could not update state!\n");
484                     return RAW1394_ISO_ERROR;
485                 }
486                 goto send_empty_packet;
487             }
488         }
489     }
490     if (cycle >= 0) {
491         m_last_cycle = cycle;
492     }
493
494     // bypass based upon state
495     if (m_state == ePS_Invalid) {
496         debugError("Should not have state %s\n", ePSToString(m_state) );
497         return RAW1394_ISO_ERROR;
498     }
499     if (m_state == ePS_Created) {
500         *tag = 0;
501         *sy = 0;
502         *length = 0;
503         return RAW1394_ISO_DEFER;
504     }
505
506     // normal processing
507     // note that we can't use getCycleTimer directly here,
508     // because packets are queued in advance. This means that
509     // we the packet we are constructing will be sent out
510     // on 'cycle', not 'now'.
511     ctr = m_1394service.getCycleTimer();
512     now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
513
514     // the difference between the cycle this
515     // packet is intended for and 'now'
516     cycle_diff = diffCycles(cycle, now_cycles);
517
518     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
519         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
520             cycle, now_cycles);
521         if(m_state == ePS_Running) {
522             debugShowBackLogLines(200);
523 //             flushDebugOutput();
524 //             assert(0);
525             debugWarning("generatePacketData xrun\n");
526             m_in_xrun = true;
527             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
528             m_next_state = ePS_WaitingForStreamDisable;
529             // execute the requested change
530             if (!updateState()) { // we are allowed to change the state directly
531                 debugError("Could not update state!\n");
532                 return RAW1394_ISO_ERROR;
533             }
534             goto send_empty_packet;
535         }
536     }
537
538     // store the previous timestamp
539     m_last_timestamp2 = m_last_timestamp;
540
541     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
542     //       it happens on the first 'good' cycle for the wait condition
543     //       or on the first received cycle that is received afterwards (might be a problem)
544
545     // check whether we are waiting for a stream to be disabled
546     if(m_state == ePS_WaitingForStreamDisable) {
547         // we then check whether we have to switch on this cycle
548         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
549             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
550             m_next_state = ePS_DryRunning;
551             if (!updateState()) { // we are allowed to change the state directly
552                 debugError("Could not update state!\n");
553                 return RAW1394_ISO_ERROR;
554             }
555         } else {
556             // not time to disable yet
557         }
558     }
559     // check whether we are waiting for a stream to be enabled
560     else if(m_state == ePS_WaitingForStreamEnable) {
561         // we then check whether we have to switch on this cycle
562         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
563             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
564             m_next_state = ePS_Running;
565             if (!updateState()) { // we are allowed to change the state directly
566                 debugError("Could not update state!\n");
567                 return RAW1394_ISO_ERROR;
568             }
569         } else {
570             // not time to enable yet
571         }
572         // we are dryRunning hence data should be processed in any case
573     }
574     // check whether we are waiting for a stream to startup
575     else if(m_state == ePS_WaitingForStream) {
576         // as long as the cycle parameter is not in sync with
577         // the current time, the stream is considered not
578         // to be 'running'
579         // we then check whether we have to switch on this cycle
580         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
581             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
582             // hence go to the dryRunning state
583             m_next_state = ePS_DryRunning;
584             if (!updateState()) { // we are allowed to change the state directly
585                 debugError("Could not update state!\n");
586                 return RAW1394_ISO_ERROR;
587             }
588         } else {
589             // not time (yet) to switch state
590         }
591     }
592     else if(m_state == ePS_Running) {
593         // check the packet header
594         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
595         if (result == eCRV_Packet || result == eCRV_Defer) {
596             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
597                     cycle, m_last_timestamp);
598             // update some accounting
599             m_last_good_cycle = cycle;
600             m_last_dropped = dropped_cycles;
601
602             // check whether a state change has been requested
603             // note that only the wait state changes are synchronized with the cycles
604             if(m_state != m_next_state) {
605                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
606                                                 ePSToString(m_state), ePSToString(m_next_state));
607                 // execute the requested change
608                 if (!updateState()) { // we are allowed to change the state directly
609                     debugError("Could not update state!\n");
610                     return RAW1394_ISO_ERROR;
611                 }
612             }
613
614             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
615             // if an xrun occured, switch to the dryRunning state and
616             // allow for the xrun to be picked up
617             if (result2 == eCRV_XRun) {
618                 debugWarning("generatePacketData xrun\n");
619                 m_in_xrun = true;
620                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
621                 m_cycle_to_switch_state = cycle+1; // switch in the next cycle
622                 m_next_state = ePS_WaitingForStreamDisable;
623                 // execute the requested change
624                 if (!updateState()) { // we are allowed to change the state directly
625                     debugError("Could not update state!\n");
626                     return RAW1394_ISO_ERROR;
627                 }
628                 goto send_empty_packet;
629             }
630             // skip queueing packets if we detect that there are not enough frames
631             // available
632             if(result2 == eCRV_Defer || result == eCRV_Defer)
633                 return RAW1394_ISO_DEFER;
634             else
635                 return RAW1394_ISO_OK;
636         } else if (result == eCRV_XRun) { // pick up the possible xruns
637             debugWarning("generatePacketHeader xrun\n");
638             m_in_xrun = true;
639             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
640             m_next_state = ePS_WaitingForStreamDisable;
641             // execute the requested change
642             if (!updateState()) { // we are allowed to change the state directly
643                 debugError("Could not update state!\n");
644                 return RAW1394_ISO_ERROR;
645             }
646         } else if (result == eCRV_EmptyPacket) {
647             if(m_state != m_next_state) {
648                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
649                                                 ePSToString(m_state), ePSToString(m_next_state));
650                 // execute the requested change
651                 if (!updateState()) { // we are allowed to change the state directly
652                     debugError("Could not update state!\n");
653                     return RAW1394_ISO_ERROR;
654                 }
655             }
656             goto send_empty_packet;
657         } else if (result == eCRV_Again) {
658             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
659             if(m_state != m_next_state) {
660                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
661                                                 ePSToString(m_state), ePSToString(m_next_state));
662                 // execute the requested change
663                 if (!updateState()) { // we are allowed to change the state directly
664                     debugError("Could not update state!\n");
665                     return RAW1394_ISO_ERROR;
666                 }
667             }
668             usleep(125); // only when using thread-per-handler
669             return RAW1394_ISO_AGAIN;
670 //             generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
671 //             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
672 //             return RAW1394_ISO_DEFER;
673         } else {
674             debugError("Invalid return value: %d\n", result);
675             return RAW1394_ISO_ERROR;
676         }
677     }
678     // we are not running, so send an empty packet
679     // we should generate a valid packet any time
680 send_empty_packet:
681     // note that only the wait state changes are synchronized with the cycles
682     if(m_state != m_next_state) {
683         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
684                                         ePSToString(m_state), ePSToString(m_next_state));
685         // execute the requested change
686         if (!updateState()) { // we are allowed to change the state directly
687             debugError("Could not update state!\n");
688             return RAW1394_ISO_ERROR;
689         }
690     }
691
692     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
693     generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
694     generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
695     return RAW1394_ISO_OK;
696 }
697
698
699 // Frame Transfer API
700 /**
701  * Transfer a block of frames from the event buffer to the port buffers
702  * @param nbframes number of frames to transfer
703  * @param ts the timestamp that the LAST frame in the block should have
704  * @return
705  */
706 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
707     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
708     assert( getType() == ePT_Receive );
709     if(isDryRunning()) return getFramesDry(nbframes, ts);
710     else return getFramesWet(nbframes, ts);
711 }
712
713 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
714 // FIXME: this should be done somewhere else
715 #ifdef DEBUG
716     uint64_t ts_expected;
717     signed int fc;
718     int32_t lag_ticks;
719     float lag_frames;
720
721     // in order to sync up multiple received streams, we should
722     // use the ts parameter. It specifies the time of the block's
723     // last sample.
724     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
725     assert(srate != 0.0);
726     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
727
728     ffado_timestamp_t ts_head_tmp;
729     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
730     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
731
732     lag_ticks = diffTicks(ts, ts_expected);
733     lag_frames = (((float)lag_ticks) / srate);
734     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
735                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
736     if (lag_frames >= 1.0) {
737         // the stream lags
738         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
739                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
740     } else if (lag_frames <= -1.0) {
741         // the stream leads
742         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
743                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
744     }
745 #endif
746     // ask the buffer to process nbframes of frames
747     // using it's registered client's processReadBlock(),
748     // which should be ours
749     m_data_buffer->blockProcessReadFrames(nbframes);
750     return true;
751 }
752
753 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
754 {
755     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
756                  this, nbframes, ts);
757     // dry run on this side means that we put silence in all enabled ports
758     // since there is do data put into the ringbuffer in the dry-running state
759     return provideSilenceBlock(nbframes, 0);
760 }
761
762 bool
763 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
764 {
765     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
766     return m_data_buffer->dropFrames(nbframes);
767 }
768
769 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
770 {
771     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
772     assert( getType() == ePT_Transmit );
773
774     if(isDryRunning()) return putFramesDry(nbframes, ts);
775     else return putFramesWet(nbframes, ts);
776 }
777
778 bool
779 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
780 {
781     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
782     // transfer the data
783     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
784     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
785     return true; // FIXME: what about failure?
786 }
787
788 bool
789 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
790 {
791     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
792     // do nothing
793     return true;
794 }
795
796 bool
797 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
798 {
799     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
800
801     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
802     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
803
804     if (nbframes > scratch_buffer_size_frames) {
805         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
806                    nbframes, scratch_buffer_size_frames);
807     }
808
809     assert(m_scratch_buffer);
810     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
811         debugError("Could not prepare silent block\n");
812         return false;
813     }
814     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
815         debugError("Could not write silent block\n");
816         return false;
817     }
818     return true;
819 }
820
821 bool
822 StreamProcessor::waitForFrames()
823 {
824     if(m_state == ePS_Running) {
825         assert(m_data_buffer);
826         if(getType() == ePT_Receive) {
827             return m_data_buffer->waitForFrames(m_StreamProcessorManager.getPeriodSize());
828         } else {
829             return m_data_buffer->waitForFrames(getNominalFramesPerPacket());
830         }
831     } else {
832         // when we're not running, we can always provide frames
833         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Not running...\n");
834         return true;
835     }
836 }
837
838 bool
839 StreamProcessor::tryWaitForFrames()
840 {
841     if(m_state == ePS_Running) {
842         assert(m_data_buffer);
843         if(getType() == ePT_Receive) {
844             return m_data_buffer->tryWaitForFrames(m_StreamProcessorManager.getPeriodSize());
845         } else {
846             return m_data_buffer->tryWaitForFrames(getNominalFramesPerPacket());
847         }
848     } else {
849         // when we're not running, we can always provide frames
850         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Not running...\n");
851         return true;
852     }
853 }
854
855 bool
856 StreamProcessor::shiftStream(int nbframes)
857 {
858     if(nbframes == 0) return true;
859     if(nbframes > 0) {
860         return m_data_buffer->dropFrames(nbframes);
861     } else {
862         bool result = true;
863         while(nbframes++) {
864             result &= m_data_buffer->writeDummyFrame();
865         }
866         return result;
867     }
868 }
869
870 /**
871  * @brief write silence events to the stream ringbuffers.
872  */
873 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
874 {
875     bool no_problem=true;
876     for ( PortVectorIterator it = m_PeriodPorts.begin();
877           it != m_PeriodPorts.end();
878           ++it ) {
879         if((*it)->isDisabled()) {continue;};
880
881         //FIXME: make this into a static_cast when not DEBUG?
882         Port *port=dynamic_cast<Port *>(*it);
883
884         switch(port->getPortType()) {
885
886         case Port::E_Audio:
887             if(provideSilenceToPort(static_cast<AudioPort *>(*it), offset, nevents)) {
888                 debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
889                 no_problem=false;
890             }
891             break;
892         // midi is a packet based port, don't process
893         //    case MotuPortInfo::E_Midi:
894         //        break;
895
896         default: // ignore
897             break;
898         }
899     }
900     return no_problem;
901 }
902
903 int
904 StreamProcessor::provideSilenceToPort(
905                        AudioPort *p, unsigned int offset, unsigned int nevents)
906 {
907     unsigned int j=0;
908     switch(p->getDataType()) {
909         default:
910         case Port::E_Int24:
911             {
912                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
913                 assert(nevents + offset <= p->getBufferSize());
914                 buffer+=offset;
915
916                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
917                     *(buffer)=0;
918                     buffer++;
919                 }
920             }
921             break;
922         case Port::E_Float:
923             {
924                 float *buffer=(float *)(p->getBufferAddress());
925                 assert(nevents + offset <= p->getBufferSize());
926                 buffer+=offset;
927
928                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
929                     *buffer = 0.0;
930                     buffer++;
931                 }
932             }
933             break;
934     }
935     return 0;
936 }
937
938 /***********************************************
939  * State related API                           *
940  ***********************************************/
941 bool StreamProcessor::init()
942 {
943     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
944
945     if(!m_IsoHandlerManager.registerStream(this)) {
946         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
947         return false;
948     }
949     if(!m_StreamProcessorManager.registerProcessor(this)) {
950         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
951         return false;
952     }
953
954     // initialization can be done without requesting it
955     // from the packet loop
956     m_next_state = ePS_Created;
957     return true;
958 }
959
960 bool StreamProcessor::prepare()
961 {
962     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
963
964     // make the scratch buffer one period of frames long
965     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
966     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
967     if(m_scratch_buffer) delete[] m_scratch_buffer;
968     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
969     if(m_scratch_buffer == NULL) {
970         debugFatal("Could not allocate scratch buffer\n");
971         return false;
972     }
973
974     if (!prepareChild()) {
975         debugFatal("Could not prepare child\n");
976         return false;
977     }
978
979     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
980     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
981              m_StreamProcessorManager.getNominalRate());
982     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
983              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
984     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
985              m_1394service.getPort(), m_channel);
986
987     // initialization can be done without requesting it
988     // from the packet loop
989     m_next_state = ePS_Stopped;
990     return updateState();
991 }
992
993 bool
994 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
995 {
996     // first set the time, since in the packet loop we first check m_state == m_next_state before
997     // using the time
998     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
999     m_next_state = state;
1000     return true;
1001 }
1002
1003 bool
1004 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
1005 {
1006     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
1007     int cnt = timeout_ms;
1008     while (m_state != state && cnt) {
1009         SleepRelativeUsec(1000);
1010         cnt--;
1011     }
1012     if(cnt==0) {
1013         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
1014         return false;
1015     }
1016     return true;
1017 }
1018
1019 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
1020     uint64_t tx;
1021     if (t < 0) {
1022         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1023     } else {
1024         tx = t;
1025     }
1026     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
1027
1028     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1029     uint64_t now = m_1394service.getCycleTimerTicks();
1030     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1031                           now,
1032                           (unsigned int)TICKS_TO_SECS(now),
1033                           (unsigned int)TICKS_TO_CYCLES(now),
1034                           (unsigned int)TICKS_TO_OFFSET(now));
1035     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1036                           tx,
1037                           (unsigned int)TICKS_TO_SECS(tx),
1038                           (unsigned int)TICKS_TO_CYCLES(tx),
1039                           (unsigned int)TICKS_TO_OFFSET(tx));
1040     if (m_state == ePS_Stopped) {
1041         if(!m_IsoHandlerManager.startHandlerForStream(
1042                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
1043             debugError("Could not start handler for SP %p\n", this);
1044             return false;
1045         }
1046         return scheduleStateTransition(ePS_WaitingForStream, tx);
1047     } else if (m_state == ePS_Running) {
1048         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1049     } else {
1050         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
1051         return false;
1052     }
1053 }
1054
1055 bool StreamProcessor::scheduleStartRunning(int64_t t) {
1056     uint64_t tx;
1057     if (t < 0) {
1058         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1059     } else {
1060         tx = t;
1061     }
1062     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1063     uint64_t now = m_1394service.getCycleTimerTicks();
1064     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1065                           now,
1066                           (unsigned int)TICKS_TO_SECS(now),
1067                           (unsigned int)TICKS_TO_CYCLES(now),
1068                           (unsigned int)TICKS_TO_OFFSET(now));
1069     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1070                           tx,
1071                           (unsigned int)TICKS_TO_SECS(tx),
1072                           (unsigned int)TICKS_TO_CYCLES(tx),
1073                           (unsigned int)TICKS_TO_OFFSET(tx));
1074     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
1075 }
1076
1077 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
1078     uint64_t tx;
1079     if (t < 0) {
1080         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1081     } else {
1082         tx = t;
1083     }
1084     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1085     uint64_t now = m_1394service.getCycleTimerTicks();
1086     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1087                           now,
1088                           (unsigned int)TICKS_TO_SECS(now),
1089                           (unsigned int)TICKS_TO_CYCLES(now),
1090                           (unsigned int)TICKS_TO_OFFSET(now));
1091     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1092                           tx,
1093                           (unsigned int)TICKS_TO_SECS(tx),
1094                           (unsigned int)TICKS_TO_CYCLES(tx),
1095                           (unsigned int)TICKS_TO_OFFSET(tx));
1096
1097     return scheduleStateTransition(ePS_Stopped, tx);
1098 }
1099
1100 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1101     uint64_t tx;
1102     if (t < 0) {
1103         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1104     } else {
1105         tx = t;
1106     }
1107     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1108     uint64_t now = m_1394service.getCycleTimerTicks();
1109     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1110                           now,
1111                           (unsigned int)TICKS_TO_SECS(now),
1112                           (unsigned int)TICKS_TO_CYCLES(now),
1113                           (unsigned int)TICKS_TO_OFFSET(now));
1114     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1115                           tx,
1116                           (unsigned int)TICKS_TO_SECS(tx),
1117                           (unsigned int)TICKS_TO_CYCLES(tx),
1118                           (unsigned int)TICKS_TO_OFFSET(tx));
1119     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1120 }
1121
1122 bool StreamProcessor::startDryRunning(int64_t t) {
1123     if(!scheduleStartDryRunning(t)) {
1124         debugError("Could not schedule transition\n");
1125         return false;
1126     }
1127     if(!waitForState(ePS_DryRunning, 2000)) {
1128         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1129         return false;
1130     }
1131     return true;
1132 }
1133
1134 bool StreamProcessor::startRunning(int64_t t) {
1135     if(!scheduleStartRunning(t)) {
1136         debugError("Could not schedule transition\n");
1137         return false;
1138     }
1139     if(!waitForState(ePS_Running, 2000)) {
1140         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1141         return false;
1142     }
1143     return true;
1144 }
1145
1146 bool StreamProcessor::stopDryRunning(int64_t t) {
1147     if(!scheduleStopDryRunning(t)) {
1148         debugError("Could not schedule transition\n");
1149         return false;
1150     }
1151     if(!waitForState(ePS_Stopped, 2000)) {
1152         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1153         return false;
1154     }
1155     return true;
1156 }
1157
1158 bool StreamProcessor::stopRunning(int64_t t) {
1159     if(!scheduleStopRunning(t)) {
1160         debugError("Could not schedule transition\n");
1161         return false;
1162     }
1163     if(!waitForState(ePS_DryRunning, 2000)) {
1164         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1165         return false;
1166     }
1167     return true;
1168 }
1169
1170
1171 // internal state API
1172
1173 /**
1174  * @brief Enter the ePS_Stopped state
1175  * @return true if successful, false if not
1176  *
1177  * @pre none
1178  *
1179  * @post the buffer and the isostream are ready for use.
1180  * @post all dynamic structures have been allocated successfully
1181  * @post the buffer is transparent and empty, and all parameters are set
1182  *       to the correct initial/nominal values.
1183  *
1184  */
1185 bool
1186 StreamProcessor::doStop()
1187 {
1188     float ticks_per_frame;
1189     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1190
1191     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1192     bool result = true;
1193
1194     switch(m_state) {
1195         case ePS_Created:
1196             assert(m_data_buffer);
1197             // object just created
1198             result = m_data_buffer->init();
1199
1200             // prepare the framerate estimate
1201             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1202             m_ticks_per_frame = ticks_per_frame;
1203             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1204
1205             // initialize internal buffer
1206             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1207
1208             result &= m_data_buffer->setEventSize( getEventSize() );
1209             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1210             if(getType() == ePT_Receive) {
1211                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1212             } else {
1213                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1214             }
1215             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1216             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1217             result &= m_data_buffer->prepare(); // FIXME: the name
1218
1219             // set the parameters of ports we can:
1220             // we want the audio ports to be period buffered,
1221             // and the midi ports to be packet buffered
1222             for ( PortVectorIterator it = m_Ports.begin();
1223                 it != m_Ports.end();
1224                 ++it )
1225             {
1226                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1227                 if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1228                     debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1229                     return false;
1230                 }
1231                 switch ((*it)->getPortType()) {
1232                     case Port::E_Audio:
1233                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
1234                             debugFatal("Could not set signal type to PeriodSignalling");
1235                             return false;
1236                         }
1237                         // buffertype and datatype are dependant on the API
1238                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
1239                         // buffertype and datatype are dependant on the API
1240                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
1241                             debugFatal("Could not set buffer type");
1242                             return false;
1243                         }
1244                         if(!(*it)->useExternalBuffer(true)) {
1245                             debugFatal("Could not set external buffer usage");
1246                             return false;
1247                         }
1248                         if(!(*it)->setDataType(Port::E_Float)) {
1249                             debugFatal("Could not set data type");
1250                             return false;
1251                         }
1252                         break;
1253                     case Port::E_Midi:
1254                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1255                             debugFatal("Could not set signal type to PacketSignalling");
1256                             return false;
1257                         }
1258                         // buffertype and datatype are dependant on the API
1259                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1260                         // buffertype and datatype are dependant on the API
1261                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1262                             debugFatal("Could not set buffer type");
1263                             return false;
1264                         }
1265                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1266                             debugFatal("Could not set data type");
1267                             return false;
1268                         }
1269                         break;
1270                     default:
1271                         debugWarning("Unsupported port type specified\n");
1272                         break;
1273                 }
1274             }
1275             // the API specific settings of the ports should already be set,
1276             // as this is called from the processorManager->prepare()
1277             // so we can init the ports
1278             result &= PortManager::initPorts();
1279
1280             break;
1281         case ePS_DryRunning:
1282             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1283                 debugError("Could not stop handler for SP %p\n", this);
1284                 return false;
1285             }
1286             break;
1287         default:
1288             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1289             return false;
1290     }
1291
1292     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1293     // make the buffer transparent
1294     m_data_buffer->setTransparent(true);
1295
1296     // reset all ports
1297     result &= PortManager::preparePorts();
1298
1299     m_state = ePS_Stopped;
1300     #ifdef DEBUG
1301     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1302         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1303         dumpInfo();
1304     }
1305     #endif
1306     return result;
1307 }
1308
1309 /**
1310  * @brief Enter the ePS_WaitingForStream state
1311  * @return true if successful, false if not
1312  *
1313  * @pre all dynamic data structures are allocated successfully
1314  *
1315  * @post
1316  *
1317  */
1318 bool
1319 StreamProcessor::doWaitForRunningStream()
1320 {
1321     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1322     switch(m_state) {
1323         case ePS_Stopped:
1324             // we have to start waiting for an incoming stream
1325             // this basically means nothing, the state change will
1326             // be picked up by the packet iterator
1327             break;
1328         default:
1329             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1330             return false;
1331     }
1332     m_state = ePS_WaitingForStream;
1333     #ifdef DEBUG
1334     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1335         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1336         dumpInfo();
1337     }
1338     #endif
1339     return true;
1340 }
1341
1342 /**
1343  * @brief Enter the ePS_DryRunning state
1344  * @return true if successful, false if not
1345  *
1346  * @pre
1347  *
1348  * @post
1349  *
1350  */
1351 bool
1352 StreamProcessor::doDryRunning()
1353 {
1354     bool result = true;
1355     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1356     switch(m_state) {
1357         case ePS_WaitingForStream:
1358             // a running stream has been detected
1359             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1360             if (getType() == ePT_Receive) {
1361                 // this to ensure that there is no discontinuity when starting to
1362                 // update the DLL based upon the received packets
1363                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1364             } else {
1365                 // FIXME: PC=master mode will have to do something here I guess...
1366             }
1367             break;
1368         case ePS_WaitingForStreamEnable: // when xrunning at startup
1369             result &= m_data_buffer->clearBuffer();
1370             m_data_buffer->setTransparent(true);
1371             break;
1372         case ePS_WaitingForStreamDisable:
1373             result &= m_data_buffer->clearBuffer();
1374             m_data_buffer->setTransparent(true);
1375             break;
1376         default:
1377             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1378             return false;
1379     }
1380     m_state = ePS_DryRunning;
1381     #ifdef DEBUG
1382     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1383         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1384         dumpInfo();
1385     }
1386     #endif
1387     return result;
1388 }
1389
1390 /**
1391  * @brief Enter the ePS_WaitingForStreamEnable state
1392  * @return true if successful, false if not
1393  *
1394  * @pre
1395  *
1396  * @post
1397  *
1398  */
1399 bool
1400 StreamProcessor::doWaitForStreamEnable()
1401 {
1402     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1403     unsigned int ringbuffer_size_frames;
1404     switch(m_state) {
1405         case ePS_DryRunning:
1406             // we have to start waiting for an incoming stream
1407             // this basically means nothing, the state change will
1408             // be picked up by the packet iterator
1409
1410             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1411                 debugError("Could not reset data buffer\n");
1412                 return false;
1413             }
1414             if (getType() == ePT_Transmit) {
1415                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1416                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1417                 // prefill the buffer
1418                 if(!transferSilence(ringbuffer_size_frames)) {
1419                     debugFatal("Could not prefill transmit stream\n");
1420                     return false;
1421                 }
1422             }
1423
1424             break;
1425         default:
1426             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1427             return false;
1428     }
1429     m_state = ePS_WaitingForStreamEnable;
1430     #ifdef DEBUG
1431     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1432         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1433         dumpInfo();
1434     }
1435     #endif
1436     return true;
1437 }
1438
1439 /**
1440  * @brief Enter the ePS_Running state
1441  * @return true if successful, false if not
1442  *
1443  * @pre
1444  *
1445  * @post
1446  *
1447  */
1448 bool
1449 StreamProcessor::doRunning()
1450 {
1451     bool result = true;
1452     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1453     switch(m_state) {
1454         case ePS_WaitingForStreamEnable:
1455             // a running stream has been detected
1456             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1457                                              this, m_last_cycle);
1458             m_in_xrun = false;
1459             m_data_buffer->setTransparent(false);
1460             break;
1461         default:
1462             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1463             return false;
1464     }
1465     m_state = ePS_Running;
1466     #ifdef DEBUG
1467     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1468         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1469         dumpInfo();
1470     }
1471     #endif
1472     return result;
1473 }
1474
1475 /**
1476  * @brief Enter the ePS_WaitingForStreamDisable state
1477  * @return true if successful, false if not
1478  *
1479  * @pre
1480  *
1481  * @post
1482  *
1483  */
1484 bool
1485 StreamProcessor::doWaitForStreamDisable()
1486 {
1487     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1488     switch(m_state) {
1489         case ePS_Running:
1490             // the thread will do the transition
1491             break;
1492         default:
1493             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1494             return false;
1495     }
1496     m_state = ePS_WaitingForStreamDisable;
1497     #ifdef DEBUG
1498     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1499         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1500         dumpInfo();
1501     }
1502     #endif
1503     return true;
1504 }
1505
1506 /**
1507  * @brief Updates the state machine and calls the necessary transition functions
1508  * @return true if successful, false if not
1509  */
1510 bool StreamProcessor::updateState() {
1511     bool result = false;
1512     // copy the current state locally since it could change value,
1513     // and that's something we don't want to happen inbetween tests
1514     // if m_next_state changes during this routine, we know for sure
1515     // that the previous state change was at least attempted correctly.
1516     enum eProcessorState next_state = m_next_state;
1517
1518     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1519         ePSToString(m_state), ePSToString(next_state));
1520
1521     if (m_state == next_state) {
1522         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1523         return true;
1524     }
1525
1526     // after creation, only initialization is allowed
1527     if (m_state == ePS_Created) {
1528         if(next_state != ePS_Stopped) {
1529             goto updateState_exit_with_error;
1530         }
1531         // do init here
1532         result = doStop();
1533         if (result) return true;
1534         else goto updateState_exit_change_failed;
1535     }
1536
1537     // after initialization, only WaitingForRunningStream is allowed
1538     if (m_state == ePS_Stopped) {
1539         if(next_state != ePS_WaitingForStream) {
1540             goto updateState_exit_with_error;
1541         }
1542         result = doWaitForRunningStream();
1543         if (result) return true;
1544         else goto updateState_exit_change_failed;
1545     }
1546
1547     // after WaitingForStream, only ePS_DryRunning is allowed
1548     // this means that the stream started running
1549     if (m_state == ePS_WaitingForStream) {
1550         if(next_state != ePS_DryRunning) {
1551             goto updateState_exit_with_error;
1552         }
1553         result = doDryRunning();
1554         if (result) return true;
1555         else goto updateState_exit_change_failed;
1556     }
1557
1558     // from ePS_DryRunning we can go to:
1559     //   - ePS_Stopped if something went wrong during DryRunning
1560     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1561     if (m_state == ePS_DryRunning) {
1562         if((next_state != ePS_Stopped) &&
1563            (next_state != ePS_WaitingForStreamEnable)) {
1564             goto updateState_exit_with_error;
1565         }
1566         if (next_state == ePS_Stopped) {
1567             result = doStop();
1568         } else {
1569             result = doWaitForStreamEnable();
1570         }
1571         if (result) return true;
1572         else goto updateState_exit_change_failed;
1573     }
1574
1575     // from ePS_WaitingForStreamEnable we can go to:
1576     //   - ePS_DryRunning if something went wrong while waiting
1577     //   - ePS_Running if the stream enabled correctly
1578     if (m_state == ePS_WaitingForStreamEnable) {
1579         if((next_state != ePS_DryRunning) &&
1580            (next_state != ePS_Running)) {
1581             goto updateState_exit_with_error;
1582         }
1583         if (next_state == ePS_Stopped) {
1584             result = doDryRunning();
1585         } else {
1586             result = doRunning();
1587         }
1588         if (result) return true;
1589         else goto updateState_exit_change_failed;
1590     }
1591
1592     // from ePS_Running we can only start waiting for a disabled stream
1593     if (m_state == ePS_Running) {
1594         if(next_state != ePS_WaitingForStreamDisable) {
1595             goto updateState_exit_with_error;
1596         }
1597         result = doWaitForStreamDisable();
1598         if (result) return true;
1599         else goto updateState_exit_change_failed;
1600     }
1601
1602     // from ePS_WaitingForStreamDisable we can go to DryRunning
1603     if (m_state == ePS_WaitingForStreamDisable) {
1604         if(next_state != ePS_DryRunning) {
1605             goto updateState_exit_with_error;
1606         }
1607         result = doDryRunning();
1608         if (result) return true;
1609         else goto updateState_exit_change_failed;
1610     }
1611
1612     // if we arrive here there is an error
1613 updateState_exit_with_error:
1614     debugError("Invalid state transition: %s => %s\n",
1615         ePSToString(m_state), ePSToString(next_state));
1616     return false;
1617 updateState_exit_change_failed:
1618     debugError("State transition failed: %s => %s\n",
1619         ePSToString(m_state), ePSToString(next_state));
1620     return false;
1621 }
1622
1623 /***********************************************
1624  * Helper routines                             *
1625  ***********************************************/
1626 bool
1627 StreamProcessor::transferSilence(unsigned int nframes)
1628 {
1629     bool retval;
1630     signed int fc;
1631     ffado_timestamp_t ts_tail_tmp;
1632
1633     // prepare a buffer of silence
1634     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1635     transmitSilenceBlock(dummybuffer, nframes, 0);
1636
1637     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1638     if (fc != 0) {
1639         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1640     }
1641
1642     // add the silence data to the ringbuffer
1643     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1644         retval = true;
1645     } else {
1646         debugWarning("Could not write to event buffer\n");
1647         retval = false;
1648     }
1649     free(dummybuffer);
1650     return retval;
1651 }
1652
1653 /**
1654  * @brief convert a eProcessorState to a string
1655  * @param s the state
1656  * @return a char * describing the state
1657  */
1658 const char *
1659 StreamProcessor::ePSToString(enum eProcessorState s) {
1660     switch (s) {
1661         case ePS_Invalid: return "ePS_Invalid";
1662         case ePS_Created: return "ePS_Created";
1663         case ePS_Stopped: return "ePS_Stopped";
1664         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1665         case ePS_DryRunning: return "ePS_DryRunning";
1666         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1667         case ePS_Running: return "ePS_Running";
1668         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1669         default: return "error: unknown state";
1670     }
1671 }
1672
1673 /**
1674  * @brief convert a eProcessorType to a string
1675  * @param t the type
1676  * @return a char * describing the state
1677  */
1678 const char *
1679 StreamProcessor::ePTToString(enum eProcessorType t) {
1680     switch (t) {
1681         case ePT_Receive: return "Receive";
1682         case ePT_Transmit: return "Transmit";
1683         default: return "error: unknown type";
1684     }
1685 }
1686
1687 /***********************************************
1688  * Debug                                       *
1689  ***********************************************/
1690 void
1691 StreamProcessor::dumpInfo()
1692 {
1693     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p:\n", this);
1694     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1695     uint64_t now = m_1394service.getCycleTimerTicks();
1696     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1697                         now,
1698                         (unsigned int)TICKS_TO_SECS(now),
1699                         (unsigned int)TICKS_TO_CYCLES(now),
1700                         (unsigned int)TICKS_TO_OFFSET(now));
1701     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False"));
1702     if (m_state == m_next_state) {
1703         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n",
1704                                             ePSToString(m_state));
1705     } else {
1706         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s (Next: %s)\n",
1707                                               ePSToString(m_state), ePSToString(m_next_state));
1708         debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1709     }
1710     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1711     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Framerate             : Nominal: %u, Sync: %f, Buffer %f\n",
1712                                           m_StreamProcessorManager.getNominalRate(),
1713                                           24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1714                                           24576000.0/m_data_buffer->getRate());
1715     float d = getSyncDelay();
1716     debugOutputShort(DEBUG_LEVEL_NORMAL, "  Sync delay             : %f ticks (%f frames, %f cy)\n",
1717                                          d, d/getTicksPerFrame(),
1718                                          d/((float)TICKS_PER_CYCLE));
1719     m_data_buffer->dumpInfo();
1720 }
1721
1722 void
1723 StreamProcessor::setVerboseLevel(int l) {
1724     setDebugLevel(l);
1725     PortManager::setVerboseLevel(l);
1726     m_data_buffer->setVerboseLevel(l);
1727 }
1728
1729 } // end of namespace
Note: See TracBrowser for help on using the browser.