root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 791, 61.2 kB (checked in by ppalmers, 13 years ago)

try and get some better low-latency performance

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessor.h"
25 #include "../StreamProcessorManager.h"
26
27 #include "devicemanager.h"
28
29 #include "libieee1394/ieee1394service.h"
30 #include "libieee1394/IsoHandlerManager.h"
31 #include "libieee1394/cycletimer.h"
32
33 #include "libutil/Time.h"
34
35 #include "libutil/Atomic.h"
36
37 #include <assert.h>
38 #include <math.h>
39
40 namespace Streaming {
41
42 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
43
44 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
45     : m_processor_type ( type )
46     , m_state( ePS_Created )
47     , m_next_state( ePS_Invalid )
48     , m_cycle_to_switch_state( 0 )
49     , m_Parent( parent )
50     , m_1394service( parent.get1394Service() ) // local cache
51     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
52     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
53     , m_channel( -1 )
54     , m_dropped(0)
55     , m_last_timestamp(0)
56     , m_last_timestamp2(0)
57     , m_scratch_buffer( NULL )
58     , m_scratch_buffer_size_bytes( 0 )
59     , m_ticks_per_frame( 0 )
60     , m_last_cycle( -1 )
61     , m_sync_delay( 0 )
62     , m_in_xrun( false )
63 {
64     // create the timestamped buffer and register ourselves as its client
65     m_data_buffer = new Util::TimestampedBuffer(this);
66 }
67
68 StreamProcessor::~StreamProcessor() {
69     m_StreamProcessorManager.unregisterProcessor(this);
70     if(!m_IsoHandlerManager.unregisterStream(this)) {
71         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
72     }
73
74     if (m_data_buffer) delete m_data_buffer;
75     if (m_scratch_buffer) delete[] m_scratch_buffer;
76 }
77
78 uint64_t StreamProcessor::getTimeNow() {
79     return m_1394service.getCycleTimerTicks();
80 }
81
82 int StreamProcessor::getMaxFrameLatency() {
83     if (getType() == ePT_Receive) {
84         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
85     } else {
86         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
87     }
88 }
89
90 unsigned int
91 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
92 {
93     unsigned int nominal_frames_per_second
94                     = m_StreamProcessorManager.getNominalRate();
95     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
96     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
97     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
98     return nominal_packets;
99 }
100
101 unsigned int
102 StreamProcessor::getPacketsPerPeriod()
103 {
104     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
105 }
106
107 unsigned int
108 StreamProcessor::getNbPacketsIsoXmitBuffer()
109 {
110     // the target is to have all of the transmit buffer (at period transfer) as ISO packets
111     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames
112     // in the transmit buffer (the others are still to be put into the xmit frame buffer)
113     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
114    
115     // however we have to take into account the fact that there is some sync delay (unknown at this point)
116     packets_to_prebuffer -= 16; //FIXME: magic
117    
118     // only queue a part (80%) of the theoretical max in order not to have too much 'not ready' cycles
119     packets_to_prebuffer = (packets_to_prebuffer * 7000) / 10000;
120    
121     return packets_to_prebuffer;
122 }
123
124 /***********************************************
125  * Buffer management and manipulation          *
126  ***********************************************/
127 void StreamProcessor::flush() {
128     m_IsoHandlerManager.flushHandlerForStream(this);
129 }
130
131 int StreamProcessor::getBufferFill() {
132     return m_data_buffer->getBufferFill();
133 }
134
135 int64_t
136 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
137 {
138     uint64_t time_at_period=getTimeAtPeriod();
139
140     // we delay the period signal with the sync delay
141     // this makes that the period signals lag a little compared to reality
142     // ISO buffering causes the packets to be received at max
143     // m_handler->getWakeupInterval() later than the time they were received.
144     // hence their payload is available this amount of time later. However, the
145     // period boundary is predicted based upon earlier samples, and therefore can
146     // pass before these packets are processed. Adding this extra term makes that
147     // the period boundary is signalled later
148     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
149
150     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
151
152     // calculate the time until the next period
153     int32_t until_next=diffTicks(time_at_period,cycle_timer);
154
155     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
156         time_at_period, cycle_timer, until_next
157         );
158
159     // now convert to usecs
160     // don't use the mapping function because it only works
161     // for absolute times, not the relative time we are
162     // using here (which can also be negative).
163     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
164 }
165
166 void
167 StreamProcessor::setSyncDelay(int d) {
168     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d);
169     m_sync_delay = d;
170 }
171
172 uint64_t
173 StreamProcessor::getTimeAtPeriodUsecs()
174 {
175     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
176 }
177
178 uint64_t
179 StreamProcessor::getTimeAtPeriod()
180 {
181     if (getType() == ePT_Receive) {
182         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
183    
184         #ifdef DEBUG
185         ffado_timestamp_t ts;
186         signed int fc;
187         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
188    
189         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
190             next_period_boundary, ts, fc, getTicksPerFrame()
191             );
192         #endif
193         return (uint64_t)next_period_boundary;
194     } else {
195         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
196    
197         #ifdef DEBUG
198         ffado_timestamp_t ts;
199         signed int fc;
200         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
201    
202         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
203             next_period_boundary, ts, fc, getTicksPerFrame()
204             );
205         #endif
206         return (uint64_t)next_period_boundary;
207     }
208 }
209
210 float
211 StreamProcessor::getTicksPerFrame()
212 {
213     assert(m_data_buffer != NULL);
214     return m_data_buffer->getRate();
215 }
216
217 bool
218 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
219 {
220     bool can_transfer;
221     unsigned int fc = m_data_buffer->getFrameCounter();
222     if (getType() == ePT_Receive) {
223         can_transfer = (fc >= nbframes);
224     } else {
225         // there has to be enough space to put the frames in
226         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
227         // or the buffer is transparent
228         can_transfer |= m_data_buffer->isTransparent();
229     }
230    
231     #ifdef DEBUG
232     if (!can_transfer) {
233         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
234             this, ePTToString(getType()), fc, nbframes);
235     }
236     #endif
237    
238     return can_transfer;
239 }
240
241 /***********************************************
242  * I/O API                                     *
243  ***********************************************/
244
245 // Packet transfer API
246 enum raw1394_iso_disposition
247 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
248                            unsigned char channel, unsigned char tag, unsigned char sy,
249                            unsigned int cycle, unsigned int dropped) {
250     if(m_last_cycle == -1) {
251         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
252     }
253
254     int dropped_cycles = 0;
255     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
256         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
257         if (dropped_cycles < 0) {
258             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
259                          this, dropped_cycles, cycle, m_last_cycle, dropped);
260         }
261         if (dropped_cycles > 0) {
262             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n",
263                 this, dropped_cycles, cycle, dropped, cycle, m_last_cycle);
264             m_dropped += dropped_cycles;
265             m_in_xrun = true;
266             //flushDebugOutput();
267             //assert(0);
268         }
269     }
270     m_last_cycle = cycle;
271
272     // bypass based upon state
273     if (m_state == ePS_Invalid) {
274         debugError("Should not have state %s\n", ePSToString(m_state) );
275         return RAW1394_ISO_ERROR;
276     }
277     if (m_state == ePS_Created) {
278         return RAW1394_ISO_DEFER;
279     }
280
281     // store the previous timestamp
282     m_last_timestamp2 = m_last_timestamp;
283
284     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
285     //       it happens on the first 'good' cycle for the wait condition
286     //       or on the first received cycle that is received afterwards (might be a problem)
287
288     // check whether we are waiting for a stream to be disabled
289     if(m_state == ePS_WaitingForStreamDisable) {
290         // we then check whether we have to switch on this cycle
291         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
292             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
293             m_next_state = ePS_DryRunning;
294             if (!updateState()) { // we are allowed to change the state directly
295                 debugError("Could not update state!\n");
296                 return RAW1394_ISO_ERROR;
297             }
298         } else {
299             // not time to disable yet
300         }
301         // the received data can be discarded while waiting for the stream
302         // to be disabled
303         return RAW1394_ISO_OK;
304     }
305
306     // check whether we are waiting for a stream to be enabled
307     else if(m_state == ePS_WaitingForStreamEnable) {
308         // we then check whether we have to switch on this cycle
309         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
310             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
311             m_next_state = ePS_Running;
312             if (!updateState()) { // we are allowed to change the state directly
313                 debugError("Could not update state!\n");
314                 return RAW1394_ISO_ERROR;
315             }
316         } else {
317             // not time to enable yet
318         }
319         // we are dryRunning hence data should be processed in any case
320     }
321
322     // check the packet header
323     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
324     if (result == eCRV_OK) {
325         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
326                 cycle, m_last_timestamp);
327         // update some accounting
328         m_last_good_cycle = cycle;
329         m_last_dropped = dropped_cycles;
330
331         // check whether we are waiting for a stream to startup
332         // this requires that the packet is good
333         if(m_state == ePS_WaitingForStream) {
334             // since we have a packet with an OK header,
335             // we can indicate that the stream started up
336
337             // we then check whether we have to switch on this cycle
338             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
339                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
340                 // hence go to the dryRunning state
341                 m_next_state = ePS_DryRunning;
342                 if (!updateState()) { // we are allowed to change the state directly
343                     debugError("Could not update state!\n");
344                     return RAW1394_ISO_ERROR;
345                 }
346             } else {
347                 // not time (yet) to switch state
348             }
349             // in both cases we don't want to process the data
350             return RAW1394_ISO_OK;
351         }
352
353         // check whether a state change has been requested
354         // note that only the wait state changes are synchronized with the cycles
355         else if(m_state != m_next_state) {
356             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
357                                              ePSToString(m_state), ePSToString(m_next_state));
358             // execute the requested change
359             if (!updateState()) { // we are allowed to change the state directly
360                 debugError("Could not update state!\n");
361                 return RAW1394_ISO_ERROR;
362             }
363         }
364
365         // handle dropped cycles
366         if(dropped_cycles) {
367             // they represent a discontinuity in the timestamps, and hence are
368             // to be dealt with
369             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
370             m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
371             if (m_state == ePS_Running) {
372                 // this is an xrun situation
373                 m_in_xrun = true;
374                 debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
375                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
376                 m_next_state = ePS_WaitingForStreamDisable;
377                 // execute the requested change
378                 if (!updateState()) { // we are allowed to change the state directly
379                     debugError("Could not update state!\n");
380                     return RAW1394_ISO_ERROR;
381                 }
382                 return RAW1394_ISO_DEFER;
383             }
384         }
385
386         // for all states that reach this we are allowed to
387         // do protocol specific data reception
388         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
389
390         // if an xrun occured, switch to the dryRunning state and
391         // allow for the xrun to be picked up
392         if (result2 == eCRV_XRun) {
393             debugWarning("processPacketData xrun\n");
394             m_in_xrun = true;
395             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
396             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
397             m_next_state = ePS_WaitingForStreamDisable;
398             // execute the requested change
399             if (!updateState()) { // we are allowed to change the state directly
400                 debugError("Could not update state!\n");
401                 return RAW1394_ISO_ERROR;
402             }
403             return RAW1394_ISO_DEFER;
404         } else if(result2 == eCRV_OK) {
405             // no problem here
406             return RAW1394_ISO_OK;
407         } else {
408             debugError("Invalid response\n");
409             return RAW1394_ISO_ERROR;
410         }
411     } else if(result == eCRV_Invalid) {
412         // apparently we don't have to do anything when the packets are not valid
413         return RAW1394_ISO_OK;
414     } else {
415         debugError("Invalid response\n");
416         return RAW1394_ISO_ERROR;
417     }
418     debugError("reached the unreachable\n");
419     return RAW1394_ISO_ERROR;
420 }
421
422 enum raw1394_iso_disposition
423 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
424                            unsigned char *tag, unsigned char *sy,
425                            int cycle, unsigned int dropped, unsigned int max_length) {
426     if (cycle<0) {
427         *tag = 0;
428         *sy = 0;
429         *length = 0;
430         return RAW1394_ISO_OK;
431     }
432
433     if(m_last_cycle == -1) {
434         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
435     }
436
437     int dropped_cycles = 0;
438     if (m_last_cycle != cycle && m_last_cycle != -1) {
439         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
440         if (dropped_cycles < 0) {
441             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
442                          this, dropped_cycles, cycle, m_last_cycle, dropped);
443         }
444         if (dropped_cycles > 0) {
445             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
446             m_dropped += dropped_cycles;
447 //             flushDebugOutput();
448 //             assert(0);
449         }
450     }
451     if (cycle >= 0) {
452         m_last_cycle = cycle;
453     }
454
455     // bypass based upon state
456     if (m_state == ePS_Invalid) {
457         debugError("Should not have state %s\n", ePSToString(m_state) );
458         return RAW1394_ISO_ERROR;
459     }
460     if (m_state == ePS_Created) {
461         *tag = 0;
462         *sy = 0;
463         *length = 0;
464         return RAW1394_ISO_DEFER;
465     }
466
467     // normal processing
468     // note that we can't use getCycleTimer directly here,
469     // because packets are queued in advance. This means that
470     // we the packet we are constructing will be sent out
471     // on 'cycle', not 'now'.
472     unsigned int ctr = m_1394service.getCycleTimer();
473     int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
474
475     // the difference between the cycle this
476     // packet is intended for and 'now'
477     int cycle_diff = diffCycles(cycle, now_cycles);
478
479     #ifdef DEBUG
480     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
481         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
482             cycle, now_cycles);
483         if(m_state == ePS_Running) {
484             debugShowBackLogLines(200);
485 //             flushDebugOutput();
486 //             assert(0);
487         }
488     }
489     #endif
490
491     // store the previous timestamp
492     m_last_timestamp2 = m_last_timestamp;
493
494     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
495     //       it happens on the first 'good' cycle for the wait condition
496     //       or on the first received cycle that is received afterwards (might be a problem)
497
498     // check whether we are waiting for a stream to be disabled
499     if(m_state == ePS_WaitingForStreamDisable) {
500         // we then check whether we have to switch on this cycle
501         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
502             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
503             m_next_state = ePS_DryRunning;
504             if (!updateState()) { // we are allowed to change the state directly
505                 debugError("Could not update state!\n");
506                 return RAW1394_ISO_ERROR;
507             }
508         } else {
509             // not time to disable yet
510         }
511     }
512     // check whether we are waiting for a stream to be enabled
513     else if(m_state == ePS_WaitingForStreamEnable) {
514         // we then check whether we have to switch on this cycle
515         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
516             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
517             m_next_state = ePS_Running;
518             if (!updateState()) { // we are allowed to change the state directly
519                 debugError("Could not update state!\n");
520                 return RAW1394_ISO_ERROR;
521             }
522         } else {
523             // not time to enable yet
524         }
525         // we are dryRunning hence data should be processed in any case
526     }
527     // check whether we are waiting for a stream to startup
528     else if(m_state == ePS_WaitingForStream) {
529         // as long as the cycle parameter is not in sync with
530         // the current time, the stream is considered not
531         // to be 'running'
532         // we then check whether we have to switch on this cycle
533         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
534             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
535             // hence go to the dryRunning state
536             m_next_state = ePS_DryRunning;
537             if (!updateState()) { // we are allowed to change the state directly
538                 debugError("Could not update state!\n");
539                 return RAW1394_ISO_ERROR;
540             }
541         } else {
542             // not time (yet) to switch state
543         }
544     }
545     else if(m_state == ePS_Running) {
546         // check the packet header
547         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
548         if (result == eCRV_Packet || result == eCRV_Defer) {
549             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
550                     cycle, m_last_timestamp);
551             // update some accounting
552             m_last_good_cycle = cycle;
553             m_last_dropped = dropped_cycles;
554
555             // check whether a state change has been requested
556             // note that only the wait state changes are synchronized with the cycles
557             if(m_state != m_next_state) {
558                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
559                                                 ePSToString(m_state), ePSToString(m_next_state));
560                 // execute the requested change
561                 if (!updateState()) { // we are allowed to change the state directly
562                     debugError("Could not update state!\n");
563                     return RAW1394_ISO_ERROR;
564                 }
565             }
566
567             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
568             // if an xrun occured, switch to the dryRunning state and
569             // allow for the xrun to be picked up
570             if (result2 == eCRV_XRun) {
571                 debugWarning("generatePacketData xrun\n");
572                 m_in_xrun = true;
573                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
574                 m_cycle_to_switch_state = cycle+1; // switch in the next cycle
575                 m_next_state = ePS_WaitingForStreamDisable;
576                 // execute the requested change
577                 if (!updateState()) { // we are allowed to change the state directly
578                     debugError("Could not update state!\n");
579                     return RAW1394_ISO_ERROR;
580                 }
581                 goto send_empty_packet;
582             }
583             // skip queueing packets if we detect that there are not enough frames
584             // available
585             if(result2 == eCRV_Defer || result == eCRV_Defer)
586                 return RAW1394_ISO_DEFER;
587             else
588                 return RAW1394_ISO_OK;
589         } else if (result == eCRV_XRun) { // pick up the possible xruns
590             debugWarning("generatePacketHeader xrun\n");
591             m_in_xrun = true;
592             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
593             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
594             m_next_state = ePS_WaitingForStreamDisable;
595             // execute the requested change
596             if (!updateState()) { // we are allowed to change the state directly
597                 debugError("Could not update state!\n");
598                 return RAW1394_ISO_ERROR;
599             }
600         } else if (result == eCRV_EmptyPacket) {
601             if(m_state != m_next_state) {
602                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
603                                                 ePSToString(m_state), ePSToString(m_next_state));
604                 // execute the requested change
605                 if (!updateState()) { // we are allowed to change the state directly
606                     debugError("Could not update state!\n");
607                     return RAW1394_ISO_ERROR;
608                 }
609             }
610             goto send_empty_packet;
611         } else if (result == eCRV_Again) {
612             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
613             if(m_state != m_next_state) {
614                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
615                                                 ePSToString(m_state), ePSToString(m_next_state));
616                 // execute the requested change
617                 if (!updateState()) { // we are allowed to change the state directly
618                     debugError("Could not update state!\n");
619                     return RAW1394_ISO_ERROR;
620                 }
621             }
622             usleep(125); // only when using thread-per-handler
623             return RAW1394_ISO_AGAIN;
624 //             generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
625 //             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
626 //             return RAW1394_ISO_DEFER;
627         } else {
628             debugError("Invalid return value: %d\n", result);
629             return RAW1394_ISO_ERROR;
630         }
631     }
632     // we are not running, so send an empty packet
633     // we should generate a valid packet any time
634 send_empty_packet:
635     // note that only the wait state changes are synchronized with the cycles
636     if(m_state != m_next_state) {
637         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
638                                         ePSToString(m_state), ePSToString(m_next_state));
639         // execute the requested change
640         if (!updateState()) { // we are allowed to change the state directly
641             debugError("Could not update state!\n");
642             return RAW1394_ISO_ERROR;
643         }
644     }
645
646     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
647     generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
648     generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
649     return RAW1394_ISO_OK;
650 }
651
652
653 // Frame Transfer API
654 /**
655  * Transfer a block of frames from the event buffer to the port buffers
656  * @param nbframes number of frames to transfer
657  * @param ts the timestamp that the LAST frame in the block should have
658  * @return
659  */
660 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
661     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
662     assert( getType() == ePT_Receive );
663     if(isDryRunning()) return getFramesDry(nbframes, ts);
664     else return getFramesWet(nbframes, ts);
665 }
666
667 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
668 // FIXME: this should be done somewhere else
669 #ifdef DEBUG
670     uint64_t ts_expected;
671     signed int fc;
672     int32_t lag_ticks;
673     float lag_frames;
674
675     // in order to sync up multiple received streams, we should
676     // use the ts parameter. It specifies the time of the block's
677     // last sample.
678     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
679     assert(srate != 0.0);
680     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
681
682     ffado_timestamp_t ts_head_tmp;
683     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
684     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
685
686     lag_ticks = diffTicks(ts, ts_expected);
687     lag_frames = (((float)lag_ticks) / srate);
688     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
689                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
690     if (lag_frames >= 1.0) {
691         // the stream lags
692         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
693                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
694     } else if (lag_frames <= -1.0) {
695         // the stream leads
696         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
697                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
698     }
699 #endif
700     // ask the buffer to process nbframes of frames
701     // using it's registered client's processReadBlock(),
702     // which should be ours
703     m_data_buffer->blockProcessReadFrames(nbframes);
704     return true;
705 }
706
707 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
708 {
709     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
710                  this, nbframes, ts);
711     // dry run on this side means that we put silence in all enabled ports
712     // since there is do data put into the ringbuffer in the dry-running state
713     return provideSilenceBlock(nbframes, 0);
714 }
715
716 bool
717 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
718 {
719     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
720     return m_data_buffer->dropFrames(nbframes);
721 }
722
723 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
724 {
725     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
726     assert( getType() == ePT_Transmit );
727     if(isDryRunning()) return putFramesDry(nbframes, ts);
728     else return putFramesWet(nbframes, ts);
729 }
730
731 bool
732 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
733 {
734     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
735     // transfer the data
736     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
737     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
738     return true; // FIXME: what about failure?
739 }
740
741 bool
742 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
743 {
744     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
745     // do nothing
746     return true;
747 }
748
749 bool
750 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
751 {
752     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
753
754     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
755     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
756
757     if (nbframes > scratch_buffer_size_frames) {
758         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
759                    nbframes, scratch_buffer_size_frames);
760     }
761
762     assert(m_scratch_buffer);
763     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
764         debugError("Could not prepare silent block\n");
765         return false;
766     }
767     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
768         debugError("Could not write silent block\n");
769         return false;
770     }
771     return true;
772 }
773
774 bool
775 StreamProcessor::shiftStream(int nbframes)
776 {
777     if(nbframes == 0) return true;
778     if(nbframes > 0) {
779         return m_data_buffer->dropFrames(nbframes);
780     } else {
781         bool result = true;
782         while(nbframes++) {
783             result &= m_data_buffer->writeDummyFrame();
784         }
785         return result;
786     }
787 }
788
789 /**
790  * @brief write silence events to the stream ringbuffers.
791  */
792 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
793 {
794     bool no_problem=true;
795     for ( PortVectorIterator it = m_PeriodPorts.begin();
796           it != m_PeriodPorts.end();
797           ++it ) {
798         if((*it)->isDisabled()) {continue;};
799
800         //FIXME: make this into a static_cast when not DEBUG?
801         Port *port=dynamic_cast<Port *>(*it);
802
803         switch(port->getPortType()) {
804
805         case Port::E_Audio:
806             if(provideSilenceToPort(static_cast<AudioPort *>(*it), offset, nevents)) {
807                 debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
808                 no_problem=false;
809             }
810             break;
811         // midi is a packet based port, don't process
812         //    case MotuPortInfo::E_Midi:
813         //        break;
814
815         default: // ignore
816             break;
817         }
818     }
819     return no_problem;
820 }
821
822 int
823 StreamProcessor::provideSilenceToPort(
824                        AudioPort *p, unsigned int offset, unsigned int nevents)
825 {
826     unsigned int j=0;
827     switch(p->getDataType()) {
828         default:
829         case Port::E_Int24:
830             {
831                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
832                 assert(nevents + offset <= p->getBufferSize());
833                 buffer+=offset;
834
835                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
836                     *(buffer)=0;
837                     buffer++;
838                 }
839             }
840             break;
841         case Port::E_Float:
842             {
843                 float *buffer=(float *)(p->getBufferAddress());
844                 assert(nevents + offset <= p->getBufferSize());
845                 buffer+=offset;
846
847                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
848                     *buffer = 0.0;
849                     buffer++;
850                 }
851             }
852             break;
853     }
854     return 0;
855 }
856
857 /***********************************************
858  * State related API                           *
859  ***********************************************/
860 bool StreamProcessor::init()
861 {
862     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
863
864     if(!m_IsoHandlerManager.registerStream(this)) {
865         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
866         return false;
867     }
868     if(!m_StreamProcessorManager.registerProcessor(this)) {
869         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
870         return false;
871     }
872
873     // initialization can be done without requesting it
874     // from the packet loop
875     m_next_state = ePS_Created;
876     return true;
877 }
878
879 bool StreamProcessor::prepare()
880 {
881     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
882
883     // make the scratch buffer one period of frames long
884     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
885     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
886     if(m_scratch_buffer) delete[] m_scratch_buffer;
887     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
888     if(m_scratch_buffer == NULL) {
889         debugFatal("Could not allocate scratch buffer\n");
890         return false;
891     }
892
893     if (!prepareChild()) {
894         debugFatal("Could not prepare child\n");
895         return false;
896     }
897
898     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
899     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
900              m_StreamProcessorManager.getNominalRate());
901     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
902              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
903     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
904              m_1394service.getPort(), m_channel);
905
906     // initialization can be done without requesting it
907     // from the packet loop
908     m_next_state = ePS_Stopped;
909     return updateState();
910 }
911
912 bool
913 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
914 {
915     // first set the time, since in the packet loop we first check m_state == m_next_state before
916     // using the time
917     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
918     m_next_state = state;
919     return true;
920 }
921
922 bool
923 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
924 {
925     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
926     int cnt = timeout_ms;
927     while (m_state != state && cnt) {
928         SleepRelativeUsec(1000);
929         cnt--;
930     }
931     if(cnt==0) {
932         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
933         return false;
934     }
935     return true;
936 }
937
938 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
939     uint64_t tx;
940     if (t < 0) {
941         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
942     } else {
943         tx = t;
944     }
945     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
946
947     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
948     uint64_t now = m_1394service.getCycleTimerTicks();
949     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
950                           now,
951                           (unsigned int)TICKS_TO_SECS(now),
952                           (unsigned int)TICKS_TO_CYCLES(now),
953                           (unsigned int)TICKS_TO_OFFSET(now));
954     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
955                           tx,
956                           (unsigned int)TICKS_TO_SECS(tx),
957                           (unsigned int)TICKS_TO_CYCLES(tx),
958                           (unsigned int)TICKS_TO_OFFSET(tx));
959     if (m_state == ePS_Stopped) {
960         if(!m_IsoHandlerManager.startHandlerForStream(
961                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
962             debugError("Could not start handler for SP %p\n", this);
963             return false;
964         }
965         return scheduleStateTransition(ePS_WaitingForStream, tx);
966     } else if (m_state == ePS_Running) {
967         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
968     } else {
969         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
970         return false;
971     }
972 }
973
974 bool StreamProcessor::scheduleStartRunning(int64_t t) {
975     uint64_t tx;
976     if (t < 0) {
977         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
978     } else {
979         tx = t;
980     }
981     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
982     uint64_t now = m_1394service.getCycleTimerTicks();
983     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
984                           now,
985                           (unsigned int)TICKS_TO_SECS(now),
986                           (unsigned int)TICKS_TO_CYCLES(now),
987                           (unsigned int)TICKS_TO_OFFSET(now));
988     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
989                           tx,
990                           (unsigned int)TICKS_TO_SECS(tx),
991                           (unsigned int)TICKS_TO_CYCLES(tx),
992                           (unsigned int)TICKS_TO_OFFSET(tx));
993     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
994 }
995
996 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
997     uint64_t tx;
998     if (t < 0) {
999         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1000     } else {
1001         tx = t;
1002     }
1003     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1004     uint64_t now = m_1394service.getCycleTimerTicks();
1005     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1006                           now,
1007                           (unsigned int)TICKS_TO_SECS(now),
1008                           (unsigned int)TICKS_TO_CYCLES(now),
1009                           (unsigned int)TICKS_TO_OFFSET(now));
1010     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1011                           tx,
1012                           (unsigned int)TICKS_TO_SECS(tx),
1013                           (unsigned int)TICKS_TO_CYCLES(tx),
1014                           (unsigned int)TICKS_TO_OFFSET(tx));
1015
1016     return scheduleStateTransition(ePS_Stopped, tx);
1017 }
1018
1019 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1020     uint64_t tx;
1021     if (t < 0) {
1022         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1023     } else {
1024         tx = t;
1025     }
1026     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1027     uint64_t now = m_1394service.getCycleTimerTicks();
1028     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1029                           now,
1030                           (unsigned int)TICKS_TO_SECS(now),
1031                           (unsigned int)TICKS_TO_CYCLES(now),
1032                           (unsigned int)TICKS_TO_OFFSET(now));
1033     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1034                           tx,
1035                           (unsigned int)TICKS_TO_SECS(tx),
1036                           (unsigned int)TICKS_TO_CYCLES(tx),
1037                           (unsigned int)TICKS_TO_OFFSET(tx));
1038     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1039 }
1040
1041 bool StreamProcessor::startDryRunning(int64_t t) {
1042     if(!scheduleStartDryRunning(t)) {
1043         debugError("Could not schedule transition\n");
1044         return false;
1045     }
1046     if(!waitForState(ePS_DryRunning, 2000)) {
1047         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1048         return false;
1049     }
1050     return true;
1051 }
1052
1053 bool StreamProcessor::startRunning(int64_t t) {
1054     if(!scheduleStartRunning(t)) {
1055         debugError("Could not schedule transition\n");
1056         return false;
1057     }
1058     if(!waitForState(ePS_Running, 2000)) {
1059         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1060         return false;
1061     }
1062     return true;
1063 }
1064
1065 bool StreamProcessor::stopDryRunning(int64_t t) {
1066     if(!scheduleStopDryRunning(t)) {
1067         debugError("Could not schedule transition\n");
1068         return false;
1069     }
1070     if(!waitForState(ePS_Stopped, 2000)) {
1071         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1072         return false;
1073     }
1074     return true;
1075 }
1076
1077 bool StreamProcessor::stopRunning(int64_t t) {
1078     if(!scheduleStopRunning(t)) {
1079         debugError("Could not schedule transition\n");
1080         return false;
1081     }
1082     if(!waitForState(ePS_DryRunning, 2000)) {
1083         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1084         return false;
1085     }
1086     return true;
1087 }
1088
1089
1090 // internal state API
1091
1092 /**
1093  * @brief Enter the ePS_Stopped state
1094  * @return true if successful, false if not
1095  *
1096  * @pre none
1097  *
1098  * @post the buffer and the isostream are ready for use.
1099  * @post all dynamic structures have been allocated successfully
1100  * @post the buffer is transparent and empty, and all parameters are set
1101  *       to the correct initial/nominal values.
1102  *
1103  */
1104 bool
1105 StreamProcessor::doStop()
1106 {
1107     float ticks_per_frame;
1108     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1109
1110     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1111     bool result = true;
1112
1113     switch(m_state) {
1114         case ePS_Created:
1115             assert(m_data_buffer);
1116             // object just created
1117             result = m_data_buffer->init();
1118
1119             // prepare the framerate estimate
1120             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1121             m_ticks_per_frame = ticks_per_frame;
1122             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1123
1124             // initialize internal buffer
1125             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1126
1127             result &= m_data_buffer->setEventSize( getEventSize() );
1128             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1129             if(getType() == ePT_Receive) {
1130                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1131             } else {
1132                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1133             }
1134             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1135             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1136             result &= m_data_buffer->prepare(); // FIXME: the name
1137
1138             // set the parameters of ports we can:
1139             // we want the audio ports to be period buffered,
1140             // and the midi ports to be packet buffered
1141             for ( PortVectorIterator it = m_Ports.begin();
1142                 it != m_Ports.end();
1143                 ++it )
1144             {
1145                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1146                 if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1147                     debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1148                     return false;
1149                 }
1150                 switch ((*it)->getPortType()) {
1151                     case Port::E_Audio:
1152                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
1153                             debugFatal("Could not set signal type to PeriodSignalling");
1154                             return false;
1155                         }
1156                         // buffertype and datatype are dependant on the API
1157                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
1158                         // buffertype and datatype are dependant on the API
1159                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
1160                             debugFatal("Could not set buffer type");
1161                             return false;
1162                         }
1163                         if(!(*it)->useExternalBuffer(true)) {
1164                             debugFatal("Could not set external buffer usage");
1165                             return false;
1166                         }
1167                         if(!(*it)->setDataType(Port::E_Float)) {
1168                             debugFatal("Could not set data type");
1169                             return false;
1170                         }
1171                         break;
1172                     case Port::E_Midi:
1173                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1174                             debugFatal("Could not set signal type to PacketSignalling");
1175                             return false;
1176                         }
1177                         // buffertype and datatype are dependant on the API
1178                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1179                         // buffertype and datatype are dependant on the API
1180                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1181                             debugFatal("Could not set buffer type");
1182                             return false;
1183                         }
1184                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1185                             debugFatal("Could not set data type");
1186                             return false;
1187                         }
1188                         break;
1189                     default:
1190                         debugWarning("Unsupported port type specified\n");
1191                         break;
1192                 }
1193             }
1194             // the API specific settings of the ports should already be set,
1195             // as this is called from the processorManager->prepare()
1196             // so we can init the ports
1197             result &= PortManager::initPorts();
1198
1199             break;
1200         case ePS_DryRunning:
1201             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1202                 debugError("Could not stop handler for SP %p\n", this);
1203                 return false;
1204             }
1205             break;
1206         default:
1207             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1208             return false;
1209     }
1210
1211     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1212     // make the buffer transparent
1213     m_data_buffer->setTransparent(true);
1214
1215     // reset all ports
1216     result &= PortManager::preparePorts();
1217
1218     m_state = ePS_Stopped;
1219     #ifdef DEBUG
1220     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1221         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1222         dumpInfo();
1223     }
1224     #endif
1225     return result;
1226 }
1227
1228 /**
1229  * @brief Enter the ePS_WaitingForStream state
1230  * @return true if successful, false if not
1231  *
1232  * @pre all dynamic data structures are allocated successfully
1233  *
1234  * @post
1235  *
1236  */
1237 bool
1238 StreamProcessor::doWaitForRunningStream()
1239 {
1240     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1241     switch(m_state) {
1242         case ePS_Stopped:
1243             // we have to start waiting for an incoming stream
1244             // this basically means nothing, the state change will
1245             // be picked up by the packet iterator
1246             break;
1247         default:
1248             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1249             return false;
1250     }
1251     m_state = ePS_WaitingForStream;
1252     #ifdef DEBUG
1253     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1254         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1255         dumpInfo();
1256     }
1257     #endif
1258     return true;
1259 }
1260
1261 /**
1262  * @brief Enter the ePS_DryRunning state
1263  * @return true if successful, false if not
1264  *
1265  * @pre
1266  *
1267  * @post
1268  *
1269  */
1270 bool
1271 StreamProcessor::doDryRunning()
1272 {
1273     bool result = true;
1274     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1275     switch(m_state) {
1276         case ePS_WaitingForStream:
1277             // a running stream has been detected
1278             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1279             if (getType() == ePT_Receive) {
1280                 // this to ensure that there is no discontinuity when starting to
1281                 // update the DLL based upon the received packets
1282                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1283             } else {
1284                 // FIXME: PC=master mode will have to do something here I guess...
1285             }
1286             break;
1287         case ePS_WaitingForStreamEnable: // when xrunning at startup
1288             result &= m_data_buffer->clearBuffer();
1289             m_data_buffer->setTransparent(true);
1290             break;
1291         case ePS_WaitingForStreamDisable:
1292             result &= m_data_buffer->clearBuffer();
1293             m_data_buffer->setTransparent(true);
1294             break;
1295         default:
1296             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1297             return false;
1298     }
1299     m_state = ePS_DryRunning;
1300     #ifdef DEBUG
1301     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1302         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1303         dumpInfo();
1304     }
1305     #endif
1306     return result;
1307 }
1308
1309 /**
1310  * @brief Enter the ePS_WaitingForStreamEnable state
1311  * @return true if successful, false if not
1312  *
1313  * @pre
1314  *
1315  * @post
1316  *
1317  */
1318 bool
1319 StreamProcessor::doWaitForStreamEnable()
1320 {
1321     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1322     unsigned int ringbuffer_size_frames;
1323     switch(m_state) {
1324         case ePS_DryRunning:
1325             // we have to start waiting for an incoming stream
1326             // this basically means nothing, the state change will
1327             // be picked up by the packet iterator
1328
1329             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1330                 debugError("Could not reset data buffer\n");
1331                 return false;
1332             }
1333             if (getType() == ePT_Transmit) {
1334                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1335                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1336                 // prefill the buffer
1337                 if(!transferSilence(ringbuffer_size_frames)) {
1338                     debugFatal("Could not prefill transmit stream\n");
1339                     return false;
1340                 }
1341             }
1342
1343             break;
1344         default:
1345             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1346             return false;
1347     }
1348     m_state = ePS_WaitingForStreamEnable;
1349     #ifdef DEBUG
1350     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1351         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1352         dumpInfo();
1353     }
1354     #endif
1355     return true;
1356 }
1357
1358 /**
1359  * @brief Enter the ePS_Running state
1360  * @return true if successful, false if not
1361  *
1362  * @pre
1363  *
1364  * @post
1365  *
1366  */
1367 bool
1368 StreamProcessor::doRunning()
1369 {
1370     bool result = true;
1371     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1372     switch(m_state) {
1373         case ePS_WaitingForStreamEnable:
1374             // a running stream has been detected
1375             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1376                                              this, m_last_cycle);
1377             m_in_xrun = false;
1378             m_data_buffer->setTransparent(false);
1379             break;
1380         default:
1381             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1382             return false;
1383     }
1384     m_state = ePS_Running;
1385     #ifdef DEBUG
1386     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1387         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1388         dumpInfo();
1389     }
1390     #endif
1391     return result;
1392 }
1393
1394 /**
1395  * @brief Enter the ePS_WaitingForStreamDisable state
1396  * @return true if successful, false if not
1397  *
1398  * @pre
1399  *
1400  * @post
1401  *
1402  */
1403 bool
1404 StreamProcessor::doWaitForStreamDisable()
1405 {
1406     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1407     switch(m_state) {
1408         case ePS_Running:
1409             // the thread will do the transition
1410             break;
1411         default:
1412             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1413             return false;
1414     }
1415     m_state = ePS_WaitingForStreamDisable;
1416     #ifdef DEBUG
1417     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1418         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1419         dumpInfo();
1420     }
1421     #endif
1422     return true;
1423 }
1424
1425 /**
1426  * @brief Updates the state machine and calls the necessary transition functions
1427  * @return true if successful, false if not
1428  */
1429 bool StreamProcessor::updateState() {
1430     bool result = false;
1431     // copy the current state locally since it could change value,
1432     // and that's something we don't want to happen inbetween tests
1433     // if m_next_state changes during this routine, we know for sure
1434     // that the previous state change was at least attempted correctly.
1435     enum eProcessorState next_state = m_next_state;
1436
1437     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1438         ePSToString(m_state), ePSToString(next_state));
1439
1440     if (m_state == next_state) {
1441         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1442         return true;
1443     }
1444
1445     // after creation, only initialization is allowed
1446     if (m_state == ePS_Created) {
1447         if(next_state != ePS_Stopped) {
1448             goto updateState_exit_with_error;
1449         }
1450         // do init here
1451         result = doStop();
1452         if (result) return true;
1453         else goto updateState_exit_change_failed;
1454     }
1455
1456     // after initialization, only WaitingForRunningStream is allowed
1457     if (m_state == ePS_Stopped) {
1458         if(next_state != ePS_WaitingForStream) {
1459             goto updateState_exit_with_error;
1460         }
1461         result = doWaitForRunningStream();
1462         if (result) return true;
1463         else goto updateState_exit_change_failed;
1464     }
1465
1466     // after WaitingForStream, only ePS_DryRunning is allowed
1467     // this means that the stream started running
1468     if (m_state == ePS_WaitingForStream) {
1469         if(next_state != ePS_DryRunning) {
1470             goto updateState_exit_with_error;
1471         }
1472         result = doDryRunning();
1473         if (result) return true;
1474         else goto updateState_exit_change_failed;
1475     }
1476
1477     // from ePS_DryRunning we can go to:
1478     //   - ePS_Stopped if something went wrong during DryRunning
1479     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1480     if (m_state == ePS_DryRunning) {
1481         if((next_state != ePS_Stopped) &&
1482            (next_state != ePS_WaitingForStreamEnable)) {
1483             goto updateState_exit_with_error;
1484         }
1485         if (next_state == ePS_Stopped) {
1486             result = doStop();
1487         } else {
1488             result = doWaitForStreamEnable();
1489         }
1490         if (result) return true;
1491         else goto updateState_exit_change_failed;
1492     }
1493
1494     // from ePS_WaitingForStreamEnable we can go to:
1495     //   - ePS_DryRunning if something went wrong while waiting
1496     //   - ePS_Running if the stream enabled correctly
1497     if (m_state == ePS_WaitingForStreamEnable) {
1498         if((next_state != ePS_DryRunning) &&
1499            (next_state != ePS_Running)) {
1500             goto updateState_exit_with_error;
1501         }
1502         if (next_state == ePS_Stopped) {
1503             result = doDryRunning();
1504         } else {
1505             result = doRunning();
1506         }
1507         if (result) return true;
1508         else goto updateState_exit_change_failed;
1509     }
1510
1511     // from ePS_Running we can only start waiting for a disabled stream
1512     if (m_state == ePS_Running) {
1513         if(next_state != ePS_WaitingForStreamDisable) {
1514             goto updateState_exit_with_error;
1515         }
1516         result = doWaitForStreamDisable();
1517         if (result) return true;
1518         else goto updateState_exit_change_failed;
1519     }
1520
1521     // from ePS_WaitingForStreamDisable we can go to DryRunning
1522     if (m_state == ePS_WaitingForStreamDisable) {
1523         if(next_state != ePS_DryRunning) {
1524             goto updateState_exit_with_error;
1525         }
1526         result = doDryRunning();
1527         if (result) return true;
1528         else goto updateState_exit_change_failed;
1529     }
1530
1531     // if we arrive here there is an error
1532 updateState_exit_with_error:
1533     debugError("Invalid state transition: %s => %s\n",
1534         ePSToString(m_state), ePSToString(next_state));
1535     return false;
1536 updateState_exit_change_failed:
1537     debugError("State transition failed: %s => %s\n",
1538         ePSToString(m_state), ePSToString(next_state));
1539     return false;
1540 }
1541
1542 /***********************************************
1543  * Helper routines                             *
1544  ***********************************************/
1545 bool
1546 StreamProcessor::transferSilence(unsigned int nframes)
1547 {
1548     bool retval;
1549     signed int fc;
1550     ffado_timestamp_t ts_tail_tmp;
1551
1552     // prepare a buffer of silence
1553     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1554     transmitSilenceBlock(dummybuffer, nframes, 0);
1555
1556     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1557     if (fc != 0) {
1558         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1559     }
1560
1561     // add the silence data to the ringbuffer
1562     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1563         retval = true;
1564     } else {
1565         debugWarning("Could not write to event buffer\n");
1566         retval = false;
1567     }
1568     free(dummybuffer);
1569     return retval;
1570 }
1571
1572 /**
1573  * @brief convert a eProcessorState to a string
1574  * @param s the state
1575  * @return a char * describing the state
1576  */
1577 const char *
1578 StreamProcessor::ePSToString(enum eProcessorState s) {
1579     switch (s) {
1580         case ePS_Invalid: return "ePS_Invalid";
1581         case ePS_Created: return "ePS_Created";
1582         case ePS_Stopped: return "ePS_Stopped";
1583         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1584         case ePS_DryRunning: return "ePS_DryRunning";
1585         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1586         case ePS_Running: return "ePS_Running";
1587         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1588         default: return "error: unknown state";
1589     }
1590 }
1591
1592 /**
1593  * @brief convert a eProcessorType to a string
1594  * @param t the type
1595  * @return a char * describing the state
1596  */
1597 const char *
1598 StreamProcessor::ePTToString(enum eProcessorType t) {
1599     switch (t) {
1600         case ePT_Receive: return "Receive";
1601         case ePT_Transmit: return "Transmit";
1602         default: return "error: unknown type";
1603     }
1604 }
1605
1606 /***********************************************
1607  * Debug                                       *
1608  ***********************************************/
1609 void
1610 StreamProcessor::dumpInfo()
1611 {
1612     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p information\n", this);
1613     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1614     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
1615     uint64_t now = m_1394service.getCycleTimerTicks();
1616     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1617                         now,
1618                         (unsigned int)TICKS_TO_SECS(now),
1619                         (unsigned int)TICKS_TO_CYCLES(now),
1620                         (unsigned int)TICKS_TO_OFFSET(now));
1621     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %s\n", (m_in_xrun ? "True":"False"));
1622     debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n", ePSToString(m_state));
1623     debugOutputShort( DEBUG_LEVEL_NORMAL, "   Next state           : %s\n", ePSToString(m_next_state));
1624     debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1625     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1626     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_StreamProcessorManager.getNominalRate());
1627     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n",
1628         24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1629         24576000.0/m_data_buffer->getRate()
1630         );
1631
1632     m_data_buffer->dumpInfo();
1633 }
1634
1635 void
1636 StreamProcessor::setVerboseLevel(int l) {
1637     setDebugLevel(l);
1638     PortManager::setVerboseLevel(l);
1639     m_data_buffer->setVerboseLevel(l);
1640 }
1641
1642 } // end of namespace
Note: See TracBrowser for help on using the browser.