root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 796, 62.4 kB (checked in by ppalmers, 13 years ago)

- move #define constants to config.h.in
- switch receive handler over to packet-per-buffer mode to improve latency performance

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessor.h"
27 #include "../StreamProcessorManager.h"
28
29 #include "devicemanager.h"
30
31 #include "libieee1394/ieee1394service.h"
32 #include "libieee1394/IsoHandlerManager.h"
33 #include "libieee1394/cycletimer.h"
34
35 #include "libutil/Time.h"
36
37 #include "libutil/Atomic.h"
38
39 #include <assert.h>
40 #include <math.h>
41
42 namespace Streaming {
43
44 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
45
/**
 * Construct a stream processor of the given type for a device.
 *
 * The processor starts in state ePS_Created with no pending state
 * (m_next_state == ePS_Invalid), no channel assigned (m_channel == -1)
 * and no cycle seen yet (m_last_cycle == -1, which putPacket()/getPacket()
 * use to detect the very first callback).
 *
 * @param parent device this processor belongs to; references to its 1394
 *               service, iso handler manager and stream processor manager
 *               are cached in members
 * @param type   processor type stored in m_processor_type
 */
46 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
47     : m_processor_type ( type )
48     , m_state( ePS_Created )
49     , m_next_state( ePS_Invalid )
50     , m_cycle_to_switch_state( 0 )
51     , m_Parent( parent )
52     , m_1394service( parent.get1394Service() ) // local cache
53     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
        // NOTE(review): the next initializer reads m_Parent rather than 'parent',
        // so it relies on m_Parent being declared before m_StreamProcessorManager
        // in the class (declaration order is not visible here) - confirm
54     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
55     , m_channel( -1 )
56     , m_dropped(0)
57     , m_last_timestamp(0)
58     , m_last_timestamp2(0)
59     , m_scratch_buffer( NULL )
60     , m_scratch_buffer_size_bytes( 0 )
61     , m_ticks_per_frame( 0 )
62     , m_last_cycle( -1 )
63     , m_sync_delay( 0 )
64     , m_in_xrun( false )
65 {
66     // create the timestamped buffer and register ourselves as its client
        // (the buffer is released again in the destructor)
67     m_data_buffer = new Util::TimestampedBuffer(this);
68 }
69
70 StreamProcessor::~StreamProcessor() {
71     m_StreamProcessorManager.unregisterProcessor(this);
72     if(!m_IsoHandlerManager.unregisterStream(this)) {
73         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
74     }
75
76     if (m_data_buffer) delete m_data_buffer;
77     if (m_scratch_buffer) delete[] m_scratch_buffer;
78 }
79
80 uint64_t StreamProcessor::getTimeNow() {
81     return m_1394service.getCycleTimerTicks();
82 }
83
84 int StreamProcessor::getMaxFrameLatency() {
85     if (getType() == ePT_Receive) {
86         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
87     } else {
88         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
89     }
90 }
91
92 unsigned int
93 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
94 {
95     unsigned int nominal_frames_per_second
96                     = m_StreamProcessorManager.getNominalRate();
97     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
98     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
99     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
100     return nominal_packets;
101 }
102
103 unsigned int
104 StreamProcessor::getPacketsPerPeriod()
105 {
106     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
107 }
108
109 unsigned int
110 StreamProcessor::getNbPacketsIsoXmitBuffer()
111 {
112     // the target is to have all of the transmit buffer (at period transfer) as ISO packets
113     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames
114     // in the transmit buffer (the others are still to be put into the xmit frame buffer)
115     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
116     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
117    
118     // however we have to take into account the fact that there is some sync delay
119     // we assume that the SPM has indicated
120     // HACK: this counts on the fact that the latency for this stream will be the same as the
121     //       latency for the receive sync source
122     unsigned int est_sync_delay = getPacketsPerPeriod() / MINIMUM_INTERRUPTS_PER_PERIOD;
123     est_sync_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS / TICKS_PER_CYCLE;
124     packets_to_prebuffer -= est_sync_delay;
125     debugOutput(DEBUG_LEVEL_VERBOSE, " correct for sync delay (%d): %u\n",
126                                      est_sync_delay,
127                                      packets_to_prebuffer);
128    
129     // only queue a part (70%) of the theoretical max in order not to have too much 'not ready' cycles
130     packets_to_prebuffer = (packets_to_prebuffer * MAX_ISO_XMIT_BUFFER_FILL_PCT * 1000) / 100000;
131     debugOutput(DEBUG_LEVEL_VERBOSE, " reduce to %d%%: %u\n",
132                                      MAX_ISO_XMIT_BUFFER_FILL_PCT, packets_to_prebuffer);
133    
134     return packets_to_prebuffer;
135 }
136
137 /***********************************************
138  * Buffer management and manipulation          *
139  ***********************************************/
140 void StreamProcessor::flush() {
141     m_IsoHandlerManager.flushHandlerForStream(this);
142 }
143
144 int StreamProcessor::getBufferFill() {
145     return m_data_buffer->getBufferFill();
146 }
147
148 int64_t
149 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
150 {
151     uint64_t time_at_period=getTimeAtPeriod();
152
153     // we delay the period signal with the sync delay
154     // this makes that the period signals lag a little compared to reality
155     // ISO buffering causes the packets to be received at max
156     // m_handler->getWakeupInterval() later than the time they were received.
157     // hence their payload is available this amount of time later. However, the
158     // period boundary is predicted based upon earlier samples, and therefore can
159     // pass before these packets are processed. Adding this extra term makes that
160     // the period boundary is signalled later
161     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
162
163     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
164
165     // calculate the time until the next period
166     int32_t until_next=diffTicks(time_at_period,cycle_timer);
167
168     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
169         time_at_period, cycle_timer, until_next
170         );
171
172     // now convert to usecs
173     // don't use the mapping function because it only works
174     // for absolute times, not the relative time we are
175     // using here (which can also be negative).
176     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
177 }
178
179 void
180 StreamProcessor::setSyncDelay(int d) {
181     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d);
182     m_sync_delay = d;
183 }
184
185 uint64_t
186 StreamProcessor::getTimeAtPeriodUsecs()
187 {
188     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
189 }
190
191 uint64_t
192 StreamProcessor::getTimeAtPeriod()
193 {
194     if (getType() == ePT_Receive) {
195         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
196    
197         #ifdef DEBUG
198         ffado_timestamp_t ts;
199         signed int fc;
200         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
201    
202         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
203             next_period_boundary, ts, fc, getTicksPerFrame()
204             );
205         #endif
206         return (uint64_t)next_period_boundary;
207     } else {
208         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
209    
210         #ifdef DEBUG
211         ffado_timestamp_t ts;
212         signed int fc;
213         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
214    
215         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
216             next_period_boundary, ts, fc, getTicksPerFrame()
217             );
218         #endif
219         return (uint64_t)next_period_boundary;
220     }
221 }
222
223 float
224 StreamProcessor::getTicksPerFrame()
225 {
226     assert(m_data_buffer != NULL);
227     return m_data_buffer->getRate();
228 }
229
230 bool
231 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
232 {
233     bool can_transfer;
234     unsigned int fc = m_data_buffer->getFrameCounter();
235     if (getType() == ePT_Receive) {
236         can_transfer = (fc >= nbframes);
237     } else {
238         // there has to be enough space to put the frames in
239         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
240         // or the buffer is transparent
241         can_transfer |= m_data_buffer->isTransparent();
242     }
243    
244     #ifdef DEBUG
245     if (!can_transfer) {
246         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
247             this, ePTToString(getType()), fc, nbframes);
248     }
249     #endif
250    
251     return can_transfer;
252 }
253
254 /***********************************************
255  * I/O API                                     *
256  ***********************************************/
257
258 // Packet transfer API
/**
 * Handle one received iso packet.
 *
 * The function (1) detects skipped cycles by comparing 'cycle' with the
 * previously seen cycle, (2) performs the cycle-synchronized state
 * transitions of the wait states, and (3) hands the packet to
 * processPacketHeader() and, when the header is accepted, to
 * processPacketData().
 *
 * @param data    packet payload, forwarded to the process* functions
 * @param length  payload length, forwarded to the process* functions
 * @param channel iso channel, forwarded to the process* functions
 * @param tag     iso tag, forwarded to the process* functions
 * @param sy      iso sy field, forwarded to the process* functions
 * @param cycle   cycle the packet belongs to; drives drop detection and
 *                the state switches
 * @param dropped only used in the debug messages here; the number of
 *                dropped cycles is recomputed from 'cycle' and m_last_cycle
 * @return RAW1394_ISO_ERROR when the state is ePS_Invalid, when
 *         updateState() fails or when a process* function returns an
 *         unexpected value; RAW1394_ISO_DEFER while in ePS_Created and
 *         after an xrun was detected; RAW1394_ISO_OK otherwise
 */
259 enum raw1394_iso_disposition
260 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
261                            unsigned char channel, unsigned char tag, unsigned char sy,
262                            unsigned int cycle, unsigned int dropped) {
        // m_last_cycle is -1 until the first packet has been seen
263     if(m_last_cycle == -1) {
264         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
265     }
266
        // number of cycles skipped between the previous packet and this one
267     int dropped_cycles = 0;
268     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
269         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
270         if (dropped_cycles < 0) {
271             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
272                          this, dropped_cycles, cycle, m_last_cycle, dropped);
273         }
274         if (dropped_cycles > 0) {
275             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n",
276                 this, dropped_cycles, cycle, dropped, cycle, m_last_cycle);
277             m_dropped += dropped_cycles;
278             m_in_xrun = true;
279             //flushDebugOutput();
280             //assert(0);
281         }
282     }
283     m_last_cycle = cycle;
284
285     // bypass based upon state
286     if (m_state == ePS_Invalid) {
287         debugError("Should not have state %s\n", ePSToString(m_state) );
288         return RAW1394_ISO_ERROR;
289     }
290     if (m_state == ePS_Created) {
291         return RAW1394_ISO_DEFER;
292     }
293
294     // store the previous timestamp
295     m_last_timestamp2 = m_last_timestamp;
296
297     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
298     //       it happens on the first 'good' cycle for the wait condition
299     //       or on the first received cycle that is received afterwards (might be a problem)
300
301     // check whether we are waiting for a stream to be disabled
302     if(m_state == ePS_WaitingForStreamDisable) {
303         // we then check whether we have to switch on this cycle
304         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
305             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
306             m_next_state = ePS_DryRunning;
307             if (!updateState()) { // we are allowed to change the state directly
308                 debugError("Could not update state!\n");
309                 return RAW1394_ISO_ERROR;
310             }
311         } else {
312             // not time to disable yet
313         }
314         // the received data can be discarded while waiting for the stream
315         // to be disabled
316         return RAW1394_ISO_OK;
317     }
318
319     // check whether we are waiting for a stream to be enabled
320     else if(m_state == ePS_WaitingForStreamEnable) {
321         // we then check whether we have to switch on this cycle
322         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
323             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
324             m_next_state = ePS_Running;
325             if (!updateState()) { // we are allowed to change the state directly
326                 debugError("Could not update state!\n");
327                 return RAW1394_ISO_ERROR;
328             }
329         } else {
330             // not time to enable yet
331         }
332         // we are dryRunning hence data should be processed in any case
333     }
334
335     // check the packet header
336     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
337     if (result == eCRV_OK) {
338         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
339                 cycle, m_last_timestamp);
340         // update some accounting
341         m_last_good_cycle = cycle;
342         m_last_dropped = dropped_cycles;
343
344         // check whether we are waiting for a stream to startup
345         // this requires that the packet is good
346         if(m_state == ePS_WaitingForStream) {
347             // since we have a packet with an OK header,
348             // we can indicate that the stream started up
349
350             // we then check whether we have to switch on this cycle
351             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
352                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
353                 // hence go to the dryRunning state
354                 m_next_state = ePS_DryRunning;
355                 if (!updateState()) { // we are allowed to change the state directly
356                     debugError("Could not update state!\n");
357                     return RAW1394_ISO_ERROR;
358                 }
359             } else {
360                 // not time (yet) to switch state
361             }
362             // in both cases we don't want to process the data
363             return RAW1394_ISO_OK;
364         }
365
366         // check whether a state change has been requested
367         // note that only the wait state changes are synchronized with the cycles
368         else if(m_state != m_next_state) {
369             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
370                                              ePSToString(m_state), ePSToString(m_next_state));
371             // execute the requested change
372             if (!updateState()) { // we are allowed to change the state directly
373                 debugError("Could not update state!\n");
374                 return RAW1394_ISO_ERROR;
375             }
376         }
377
378         // handle dropped cycles
379         if(dropped_cycles) {
380             // they represent a discontinuity in the timestamps, and hence are
381             // to be dealt with
382             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
383             m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
384             if (m_state == ePS_Running) {
385                 // this is an xrun situation
386                 m_in_xrun = true;
387                 debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
388                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
389                 m_next_state = ePS_WaitingForStreamDisable;
390                 // execute the requested change
391                 if (!updateState()) { // we are allowed to change the state directly
392                     debugError("Could not update state!\n");
393                     return RAW1394_ISO_ERROR;
394                 }
395                 return RAW1394_ISO_DEFER;
396             }
                // when not in ePS_Running, processing continues with the
                // data of this packet below
397         }
398
399         // for all states that reach this we are allowed to
400         // do protocol specific data reception
401         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
402
403         // if an xrun occurred, switch to the dryRunning state and
404         // allow for the xrun to be picked up
405         if (result2 == eCRV_XRun) {
406             debugWarning("processPacketData xrun\n");
407             m_in_xrun = true;
408             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
409             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
410             m_next_state = ePS_WaitingForStreamDisable;
411             // execute the requested change
412             if (!updateState()) { // we are allowed to change the state directly
413                 debugError("Could not update state!\n");
414                 return RAW1394_ISO_ERROR;
415             }
416             return RAW1394_ISO_DEFER;
417         } else if(result2 == eCRV_OK) {
418             // no problem here
419             return RAW1394_ISO_OK;
420         } else {
421             debugError("Invalid response\n");
422             return RAW1394_ISO_ERROR;
423         }
424     } else if(result == eCRV_Invalid) {
425         // apparently we don't have to do anything when the packets are not valid
426         return RAW1394_ISO_OK;
427     } else {
428         debugError("Invalid response\n");
429         return RAW1394_ISO_ERROR;
430     }
        // every branch of the if/else chain above returns, so the two
        // statements below are never executed
431     debugError("reached the unreachable\n");
432     return RAW1394_ISO_ERROR;
433 }
434
/**
 * Produce the iso packet that is to be transmitted on 'cycle'.
 *
 * The function (1) detects skipped cycles by comparing 'cycle' with the
 * previously seen cycle, (2) performs the cycle-synchronized state
 * transitions of the wait states and (3) in the ePS_Running state asks
 * generatePacketHeader()/generatePacketData() for a real packet. In every
 * other state, and after an xrun or an eCRV_EmptyPacket header result, a
 * silent packet is generated through generateSilentPacketHeader()/
 * generateSilentPacketData().
 *
 * @param data       buffer the packet payload is written to
 * @param length     out: payload length
 * @param tag        out: iso tag
 * @param sy         out: iso sy field
 * @param cycle      cycle the packet is intended for; a negative value
 *                   yields a zero-length packet without touching any state
 * @param dropped    only used in the debug messages here; the number of
 *                   dropped cycles is recomputed from 'cycle' and m_last_cycle
 * @param max_length forwarded to the generate* functions
 * @return RAW1394_ISO_ERROR when the state is ePS_Invalid, when
 *         updateState() fails or when generatePacketHeader() returns an
 *         unexpected value; RAW1394_ISO_DEFER while in ePS_Created or when
 *         a generate* function returned eCRV_Defer; RAW1394_ISO_AGAIN when
 *         the header generation returned eCRV_Again; RAW1394_ISO_OK otherwise
 */
435 enum raw1394_iso_disposition
436 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
437                            unsigned char *tag, unsigned char *sy,
438                            int cycle, unsigned int dropped, unsigned int max_length) {
439     if (cycle<0) {
440         *tag = 0;
441         *sy = 0;
442         *length = 0;
443         return RAW1394_ISO_OK;
444     }
445
        // m_last_cycle is -1 until the first callback has been seen
446     if(m_last_cycle == -1) {
447         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
448     }
449
        // number of cycles skipped between the previous callback and this one
450     int dropped_cycles = 0;
451     if (m_last_cycle != cycle && m_last_cycle != -1) {
452         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
453         if (dropped_cycles < 0) {
454             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
455                          this, dropped_cycles, cycle, m_last_cycle, dropped);
456         }
457         if (dropped_cycles > 0) {
458             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
459             m_dropped += dropped_cycles;
460 //             flushDebugOutput();
461 //             assert(0);
462         }
463     }
        // negative cycles already returned at the top of the function, so
        // this condition always holds here
464     if (cycle >= 0) {
465         m_last_cycle = cycle;
466     }
467
468     // bypass based upon state
469     if (m_state == ePS_Invalid) {
470         debugError("Should not have state %s\n", ePSToString(m_state) );
471         return RAW1394_ISO_ERROR;
472     }
473     if (m_state == ePS_Created) {
474         *tag = 0;
475         *sy = 0;
476         *length = 0;
477         return RAW1394_ISO_DEFER;
478     }
479
480     // normal processing
481     // note that we can't use getCycleTimer directly here,
482     // because packets are queued in advance. This means that
483     // the packet we are constructing will be sent out
484     // on 'cycle', not 'now'.
485     unsigned int ctr = m_1394service.getCycleTimer();
486     int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
487
488     // the difference between the cycle this
489     // packet is intended for and 'now'
490     int cycle_diff = diffCycles(cycle, now_cycles);
491
492     #ifdef DEBUG
493     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
494         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
495             cycle, now_cycles);
496         if(m_state == ePS_Running) {
497             debugShowBackLogLines(200);
498 //             flushDebugOutput();
499 //             assert(0);
500         }
501     }
502     #endif
503
504     // store the previous timestamp
505     m_last_timestamp2 = m_last_timestamp;
506
507     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
508     //       it happens on the first 'good' cycle for the wait condition
509     //       or on the first received cycle that is received afterwards (might be a problem)
510
511     // check whether we are waiting for a stream to be disabled
512     if(m_state == ePS_WaitingForStreamDisable) {
513         // we then check whether we have to switch on this cycle
514         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
515             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
516             m_next_state = ePS_DryRunning;
517             if (!updateState()) { // we are allowed to change the state directly
518                 debugError("Could not update state!\n");
519                 return RAW1394_ISO_ERROR;
520             }
521         } else {
522             // not time to disable yet
523         }
524     }
525     // check whether we are waiting for a stream to be enabled
526     else if(m_state == ePS_WaitingForStreamEnable) {
527         // we then check whether we have to switch on this cycle
528         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
529             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
530             m_next_state = ePS_Running;
531             if (!updateState()) { // we are allowed to change the state directly
532                 debugError("Could not update state!\n");
533                 return RAW1394_ISO_ERROR;
534             }
535         } else {
536             // not time to enable yet
537         }
538         // we are dryRunning hence data should be processed in any case
539     }
540     // check whether we are waiting for a stream to startup
541     else if(m_state == ePS_WaitingForStream) {
542         // as long as the cycle parameter is not in sync with
543         // the current time, the stream is considered not
544         // to be 'running'
545         // we then check whether we have to switch on this cycle
546         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
547             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
548             // hence go to the dryRunning state
549             m_next_state = ePS_DryRunning;
550             if (!updateState()) { // we are allowed to change the state directly
551                 debugError("Could not update state!\n");
552                 return RAW1394_ISO_ERROR;
553             }
554         } else {
555             // not time (yet) to switch state
556         }
557     }
        // note: because this is part of the else-if chain, a switch to
        // ePS_Running made in the branches above only takes effect on the
        // next call; this call still ends at send_empty_packet
558     else if(m_state == ePS_Running) {
559         // check the packet header
560         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
561         if (result == eCRV_Packet || result == eCRV_Defer) {
562             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
563                     cycle, m_last_timestamp);
564             // update some accounting
565             m_last_good_cycle = cycle;
566             m_last_dropped = dropped_cycles;
567
568             // check whether a state change has been requested
569             // note that only the wait state changes are synchronized with the cycles
570             if(m_state != m_next_state) {
571                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
572                                                 ePSToString(m_state), ePSToString(m_next_state));
573                 // execute the requested change
574                 if (!updateState()) { // we are allowed to change the state directly
575                     debugError("Could not update state!\n");
576                     return RAW1394_ISO_ERROR;
577                 }
578             }
579
580             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
581             // if an xrun occurred, switch to the dryRunning state and
582             // allow for the xrun to be picked up
583             if (result2 == eCRV_XRun) {
584                 debugWarning("generatePacketData xrun\n");
585                 m_in_xrun = true;
586                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
587                 m_cycle_to_switch_state = cycle+1; // switch in the next cycle
588                 m_next_state = ePS_WaitingForStreamDisable;
589                 // execute the requested change
590                 if (!updateState()) { // we are allowed to change the state directly
591                     debugError("Could not update state!\n");
592                     return RAW1394_ISO_ERROR;
593                 }
594                 goto send_empty_packet;
595             }
596             // skip queueing packets if we detect that there are not enough frames
597             // available
598             if(result2 == eCRV_Defer || result == eCRV_Defer)
599                 return RAW1394_ISO_DEFER;
600             else
601                 return RAW1394_ISO_OK;
602         } else if (result == eCRV_XRun) { // pick up the possible xruns
603             debugWarning("generatePacketHeader xrun\n");
604             m_in_xrun = true;
605             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
606             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
607             m_next_state = ePS_WaitingForStreamDisable;
608             // execute the requested change
609             if (!updateState()) { // we are allowed to change the state directly
610                 debugError("Could not update state!\n");
611                 return RAW1394_ISO_ERROR;
612             }
                // no return here: control leaves the if/else chain and
                // continues at send_empty_packet below
613         } else if (result == eCRV_EmptyPacket) {
614             if(m_state != m_next_state) {
615                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
616                                                 ePSToString(m_state), ePSToString(m_next_state));
617                 // execute the requested change
618                 if (!updateState()) { // we are allowed to change the state directly
619                     debugError("Could not update state!\n");
620                     return RAW1394_ISO_ERROR;
621                 }
622             }
623             goto send_empty_packet;
624         } else if (result == eCRV_Again) {
625             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
626             if(m_state != m_next_state) {
627                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
628                                                 ePSToString(m_state), ePSToString(m_next_state));
629                 // execute the requested change
630                 if (!updateState()) { // we are allowed to change the state directly
631                     debugError("Could not update state!\n");
632                     return RAW1394_ISO_ERROR;
633                 }
634             }
635             usleep(125); // only when using thread-per-handler
636             return RAW1394_ISO_AGAIN;
637 //             generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
638 //             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
639 //             return RAW1394_ISO_DEFER;
640         } else {
641             debugError("Invalid return value: %d\n", result);
642             return RAW1394_ISO_ERROR;
643         }
644     }
645     // we are not running, so send an empty packet
646     // we should generate a valid packet any time
647 send_empty_packet:
648     // note that only the wait state changes are synchronized with the cycles
649     if(m_state != m_next_state) {
650         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
651                                         ePSToString(m_state), ePSToString(m_next_state));
652         // execute the requested change
653         if (!updateState()) { // we are allowed to change the state directly
654             debugError("Could not update state!\n");
655             return RAW1394_ISO_ERROR;
656         }
657     }
658
659     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
660     generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
661     generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
662     return RAW1394_ISO_OK;
663 }
664
665
666 // Frame Transfer API
667 /**
668  * Transfer a block of frames from the event buffer to the port buffers
669  * @param nbframes number of frames to transfer
670  * @param ts the timestamp that the LAST frame in the block should have
671  * @return
672  */
673 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
674     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
675     assert( getType() == ePT_Receive );
676     if(isDryRunning()) return getFramesDry(nbframes, ts);
677     else return getFramesWet(nbframes, ts);
678 }
679
680 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
681 // FIXME: this should be done somewhere else
682 #ifdef DEBUG
683     uint64_t ts_expected;
684     signed int fc;
685     int32_t lag_ticks;
686     float lag_frames;
687
688     // in order to sync up multiple received streams, we should
689     // use the ts parameter. It specifies the time of the block's
690     // last sample.
691     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
692     assert(srate != 0.0);
693     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
694
695     ffado_timestamp_t ts_head_tmp;
696     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
697     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
698
699     lag_ticks = diffTicks(ts, ts_expected);
700     lag_frames = (((float)lag_ticks) / srate);
701     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
702                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
703     if (lag_frames >= 1.0) {
704         // the stream lags
705         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
706                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
707     } else if (lag_frames <= -1.0) {
708         // the stream leads
709         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
710                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
711     }
712 #endif
713     // ask the buffer to process nbframes of frames
714     // using it's registered client's processReadBlock(),
715     // which should be ours
716     m_data_buffer->blockProcessReadFrames(nbframes);
717     return true;
718 }
719
720 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
721 {
722     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
723                  this, nbframes, ts);
724     // dry run on this side means that we put silence in all enabled ports
725     // since there is do data put into the ringbuffer in the dry-running state
726     return provideSilenceBlock(nbframes, 0);
727 }
728
729 bool
730 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
731 {
732     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
733     return m_data_buffer->dropFrames(nbframes);
734 }
735
736 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
737 {
738     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
739     assert( getType() == ePT_Transmit );
740     if(isDryRunning()) return putFramesDry(nbframes, ts);
741     else return putFramesWet(nbframes, ts);
742 }
743
744 bool
745 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
746 {
747     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
748     // transfer the data
749     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
750     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
751     return true; // FIXME: what about failure?
752 }
753
754 bool
755 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
756 {
757     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
758     // do nothing
759     return true;
760 }
761
762 bool
763 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
764 {
765     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
766
767     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
768     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
769
770     if (nbframes > scratch_buffer_size_frames) {
771         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
772                    nbframes, scratch_buffer_size_frames);
773     }
774
775     assert(m_scratch_buffer);
776     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
777         debugError("Could not prepare silent block\n");
778         return false;
779     }
780     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
781         debugError("Could not write silent block\n");
782         return false;
783     }
784     return true;
785 }
786
787 bool
788 StreamProcessor::shiftStream(int nbframes)
789 {
790     if(nbframes == 0) return true;
791     if(nbframes > 0) {
792         return m_data_buffer->dropFrames(nbframes);
793     } else {
794         bool result = true;
795         while(nbframes++) {
796             result &= m_data_buffer->writeDummyFrame();
797         }
798         return result;
799     }
800 }
801
802 /**
803  * @brief write silence events to the stream ringbuffers.
804  */
805 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
806 {
807     bool no_problem=true;
808     for ( PortVectorIterator it = m_PeriodPorts.begin();
809           it != m_PeriodPorts.end();
810           ++it ) {
811         if((*it)->isDisabled()) {continue;};
812
813         //FIXME: make this into a static_cast when not DEBUG?
814         Port *port=dynamic_cast<Port *>(*it);
815
816         switch(port->getPortType()) {
817
818         case Port::E_Audio:
819             if(provideSilenceToPort(static_cast<AudioPort *>(*it), offset, nevents)) {
820                 debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
821                 no_problem=false;
822             }
823             break;
824         // midi is a packet based port, don't process
825         //    case MotuPortInfo::E_Midi:
826         //        break;
827
828         default: // ignore
829             break;
830         }
831     }
832     return no_problem;
833 }
834
835 int
836 StreamProcessor::provideSilenceToPort(
837                        AudioPort *p, unsigned int offset, unsigned int nevents)
838 {
839     unsigned int j=0;
840     switch(p->getDataType()) {
841         default:
842         case Port::E_Int24:
843             {
844                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
845                 assert(nevents + offset <= p->getBufferSize());
846                 buffer+=offset;
847
848                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
849                     *(buffer)=0;
850                     buffer++;
851                 }
852             }
853             break;
854         case Port::E_Float:
855             {
856                 float *buffer=(float *)(p->getBufferAddress());
857                 assert(nevents + offset <= p->getBufferSize());
858                 buffer+=offset;
859
860                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
861                     *buffer = 0.0;
862                     buffer++;
863                 }
864             }
865             break;
866     }
867     return 0;
868 }
869
870 /***********************************************
871  * State related API                           *
872  ***********************************************/
873 bool StreamProcessor::init()
874 {
875     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
876
877     if(!m_IsoHandlerManager.registerStream(this)) {
878         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
879         return false;
880     }
881     if(!m_StreamProcessorManager.registerProcessor(this)) {
882         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
883         return false;
884     }
885
886     // initialization can be done without requesting it
887     // from the packet loop
888     m_next_state = ePS_Created;
889     return true;
890 }
891
892 bool StreamProcessor::prepare()
893 {
894     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
895
896     // make the scratch buffer one period of frames long
897     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
898     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
899     if(m_scratch_buffer) delete[] m_scratch_buffer;
900     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
901     if(m_scratch_buffer == NULL) {
902         debugFatal("Could not allocate scratch buffer\n");
903         return false;
904     }
905
906     if (!prepareChild()) {
907         debugFatal("Could not prepare child\n");
908         return false;
909     }
910
911     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
912     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
913              m_StreamProcessorManager.getNominalRate());
914     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
915              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
916     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
917              m_1394service.getPort(), m_channel);
918
919     // initialization can be done without requesting it
920     // from the packet loop
921     m_next_state = ePS_Stopped;
922     return updateState();
923 }
924
925 bool
926 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
927 {
928     // first set the time, since in the packet loop we first check m_state == m_next_state before
929     // using the time
930     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
931     m_next_state = state;
932     return true;
933 }
934
935 bool
936 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
937 {
938     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
939     int cnt = timeout_ms;
940     while (m_state != state && cnt) {
941         SleepRelativeUsec(1000);
942         cnt--;
943     }
944     if(cnt==0) {
945         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
946         return false;
947     }
948     return true;
949 }
950
951 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
952     uint64_t tx;
953     if (t < 0) {
954         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
955     } else {
956         tx = t;
957     }
958     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
959
960     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
961     uint64_t now = m_1394service.getCycleTimerTicks();
962     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
963                           now,
964                           (unsigned int)TICKS_TO_SECS(now),
965                           (unsigned int)TICKS_TO_CYCLES(now),
966                           (unsigned int)TICKS_TO_OFFSET(now));
967     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
968                           tx,
969                           (unsigned int)TICKS_TO_SECS(tx),
970                           (unsigned int)TICKS_TO_CYCLES(tx),
971                           (unsigned int)TICKS_TO_OFFSET(tx));
972     if (m_state == ePS_Stopped) {
973         if(!m_IsoHandlerManager.startHandlerForStream(
974                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
975             debugError("Could not start handler for SP %p\n", this);
976             return false;
977         }
978         return scheduleStateTransition(ePS_WaitingForStream, tx);
979     } else if (m_state == ePS_Running) {
980         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
981     } else {
982         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
983         return false;
984     }
985 }
986
987 bool StreamProcessor::scheduleStartRunning(int64_t t) {
988     uint64_t tx;
989     if (t < 0) {
990         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
991     } else {
992         tx = t;
993     }
994     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
995     uint64_t now = m_1394service.getCycleTimerTicks();
996     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
997                           now,
998                           (unsigned int)TICKS_TO_SECS(now),
999                           (unsigned int)TICKS_TO_CYCLES(now),
1000                           (unsigned int)TICKS_TO_OFFSET(now));
1001     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1002                           tx,
1003                           (unsigned int)TICKS_TO_SECS(tx),
1004                           (unsigned int)TICKS_TO_CYCLES(tx),
1005                           (unsigned int)TICKS_TO_OFFSET(tx));
1006     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
1007 }
1008
1009 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
1010     uint64_t tx;
1011     if (t < 0) {
1012         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1013     } else {
1014         tx = t;
1015     }
1016     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1017     uint64_t now = m_1394service.getCycleTimerTicks();
1018     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1019                           now,
1020                           (unsigned int)TICKS_TO_SECS(now),
1021                           (unsigned int)TICKS_TO_CYCLES(now),
1022                           (unsigned int)TICKS_TO_OFFSET(now));
1023     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1024                           tx,
1025                           (unsigned int)TICKS_TO_SECS(tx),
1026                           (unsigned int)TICKS_TO_CYCLES(tx),
1027                           (unsigned int)TICKS_TO_OFFSET(tx));
1028
1029     return scheduleStateTransition(ePS_Stopped, tx);
1030 }
1031
1032 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1033     uint64_t tx;
1034     if (t < 0) {
1035         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1036     } else {
1037         tx = t;
1038     }
1039     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1040     uint64_t now = m_1394service.getCycleTimerTicks();
1041     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1042                           now,
1043                           (unsigned int)TICKS_TO_SECS(now),
1044                           (unsigned int)TICKS_TO_CYCLES(now),
1045                           (unsigned int)TICKS_TO_OFFSET(now));
1046     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1047                           tx,
1048                           (unsigned int)TICKS_TO_SECS(tx),
1049                           (unsigned int)TICKS_TO_CYCLES(tx),
1050                           (unsigned int)TICKS_TO_OFFSET(tx));
1051     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1052 }
1053
1054 bool StreamProcessor::startDryRunning(int64_t t) {
1055     if(!scheduleStartDryRunning(t)) {
1056         debugError("Could not schedule transition\n");
1057         return false;
1058     }
1059     if(!waitForState(ePS_DryRunning, 2000)) {
1060         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1061         return false;
1062     }
1063     return true;
1064 }
1065
1066 bool StreamProcessor::startRunning(int64_t t) {
1067     if(!scheduleStartRunning(t)) {
1068         debugError("Could not schedule transition\n");
1069         return false;
1070     }
1071     if(!waitForState(ePS_Running, 2000)) {
1072         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1073         return false;
1074     }
1075     return true;
1076 }
1077
1078 bool StreamProcessor::stopDryRunning(int64_t t) {
1079     if(!scheduleStopDryRunning(t)) {
1080         debugError("Could not schedule transition\n");
1081         return false;
1082     }
1083     if(!waitForState(ePS_Stopped, 2000)) {
1084         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1085         return false;
1086     }
1087     return true;
1088 }
1089
1090 bool StreamProcessor::stopRunning(int64_t t) {
1091     if(!scheduleStopRunning(t)) {
1092         debugError("Could not schedule transition\n");
1093         return false;
1094     }
1095     if(!waitForState(ePS_DryRunning, 2000)) {
1096         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1097         return false;
1098     }
1099     return true;
1100 }
1101
1102
1103 // internal state API
1104
1105 /**
1106  * @brief Enter the ePS_Stopped state
1107  * @return true if successful, false if not
1108  *
1109  * @pre none
1110  *
1111  * @post the buffer and the isostream are ready for use.
1112  * @post all dynamic structures have been allocated successfully
1113  * @post the buffer is transparent and empty, and all parameters are set
1114  *       to the correct initial/nominal values.
1115  *
1116  */
1117 bool
1118 StreamProcessor::doStop()
1119 {
1120     float ticks_per_frame;
1121     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1122
1123     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1124     bool result = true;
1125
1126     switch(m_state) {
1127         case ePS_Created:
1128             assert(m_data_buffer);
1129             // object just created
1130             result = m_data_buffer->init();
1131
1132             // prepare the framerate estimate
1133             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1134             m_ticks_per_frame = ticks_per_frame;
1135             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1136
1137             // initialize internal buffer
1138             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1139
1140             result &= m_data_buffer->setEventSize( getEventSize() );
1141             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1142             if(getType() == ePT_Receive) {
1143                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1144             } else {
1145                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1146             }
1147             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1148             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1149             result &= m_data_buffer->prepare(); // FIXME: the name
1150
1151             // set the parameters of ports we can:
1152             // we want the audio ports to be period buffered,
1153             // and the midi ports to be packet buffered
1154             for ( PortVectorIterator it = m_Ports.begin();
1155                 it != m_Ports.end();
1156                 ++it )
1157             {
1158                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1159                 if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1160                     debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1161                     return false;
1162                 }
1163                 switch ((*it)->getPortType()) {
1164                     case Port::E_Audio:
1165                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
1166                             debugFatal("Could not set signal type to PeriodSignalling");
1167                             return false;
1168                         }
1169                         // buffertype and datatype are dependant on the API
1170                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
1171                         // buffertype and datatype are dependant on the API
1172                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
1173                             debugFatal("Could not set buffer type");
1174                             return false;
1175                         }
1176                         if(!(*it)->useExternalBuffer(true)) {
1177                             debugFatal("Could not set external buffer usage");
1178                             return false;
1179                         }
1180                         if(!(*it)->setDataType(Port::E_Float)) {
1181                             debugFatal("Could not set data type");
1182                             return false;
1183                         }
1184                         break;
1185                     case Port::E_Midi:
1186                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1187                             debugFatal("Could not set signal type to PacketSignalling");
1188                             return false;
1189                         }
1190                         // buffertype and datatype are dependant on the API
1191                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1192                         // buffertype and datatype are dependant on the API
1193                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1194                             debugFatal("Could not set buffer type");
1195                             return false;
1196                         }
1197                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1198                             debugFatal("Could not set data type");
1199                             return false;
1200                         }
1201                         break;
1202                     default:
1203                         debugWarning("Unsupported port type specified\n");
1204                         break;
1205                 }
1206             }
1207             // the API specific settings of the ports should already be set,
1208             // as this is called from the processorManager->prepare()
1209             // so we can init the ports
1210             result &= PortManager::initPorts();
1211
1212             break;
1213         case ePS_DryRunning:
1214             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1215                 debugError("Could not stop handler for SP %p\n", this);
1216                 return false;
1217             }
1218             break;
1219         default:
1220             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1221             return false;
1222     }
1223
1224     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1225     // make the buffer transparent
1226     m_data_buffer->setTransparent(true);
1227
1228     // reset all ports
1229     result &= PortManager::preparePorts();
1230
1231     m_state = ePS_Stopped;
1232     #ifdef DEBUG
1233     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1234         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1235         dumpInfo();
1236     }
1237     #endif
1238     return result;
1239 }
1240
1241 /**
1242  * @brief Enter the ePS_WaitingForStream state
1243  * @return true if successful, false if not
1244  *
1245  * @pre all dynamic data structures are allocated successfully
1246  *
1247  * @post
1248  *
1249  */
1250 bool
1251 StreamProcessor::doWaitForRunningStream()
1252 {
1253     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1254     switch(m_state) {
1255         case ePS_Stopped:
1256             // we have to start waiting for an incoming stream
1257             // this basically means nothing, the state change will
1258             // be picked up by the packet iterator
1259             break;
1260         default:
1261             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1262             return false;
1263     }
1264     m_state = ePS_WaitingForStream;
1265     #ifdef DEBUG
1266     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1267         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1268         dumpInfo();
1269     }
1270     #endif
1271     return true;
1272 }
1273
1274 /**
1275  * @brief Enter the ePS_DryRunning state
1276  * @return true if successful, false if not
1277  *
1278  * @pre
1279  *
1280  * @post
1281  *
1282  */
1283 bool
1284 StreamProcessor::doDryRunning()
1285 {
1286     bool result = true;
1287     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1288     switch(m_state) {
1289         case ePS_WaitingForStream:
1290             // a running stream has been detected
1291             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1292             if (getType() == ePT_Receive) {
1293                 // this to ensure that there is no discontinuity when starting to
1294                 // update the DLL based upon the received packets
1295                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1296             } else {
1297                 // FIXME: PC=master mode will have to do something here I guess...
1298             }
1299             break;
1300         case ePS_WaitingForStreamEnable: // when xrunning at startup
1301             result &= m_data_buffer->clearBuffer();
1302             m_data_buffer->setTransparent(true);
1303             break;
1304         case ePS_WaitingForStreamDisable:
1305             result &= m_data_buffer->clearBuffer();
1306             m_data_buffer->setTransparent(true);
1307             break;
1308         default:
1309             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1310             return false;
1311     }
1312     m_state = ePS_DryRunning;
1313     #ifdef DEBUG
1314     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1315         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1316         dumpInfo();
1317     }
1318     #endif
1319     return result;
1320 }
1321
1322 /**
1323  * @brief Enter the ePS_WaitingForStreamEnable state
1324  * @return true if successful, false if not
1325  *
1326  * @pre
1327  *
1328  * @post
1329  *
1330  */
1331 bool
1332 StreamProcessor::doWaitForStreamEnable()
1333 {
1334     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1335     unsigned int ringbuffer_size_frames;
1336     switch(m_state) {
1337         case ePS_DryRunning:
1338             // we have to start waiting for an incoming stream
1339             // this basically means nothing, the state change will
1340             // be picked up by the packet iterator
1341
1342             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1343                 debugError("Could not reset data buffer\n");
1344                 return false;
1345             }
1346             if (getType() == ePT_Transmit) {
1347                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1348                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1349                 // prefill the buffer
1350                 if(!transferSilence(ringbuffer_size_frames)) {
1351                     debugFatal("Could not prefill transmit stream\n");
1352                     return false;
1353                 }
1354             }
1355
1356             break;
1357         default:
1358             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1359             return false;
1360     }
1361     m_state = ePS_WaitingForStreamEnable;
1362     #ifdef DEBUG
1363     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1364         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1365         dumpInfo();
1366     }
1367     #endif
1368     return true;
1369 }
1370
1371 /**
1372  * @brief Enter the ePS_Running state
1373  * @return true if successful, false if not
1374  *
1375  * @pre
1376  *
1377  * @post
1378  *
1379  */
1380 bool
1381 StreamProcessor::doRunning()
1382 {
1383     bool result = true;
1384     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1385     switch(m_state) {
1386         case ePS_WaitingForStreamEnable:
1387             // a running stream has been detected
1388             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1389                                              this, m_last_cycle);
1390             m_in_xrun = false;
1391             m_data_buffer->setTransparent(false);
1392             break;
1393         default:
1394             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1395             return false;
1396     }
1397     m_state = ePS_Running;
1398     #ifdef DEBUG
1399     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1400         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1401         dumpInfo();
1402     }
1403     #endif
1404     return result;
1405 }
1406
1407 /**
1408  * @brief Enter the ePS_WaitingForStreamDisable state
1409  * @return true if successful, false if not
1410  *
1411  * @pre
1412  *
1413  * @post
1414  *
1415  */
1416 bool
1417 StreamProcessor::doWaitForStreamDisable()
1418 {
1419     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1420     switch(m_state) {
1421         case ePS_Running:
1422             // the thread will do the transition
1423             break;
1424         default:
1425             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1426             return false;
1427     }
1428     m_state = ePS_WaitingForStreamDisable;
1429     #ifdef DEBUG
1430     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1431         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1432         dumpInfo();
1433     }
1434     #endif
1435     return true;
1436 }
1437
1438 /**
1439  * @brief Updates the state machine and calls the necessary transition functions
1440  * @return true if successful, false if not
1441  */
1442 bool StreamProcessor::updateState() {
1443     bool result = false;
1444     // copy the current state locally since it could change value,
1445     // and that's something we don't want to happen inbetween tests
1446     // if m_next_state changes during this routine, we know for sure
1447     // that the previous state change was at least attempted correctly.
1448     enum eProcessorState next_state = m_next_state;
1449
1450     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1451         ePSToString(m_state), ePSToString(next_state));
1452
1453     if (m_state == next_state) {
1454         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1455         return true;
1456     }
1457
1458     // after creation, only initialization is allowed
1459     if (m_state == ePS_Created) {
1460         if(next_state != ePS_Stopped) {
1461             goto updateState_exit_with_error;
1462         }
1463         // do init here
1464         result = doStop();
1465         if (result) return true;
1466         else goto updateState_exit_change_failed;
1467     }
1468
1469     // after initialization, only WaitingForRunningStream is allowed
1470     if (m_state == ePS_Stopped) {
1471         if(next_state != ePS_WaitingForStream) {
1472             goto updateState_exit_with_error;
1473         }
1474         result = doWaitForRunningStream();
1475         if (result) return true;
1476         else goto updateState_exit_change_failed;
1477     }
1478
1479     // after WaitingForStream, only ePS_DryRunning is allowed
1480     // this means that the stream started running
1481     if (m_state == ePS_WaitingForStream) {
1482         if(next_state != ePS_DryRunning) {
1483             goto updateState_exit_with_error;
1484         }
1485         result = doDryRunning();
1486         if (result) return true;
1487         else goto updateState_exit_change_failed;
1488     }
1489
1490     // from ePS_DryRunning we can go to:
1491     //   - ePS_Stopped if something went wrong during DryRunning
1492     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1493     if (m_state == ePS_DryRunning) {
1494         if((next_state != ePS_Stopped) &&
1495            (next_state != ePS_WaitingForStreamEnable)) {
1496             goto updateState_exit_with_error;
1497         }
1498         if (next_state == ePS_Stopped) {
1499             result = doStop();
1500         } else {
1501             result = doWaitForStreamEnable();
1502         }
1503         if (result) return true;
1504         else goto updateState_exit_change_failed;
1505     }
1506
1507     // from ePS_WaitingForStreamEnable we can go to:
1508     //   - ePS_DryRunning if something went wrong while waiting
1509     //   - ePS_Running if the stream enabled correctly
1510     if (m_state == ePS_WaitingForStreamEnable) {
1511         if((next_state != ePS_DryRunning) &&
1512            (next_state != ePS_Running)) {
1513             goto updateState_exit_with_error;
1514         }
1515         if (next_state == ePS_Stopped) {
1516             result = doDryRunning();
1517         } else {
1518             result = doRunning();
1519         }
1520         if (result) return true;
1521         else goto updateState_exit_change_failed;
1522     }
1523
1524     // from ePS_Running we can only start waiting for a disabled stream
1525     if (m_state == ePS_Running) {
1526         if(next_state != ePS_WaitingForStreamDisable) {
1527             goto updateState_exit_with_error;
1528         }
1529         result = doWaitForStreamDisable();
1530         if (result) return true;
1531         else goto updateState_exit_change_failed;
1532     }
1533
1534     // from ePS_WaitingForStreamDisable we can go to DryRunning
1535     if (m_state == ePS_WaitingForStreamDisable) {
1536         if(next_state != ePS_DryRunning) {
1537             goto updateState_exit_with_error;
1538         }
1539         result = doDryRunning();
1540         if (result) return true;
1541         else goto updateState_exit_change_failed;
1542     }
1543
1544     // if we arrive here there is an error
1545 updateState_exit_with_error:
1546     debugError("Invalid state transition: %s => %s\n",
1547         ePSToString(m_state), ePSToString(next_state));
1548     return false;
1549 updateState_exit_change_failed:
1550     debugError("State transition failed: %s => %s\n",
1551         ePSToString(m_state), ePSToString(next_state));
1552     return false;
1553 }
1554
1555 /***********************************************
1556  * Helper routines                             *
1557  ***********************************************/
1558 bool
1559 StreamProcessor::transferSilence(unsigned int nframes)
1560 {
1561     bool retval;
1562     signed int fc;
1563     ffado_timestamp_t ts_tail_tmp;
1564
1565     // prepare a buffer of silence
1566     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1567     transmitSilenceBlock(dummybuffer, nframes, 0);
1568
1569     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1570     if (fc != 0) {
1571         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1572     }
1573
1574     // add the silence data to the ringbuffer
1575     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1576         retval = true;
1577     } else {
1578         debugWarning("Could not write to event buffer\n");
1579         retval = false;
1580     }
1581     free(dummybuffer);
1582     return retval;
1583 }
1584
1585 /**
1586  * @brief convert a eProcessorState to a string
1587  * @param s the state
1588  * @return a char * describing the state
1589  */
1590 const char *
1591 StreamProcessor::ePSToString(enum eProcessorState s) {
1592     switch (s) {
1593         case ePS_Invalid: return "ePS_Invalid";
1594         case ePS_Created: return "ePS_Created";
1595         case ePS_Stopped: return "ePS_Stopped";
1596         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1597         case ePS_DryRunning: return "ePS_DryRunning";
1598         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1599         case ePS_Running: return "ePS_Running";
1600         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1601         default: return "error: unknown state";
1602     }
1603 }
1604
1605 /**
1606  * @brief convert a eProcessorType to a string
1607  * @param t the type
1608  * @return a char * describing the state
1609  */
1610 const char *
1611 StreamProcessor::ePTToString(enum eProcessorType t) {
1612     switch (t) {
1613         case ePT_Receive: return "Receive";
1614         case ePT_Transmit: return "Transmit";
1615         default: return "error: unknown type";
1616     }
1617 }
1618
1619 /***********************************************
1620  * Debug                                       *
1621  ***********************************************/
1622 void
1623 StreamProcessor::dumpInfo()
1624 {
1625     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p:\n", this);
1626     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1627     uint64_t now = m_1394service.getCycleTimerTicks();
1628     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1629                         now,
1630                         (unsigned int)TICKS_TO_SECS(now),
1631                         (unsigned int)TICKS_TO_CYCLES(now),
1632                         (unsigned int)TICKS_TO_OFFSET(now));
1633     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False"));
1634     if (m_state == m_next_state) {
1635         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n",
1636                                             ePSToString(m_state));
1637     } else {
1638         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s (Next: %s)\n",
1639                                               ePSToString(m_state), ePSToString(m_next_state));
1640         debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1641     }
1642     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1643     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Framerate             : Nominal: %u, Sync: %f, Buffer %f\n",
1644                                           m_StreamProcessorManager.getNominalRate(),
1645                                           24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1646                                           24576000.0/m_data_buffer->getRate());
1647     float d = getSyncDelay();
1648     debugOutputShort(DEBUG_LEVEL_NORMAL, "  Sync delay             : %f ticks (%f frames, %f cy)\n",
1649                                          d, d/getTicksPerFrame(),
1650                                          d/((float)TICKS_PER_CYCLE));
1651     m_data_buffer->dumpInfo();
1652 }
1653
1654 void
1655 StreamProcessor::setVerboseLevel(int l) {
1656     setDebugLevel(l);
1657     PortManager::setVerboseLevel(l);
1658     m_data_buffer->setVerboseLevel(l);
1659 }
1660
1661 } // end of namespace
Note: See TracBrowser for help on using the browser.