root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 798, 63.7 kB (checked in by ppalmers, 13 years ago)

cleanup and reliability fixes

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessor.h"
27 #include "../StreamProcessorManager.h"
28
29 #include "devicemanager.h"
30
31 #include "libieee1394/ieee1394service.h"
32 #include "libieee1394/IsoHandlerManager.h"
33 #include "libieee1394/cycletimer.h"
34
35 #include "libutil/Time.h"
36
37 #include "libutil/Atomic.h"
38
39 #include <assert.h>
40 #include <math.h>
41
42 namespace Streaming {
43
44 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
45
46 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
47     : m_processor_type ( type )
48     , m_state( ePS_Created )
49     , m_next_state( ePS_Invalid )
50     , m_cycle_to_switch_state( 0 )
51     , m_Parent( parent )
52     , m_1394service( parent.get1394Service() ) // local cache
53     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
54     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
55     , m_channel( -1 )
56     , m_dropped(0)
57     , m_last_timestamp(0)
58     , m_last_timestamp2(0)
59     , m_scratch_buffer( NULL )
60     , m_scratch_buffer_size_bytes( 0 )
61     , m_ticks_per_frame( 0 )
62     , m_last_cycle( -1 )
63     , m_sync_delay( 0 )
64     , m_in_xrun( false )
65 {
66     // create the timestamped buffer and register ourselves as its client
67     m_data_buffer = new Util::TimestampedBuffer(this);
68 }
69
70 StreamProcessor::~StreamProcessor() {
71     m_StreamProcessorManager.unregisterProcessor(this);
72     if(!m_IsoHandlerManager.unregisterStream(this)) {
73         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
74     }
75
76     if (m_data_buffer) delete m_data_buffer;
77     if (m_scratch_buffer) delete[] m_scratch_buffer;
78 }
79
80 uint64_t StreamProcessor::getTimeNow() {
81     return m_1394service.getCycleTimerTicks();
82 }
83
84 int StreamProcessor::getMaxFrameLatency() {
85     if (getType() == ePT_Receive) {
86         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
87     } else {
88         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
89     }
90 }
91
92 unsigned int
93 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
94 {
95     unsigned int nominal_frames_per_second
96                     = m_StreamProcessorManager.getNominalRate();
97     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
98     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
99     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
100     return nominal_packets;
101 }
102
103 unsigned int
104 StreamProcessor::getPacketsPerPeriod()
105 {
106     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
107 }
108
109 unsigned int
110 StreamProcessor::getNbPacketsIsoXmitBuffer()
111 {
112     // the target is to have all of the transmit buffer (at period transfer) as ISO packets
113     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames
114     // in the transmit buffer (the others are still to be put into the xmit frame buffer)
115     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
116     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
117    
118     // however we have to take into account the fact that there is some sync delay
119     // we assume that the SPM has indicated
120     // HACK: this counts on the fact that the latency for this stream will be the same as the
121     //       latency for the receive sync source
122     unsigned int est_sync_delay = getPacketsPerPeriod() / MINIMUM_INTERRUPTS_PER_PERIOD;
123     est_sync_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS / TICKS_PER_CYCLE;
124     packets_to_prebuffer -= est_sync_delay;
125     debugOutput(DEBUG_LEVEL_VERBOSE, " correct for sync delay (%d): %u\n",
126                                      est_sync_delay,
127                                      packets_to_prebuffer);
128    
129     // only queue a part of the theoretical max in order not to have too much 'not ready' cycles
130     packets_to_prebuffer = (packets_to_prebuffer * MAX_ISO_XMIT_BUFFER_FILL_PCT * 1000) / 100000;
131     debugOutput(DEBUG_LEVEL_VERBOSE, " reduce to %d%%: %u\n",
132                                      MAX_ISO_XMIT_BUFFER_FILL_PCT, packets_to_prebuffer);
133    
134     return packets_to_prebuffer;
135 }
136
137 /***********************************************
138  * Buffer management and manipulation          *
139  ***********************************************/
140 void StreamProcessor::flush() {
141     m_IsoHandlerManager.flushHandlerForStream(this);
142 }
143
144 int StreamProcessor::getBufferFill() {
145     return m_data_buffer->getBufferFill();
146 }
147
148 int64_t
149 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
150 {
151     uint64_t time_at_period=getTimeAtPeriod();
152
153     // we delay the period signal with the sync delay
154     // this makes that the period signals lag a little compared to reality
155     // ISO buffering causes the packets to be received at max
156     // m_handler->getWakeupInterval() later than the time they were received.
157     // hence their payload is available this amount of time later. However, the
158     // period boundary is predicted based upon earlier samples, and therefore can
159     // pass before these packets are processed. Adding this extra term makes that
160     // the period boundary is signalled later
161     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
162
163     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
164
165     // calculate the time until the next period
166     int32_t until_next=diffTicks(time_at_period,cycle_timer);
167
168     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
169         time_at_period, cycle_timer, until_next
170         );
171
172     // now convert to usecs
173     // don't use the mapping function because it only works
174     // for absolute times, not the relative time we are
175     // using here (which can also be negative).
176     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
177 }
178
179 void
180 StreamProcessor::setSyncDelay(int d) {
181     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d);
182     m_sync_delay = d;
183 }
184
185 uint64_t
186 StreamProcessor::getTimeAtPeriodUsecs()
187 {
188     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
189 }
190
191 uint64_t
192 StreamProcessor::getTimeAtPeriod()
193 {
194     if (getType() == ePT_Receive) {
195         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
196    
197         #ifdef DEBUG
198         ffado_timestamp_t ts;
199         signed int fc;
200         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
201    
202         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
203             next_period_boundary, ts, fc, getTicksPerFrame()
204             );
205         #endif
206         return (uint64_t)next_period_boundary;
207     } else {
208         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
209    
210         #ifdef DEBUG
211         ffado_timestamp_t ts;
212         signed int fc;
213         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
214    
215         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
216             next_period_boundary, ts, fc, getTicksPerFrame()
217             );
218         #endif
219         return (uint64_t)next_period_boundary;
220     }
221 }
222
223 float
224 StreamProcessor::getTicksPerFrame()
225 {
226     assert(m_data_buffer != NULL);
227     return m_data_buffer->getRate();
228 }
229
230 bool
231 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
232 {
233     bool can_transfer;
234     unsigned int fc = m_data_buffer->getFrameCounter();
235     if (getType() == ePT_Receive) {
236         can_transfer = (fc >= nbframes);
237     } else {
238         // there has to be enough space to put the frames in
239         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
240         // or the buffer is transparent
241         can_transfer |= m_data_buffer->isTransparent();
242     }
243    
244     #ifdef DEBUG
245     if (!can_transfer) {
246         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
247             this, ePTToString(getType()), fc, nbframes);
248     }
249     #endif
250    
251     return can_transfer;
252 }
253
254 /***********************************************
255  * I/O API                                     *
256  ***********************************************/
257
258 // Packet transfer API
259 enum raw1394_iso_disposition
260 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
261                            unsigned char channel, unsigned char tag, unsigned char sy,
262                            unsigned int cycle, unsigned int dropped) {
263     if(m_last_cycle == -1) {
264         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
265     }
266
267     int dropped_cycles = 0;
268     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
269         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
270         if (dropped_cycles < 0) {
271             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
272                          this, dropped_cycles, cycle, m_last_cycle, dropped);
273         }
274         if (dropped_cycles > 0) {
275             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n",
276                 this, dropped_cycles, cycle, dropped, cycle, m_last_cycle);
277             m_dropped += dropped_cycles;
278             m_in_xrun = true;
279             //flushDebugOutput();
280             //assert(0);
281         }
282     }
283     m_last_cycle = cycle;
284
285     // bypass based upon state
286     if (m_state == ePS_Invalid) {
287         debugError("Should not have state %s\n", ePSToString(m_state) );
288         return RAW1394_ISO_ERROR;
289     }
290     if (m_state == ePS_Created) {
291         return RAW1394_ISO_DEFER;
292     }
293
294     // store the previous timestamp
295     m_last_timestamp2 = m_last_timestamp;
296
297     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
298     //       it happens on the first 'good' cycle for the wait condition
299     //       or on the first received cycle that is received afterwards (might be a problem)
300
301     // check whether we are waiting for a stream to be disabled
302     if(m_state == ePS_WaitingForStreamDisable) {
303         // we then check whether we have to switch on this cycle
304         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
305             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
306             m_next_state = ePS_DryRunning;
307             if (!updateState()) { // we are allowed to change the state directly
308                 debugError("Could not update state!\n");
309                 return RAW1394_ISO_ERROR;
310             }
311         } else {
312             // not time to disable yet
313         }
314         // the received data can be discarded while waiting for the stream
315         // to be disabled
316         return RAW1394_ISO_OK;
317     }
318
319     // check whether we are waiting for a stream to be enabled
320     else if(m_state == ePS_WaitingForStreamEnable) {
321         // we then check whether we have to switch on this cycle
322         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
323             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
324             m_next_state = ePS_Running;
325             if (!updateState()) { // we are allowed to change the state directly
326                 debugError("Could not update state!\n");
327                 return RAW1394_ISO_ERROR;
328             }
329         } else {
330             // not time to enable yet
331         }
332         // we are dryRunning hence data should be processed in any case
333     }
334
335     // check the packet header
336     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
337     if (result == eCRV_OK) {
338         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
339                 cycle, m_last_timestamp);
340         // update some accounting
341         m_last_good_cycle = cycle;
342         m_last_dropped = dropped_cycles;
343
344         // check whether we are waiting for a stream to startup
345         // this requires that the packet is good
346         if(m_state == ePS_WaitingForStream) {
347             // since we have a packet with an OK header,
348             // we can indicate that the stream started up
349
350             // we then check whether we have to switch on this cycle
351             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
352                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
353                 // hence go to the dryRunning state
354                 m_next_state = ePS_DryRunning;
355                 if (!updateState()) { // we are allowed to change the state directly
356                     debugError("Could not update state!\n");
357                     return RAW1394_ISO_ERROR;
358                 }
359             } else {
360                 // not time (yet) to switch state
361             }
362             // in both cases we don't want to process the data
363             return RAW1394_ISO_OK;
364         }
365
366         // check whether a state change has been requested
367         // note that only the wait state changes are synchronized with the cycles
368         else if(m_state != m_next_state) {
369             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
370                                              ePSToString(m_state), ePSToString(m_next_state));
371             // execute the requested change
372             if (!updateState()) { // we are allowed to change the state directly
373                 debugError("Could not update state!\n");
374                 return RAW1394_ISO_ERROR;
375             }
376         }
377
378         // handle dropped cycles
379         if(dropped_cycles) {
380             // they represent a discontinuity in the timestamps, and hence are
381             // to be dealt with
382             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
383             m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
384             if (m_state == ePS_Running) {
385                 // this is an xrun situation
386                 m_in_xrun = true;
387                 debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
388                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
389                 m_next_state = ePS_WaitingForStreamDisable;
390                 // execute the requested change
391                 if (!updateState()) { // we are allowed to change the state directly
392                     debugError("Could not update state!\n");
393                     return RAW1394_ISO_ERROR;
394                 }
395                 return RAW1394_ISO_DEFER;
396             }
397         }
398
399         // for all states that reach this we are allowed to
400         // do protocol specific data reception
401         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
402
403         // if an xrun occured, switch to the dryRunning state and
404         // allow for the xrun to be picked up
405         if (result2 == eCRV_XRun) {
406             debugWarning("processPacketData xrun\n");
407             m_in_xrun = true;
408             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
409             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
410             m_next_state = ePS_WaitingForStreamDisable;
411             // execute the requested change
412             if (!updateState()) { // we are allowed to change the state directly
413                 debugError("Could not update state!\n");
414                 return RAW1394_ISO_ERROR;
415             }
416             return RAW1394_ISO_DEFER;
417         } else if(result2 == eCRV_OK) {
418             // no problem here
419             return RAW1394_ISO_OK;
420         } else {
421             debugError("Invalid response\n");
422             return RAW1394_ISO_ERROR;
423         }
424     } else if(result == eCRV_Invalid) {
425         // apparently we don't have to do anything when the packets are not valid
426         return RAW1394_ISO_OK;
427     } else {
428         debugError("Invalid response\n");
429         return RAW1394_ISO_ERROR;
430     }
431     debugError("reached the unreachable\n");
432     return RAW1394_ISO_ERROR;
433 }
434
435 enum raw1394_iso_disposition
436 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
437                            unsigned char *tag, unsigned char *sy,
438                            int cycle, unsigned int dropped, unsigned int max_length) {
439     if (cycle<0) {
440         *tag = 0;
441         *sy = 0;
442         *length = 0;
443         return RAW1394_ISO_OK;
444     }
445
446     unsigned int ctr;
447     int now_cycles;
448     int cycle_diff;
449
450     if(m_last_cycle == -1) {
451         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
452     }
453
454     int dropped_cycles = 0;
455     if (m_last_cycle != cycle && m_last_cycle != -1) {
456         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
457         if (dropped_cycles < 0) {
458             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
459                          this, dropped_cycles, cycle, m_last_cycle, dropped);
460         }
461         if (dropped_cycles > 0) {
462             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
463             m_dropped += dropped_cycles;
464             // HACK: this should not be necessary, since the header generation functions should trigger the xrun.
465             //       but apparently there are some issues with the 1394 stack
466             m_in_xrun = true;
467             if(m_state == ePS_Running) {
468                 debugShowBackLogLines(200);
469                 debugWarning("dropped packets xrun\n");
470                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packets xrun\n");
471                 m_next_state = ePS_WaitingForStreamDisable;
472                 // execute the requested change
473                 if (!updateState()) { // we are allowed to change the state directly
474                     debugError("Could not update state!\n");
475                     return RAW1394_ISO_ERROR;
476                 }
477                 goto send_empty_packet;
478             }
479         }
480     }
481     if (cycle >= 0) {
482         m_last_cycle = cycle;
483     }
484
485     // bypass based upon state
486     if (m_state == ePS_Invalid) {
487         debugError("Should not have state %s\n", ePSToString(m_state) );
488         return RAW1394_ISO_ERROR;
489     }
490     if (m_state == ePS_Created) {
491         *tag = 0;
492         *sy = 0;
493         *length = 0;
494         return RAW1394_ISO_DEFER;
495     }
496
497     // normal processing
498     // note that we can't use getCycleTimer directly here,
499     // because packets are queued in advance. This means that
500     // we the packet we are constructing will be sent out
501     // on 'cycle', not 'now'.
502     ctr = m_1394service.getCycleTimer();
503     now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
504
505     // the difference between the cycle this
506     // packet is intended for and 'now'
507     cycle_diff = diffCycles(cycle, now_cycles);
508
509     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
510         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
511             cycle, now_cycles);
512         if(m_state == ePS_Running) {
513             debugShowBackLogLines(200);
514 //             flushDebugOutput();
515 //             assert(0);
516             debugWarning("generatePacketData xrun\n");
517             m_in_xrun = true;
518             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
519             m_next_state = ePS_WaitingForStreamDisable;
520             // execute the requested change
521             if (!updateState()) { // we are allowed to change the state directly
522                 debugError("Could not update state!\n");
523                 return RAW1394_ISO_ERROR;
524             }
525             goto send_empty_packet;
526         }
527     }
528
529     // store the previous timestamp
530     m_last_timestamp2 = m_last_timestamp;
531
532     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
533     //       it happens on the first 'good' cycle for the wait condition
534     //       or on the first received cycle that is received afterwards (might be a problem)
535
536     // check whether we are waiting for a stream to be disabled
537     if(m_state == ePS_WaitingForStreamDisable) {
538         // we then check whether we have to switch on this cycle
539         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
540             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
541             m_next_state = ePS_DryRunning;
542             if (!updateState()) { // we are allowed to change the state directly
543                 debugError("Could not update state!\n");
544                 return RAW1394_ISO_ERROR;
545             }
546         } else {
547             // not time to disable yet
548         }
549     }
550     // check whether we are waiting for a stream to be enabled
551     else if(m_state == ePS_WaitingForStreamEnable) {
552         // we then check whether we have to switch on this cycle
553         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
554             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
555             m_next_state = ePS_Running;
556             if (!updateState()) { // we are allowed to change the state directly
557                 debugError("Could not update state!\n");
558                 return RAW1394_ISO_ERROR;
559             }
560         } else {
561             // not time to enable yet
562         }
563         // we are dryRunning hence data should be processed in any case
564     }
565     // check whether we are waiting for a stream to startup
566     else if(m_state == ePS_WaitingForStream) {
567         // as long as the cycle parameter is not in sync with
568         // the current time, the stream is considered not
569         // to be 'running'
570         // we then check whether we have to switch on this cycle
571         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
572             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
573             // hence go to the dryRunning state
574             m_next_state = ePS_DryRunning;
575             if (!updateState()) { // we are allowed to change the state directly
576                 debugError("Could not update state!\n");
577                 return RAW1394_ISO_ERROR;
578             }
579         } else {
580             // not time (yet) to switch state
581         }
582     }
583     else if(m_state == ePS_Running) {
584         // check the packet header
585         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
586         if (result == eCRV_Packet || result == eCRV_Defer) {
587             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
588                     cycle, m_last_timestamp);
589             // update some accounting
590             m_last_good_cycle = cycle;
591             m_last_dropped = dropped_cycles;
592
593             // check whether a state change has been requested
594             // note that only the wait state changes are synchronized with the cycles
595             if(m_state != m_next_state) {
596                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
597                                                 ePSToString(m_state), ePSToString(m_next_state));
598                 // execute the requested change
599                 if (!updateState()) { // we are allowed to change the state directly
600                     debugError("Could not update state!\n");
601                     return RAW1394_ISO_ERROR;
602                 }
603             }
604
605             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
606             // if an xrun occured, switch to the dryRunning state and
607             // allow for the xrun to be picked up
608             if (result2 == eCRV_XRun) {
609                 debugWarning("generatePacketData xrun\n");
610                 m_in_xrun = true;
611                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
612                 m_cycle_to_switch_state = cycle+1; // switch in the next cycle
613                 m_next_state = ePS_WaitingForStreamDisable;
614                 // execute the requested change
615                 if (!updateState()) { // we are allowed to change the state directly
616                     debugError("Could not update state!\n");
617                     return RAW1394_ISO_ERROR;
618                 }
619                 goto send_empty_packet;
620             }
621             // skip queueing packets if we detect that there are not enough frames
622             // available
623             if(result2 == eCRV_Defer || result == eCRV_Defer)
624                 return RAW1394_ISO_DEFER;
625             else
626                 return RAW1394_ISO_OK;
627         } else if (result == eCRV_XRun) { // pick up the possible xruns
628             debugWarning("generatePacketHeader xrun\n");
629             m_in_xrun = true;
630             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
631             m_next_state = ePS_WaitingForStreamDisable;
632             // execute the requested change
633             if (!updateState()) { // we are allowed to change the state directly
634                 debugError("Could not update state!\n");
635                 return RAW1394_ISO_ERROR;
636             }
637         } else if (result == eCRV_EmptyPacket) {
638             if(m_state != m_next_state) {
639                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
640                                                 ePSToString(m_state), ePSToString(m_next_state));
641                 // execute the requested change
642                 if (!updateState()) { // we are allowed to change the state directly
643                     debugError("Could not update state!\n");
644                     return RAW1394_ISO_ERROR;
645                 }
646             }
647             goto send_empty_packet;
648         } else if (result == eCRV_Again) {
649             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
650             if(m_state != m_next_state) {
651                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
652                                                 ePSToString(m_state), ePSToString(m_next_state));
653                 // execute the requested change
654                 if (!updateState()) { // we are allowed to change the state directly
655                     debugError("Could not update state!\n");
656                     return RAW1394_ISO_ERROR;
657                 }
658             }
659             usleep(125); // only when using thread-per-handler
660             return RAW1394_ISO_AGAIN;
661 //             generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
662 //             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
663 //             return RAW1394_ISO_DEFER;
664         } else {
665             debugError("Invalid return value: %d\n", result);
666             return RAW1394_ISO_ERROR;
667         }
668     }
669     // we are not running, so send an empty packet
670     // we should generate a valid packet any time
671 send_empty_packet:
672     // note that only the wait state changes are synchronized with the cycles
673     if(m_state != m_next_state) {
674         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
675                                         ePSToString(m_state), ePSToString(m_next_state));
676         // execute the requested change
677         if (!updateState()) { // we are allowed to change the state directly
678             debugError("Could not update state!\n");
679             return RAW1394_ISO_ERROR;
680         }
681     }
682
683     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
684     generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
685     generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
686     return RAW1394_ISO_OK;
687 }
688
689
690 // Frame Transfer API
691 /**
692  * Transfer a block of frames from the event buffer to the port buffers
693  * @param nbframes number of frames to transfer
694  * @param ts the timestamp that the LAST frame in the block should have
695  * @return
696  */
697 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
698     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
699     assert( getType() == ePT_Receive );
700     if(isDryRunning()) return getFramesDry(nbframes, ts);
701     else return getFramesWet(nbframes, ts);
702 }
703
704 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
705 // FIXME: this should be done somewhere else
706 #ifdef DEBUG
707     uint64_t ts_expected;
708     signed int fc;
709     int32_t lag_ticks;
710     float lag_frames;
711
712     // in order to sync up multiple received streams, we should
713     // use the ts parameter. It specifies the time of the block's
714     // last sample.
715     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
716     assert(srate != 0.0);
717     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
718
719     ffado_timestamp_t ts_head_tmp;
720     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
721     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
722
723     lag_ticks = diffTicks(ts, ts_expected);
724     lag_frames = (((float)lag_ticks) / srate);
725     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
726                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
727     if (lag_frames >= 1.0) {
728         // the stream lags
729         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
730                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
731     } else if (lag_frames <= -1.0) {
732         // the stream leads
733         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
734                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
735     }
736 #endif
737     // ask the buffer to process nbframes of frames
738     // using it's registered client's processReadBlock(),
739     // which should be ours
740     m_data_buffer->blockProcessReadFrames(nbframes);
741     return true;
742 }
743
744 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
745 {
746     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
747                  this, nbframes, ts);
748     // dry run on this side means that we put silence in all enabled ports
749     // since there is do data put into the ringbuffer in the dry-running state
750     return provideSilenceBlock(nbframes, 0);
751 }
752
753 bool
754 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
755 {
756     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
757     return m_data_buffer->dropFrames(nbframes);
758 }
759
760 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
761 {
762     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
763     assert( getType() == ePT_Transmit );
764     if(isDryRunning()) return putFramesDry(nbframes, ts);
765     else return putFramesWet(nbframes, ts);
766 }
767
768 bool
769 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
770 {
771     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
772     // transfer the data
773     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
774     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
775     return true; // FIXME: what about failure?
776 }
777
778 bool
779 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
780 {
781     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
782     // do nothing
783     return true;
784 }
785
786 bool
787 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
788 {
789     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
790
791     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
792     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
793
794     if (nbframes > scratch_buffer_size_frames) {
795         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
796                    nbframes, scratch_buffer_size_frames);
797     }
798
799     assert(m_scratch_buffer);
800     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
801         debugError("Could not prepare silent block\n");
802         return false;
803     }
804     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
805         debugError("Could not write silent block\n");
806         return false;
807     }
808     return true;
809 }
810
811 bool
812 StreamProcessor::shiftStream(int nbframes)
813 {
814     if(nbframes == 0) return true;
815     if(nbframes > 0) {
816         return m_data_buffer->dropFrames(nbframes);
817     } else {
818         bool result = true;
819         while(nbframes++) {
820             result &= m_data_buffer->writeDummyFrame();
821         }
822         return result;
823     }
824 }
825
826 /**
827  * @brief write silence events to the stream ringbuffers.
828  */
829 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
830 {
831     bool no_problem=true;
832     for ( PortVectorIterator it = m_PeriodPorts.begin();
833           it != m_PeriodPorts.end();
834           ++it ) {
835         if((*it)->isDisabled()) {continue;};
836
837         //FIXME: make this into a static_cast when not DEBUG?
838         Port *port=dynamic_cast<Port *>(*it);
839
840         switch(port->getPortType()) {
841
842         case Port::E_Audio:
843             if(provideSilenceToPort(static_cast<AudioPort *>(*it), offset, nevents)) {
844                 debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
845                 no_problem=false;
846             }
847             break;
848         // midi is a packet based port, don't process
849         //    case MotuPortInfo::E_Midi:
850         //        break;
851
852         default: // ignore
853             break;
854         }
855     }
856     return no_problem;
857 }
858
859 int
860 StreamProcessor::provideSilenceToPort(
861                        AudioPort *p, unsigned int offset, unsigned int nevents)
862 {
863     unsigned int j=0;
864     switch(p->getDataType()) {
865         default:
866         case Port::E_Int24:
867             {
868                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
869                 assert(nevents + offset <= p->getBufferSize());
870                 buffer+=offset;
871
872                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
873                     *(buffer)=0;
874                     buffer++;
875                 }
876             }
877             break;
878         case Port::E_Float:
879             {
880                 float *buffer=(float *)(p->getBufferAddress());
881                 assert(nevents + offset <= p->getBufferSize());
882                 buffer+=offset;
883
884                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
885                     *buffer = 0.0;
886                     buffer++;
887                 }
888             }
889             break;
890     }
891     return 0;
892 }
893
894 /***********************************************
895  * State related API                           *
896  ***********************************************/
897 bool StreamProcessor::init()
898 {
899     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
900
901     if(!m_IsoHandlerManager.registerStream(this)) {
902         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
903         return false;
904     }
905     if(!m_StreamProcessorManager.registerProcessor(this)) {
906         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
907         return false;
908     }
909
910     // initialization can be done without requesting it
911     // from the packet loop
912     m_next_state = ePS_Created;
913     return true;
914 }
915
916 bool StreamProcessor::prepare()
917 {
918     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
919
920     // make the scratch buffer one period of frames long
921     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
922     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
923     if(m_scratch_buffer) delete[] m_scratch_buffer;
924     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
925     if(m_scratch_buffer == NULL) {
926         debugFatal("Could not allocate scratch buffer\n");
927         return false;
928     }
929
930     if (!prepareChild()) {
931         debugFatal("Could not prepare child\n");
932         return false;
933     }
934
935     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
936     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
937              m_StreamProcessorManager.getNominalRate());
938     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
939              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
940     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
941              m_1394service.getPort(), m_channel);
942
943     // initialization can be done without requesting it
944     // from the packet loop
945     m_next_state = ePS_Stopped;
946     return updateState();
947 }
948
949 bool
950 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
951 {
952     // first set the time, since in the packet loop we first check m_state == m_next_state before
953     // using the time
954     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
955     m_next_state = state;
956     return true;
957 }
958
959 bool
960 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
961 {
962     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
963     int cnt = timeout_ms;
964     while (m_state != state && cnt) {
965         SleepRelativeUsec(1000);
966         cnt--;
967     }
968     if(cnt==0) {
969         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
970         return false;
971     }
972     return true;
973 }
974
975 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
976     uint64_t tx;
977     if (t < 0) {
978         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
979     } else {
980         tx = t;
981     }
982     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
983
984     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
985     uint64_t now = m_1394service.getCycleTimerTicks();
986     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
987                           now,
988                           (unsigned int)TICKS_TO_SECS(now),
989                           (unsigned int)TICKS_TO_CYCLES(now),
990                           (unsigned int)TICKS_TO_OFFSET(now));
991     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
992                           tx,
993                           (unsigned int)TICKS_TO_SECS(tx),
994                           (unsigned int)TICKS_TO_CYCLES(tx),
995                           (unsigned int)TICKS_TO_OFFSET(tx));
996     if (m_state == ePS_Stopped) {
997         if(!m_IsoHandlerManager.startHandlerForStream(
998                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
999             debugError("Could not start handler for SP %p\n", this);
1000             return false;
1001         }
1002         return scheduleStateTransition(ePS_WaitingForStream, tx);
1003     } else if (m_state == ePS_Running) {
1004         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1005     } else {
1006         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
1007         return false;
1008     }
1009 }
1010
1011 bool StreamProcessor::scheduleStartRunning(int64_t t) {
1012     uint64_t tx;
1013     if (t < 0) {
1014         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1015     } else {
1016         tx = t;
1017     }
1018     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1019     uint64_t now = m_1394service.getCycleTimerTicks();
1020     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1021                           now,
1022                           (unsigned int)TICKS_TO_SECS(now),
1023                           (unsigned int)TICKS_TO_CYCLES(now),
1024                           (unsigned int)TICKS_TO_OFFSET(now));
1025     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1026                           tx,
1027                           (unsigned int)TICKS_TO_SECS(tx),
1028                           (unsigned int)TICKS_TO_CYCLES(tx),
1029                           (unsigned int)TICKS_TO_OFFSET(tx));
1030     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
1031 }
1032
1033 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
1034     uint64_t tx;
1035     if (t < 0) {
1036         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1037     } else {
1038         tx = t;
1039     }
1040     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1041     uint64_t now = m_1394service.getCycleTimerTicks();
1042     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1043                           now,
1044                           (unsigned int)TICKS_TO_SECS(now),
1045                           (unsigned int)TICKS_TO_CYCLES(now),
1046                           (unsigned int)TICKS_TO_OFFSET(now));
1047     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1048                           tx,
1049                           (unsigned int)TICKS_TO_SECS(tx),
1050                           (unsigned int)TICKS_TO_CYCLES(tx),
1051                           (unsigned int)TICKS_TO_OFFSET(tx));
1052
1053     return scheduleStateTransition(ePS_Stopped, tx);
1054 }
1055
1056 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1057     uint64_t tx;
1058     if (t < 0) {
1059         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1060     } else {
1061         tx = t;
1062     }
1063     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1064     uint64_t now = m_1394service.getCycleTimerTicks();
1065     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1066                           now,
1067                           (unsigned int)TICKS_TO_SECS(now),
1068                           (unsigned int)TICKS_TO_CYCLES(now),
1069                           (unsigned int)TICKS_TO_OFFSET(now));
1070     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1071                           tx,
1072                           (unsigned int)TICKS_TO_SECS(tx),
1073                           (unsigned int)TICKS_TO_CYCLES(tx),
1074                           (unsigned int)TICKS_TO_OFFSET(tx));
1075     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1076 }
1077
1078 bool StreamProcessor::startDryRunning(int64_t t) {
1079     if(!scheduleStartDryRunning(t)) {
1080         debugError("Could not schedule transition\n");
1081         return false;
1082     }
1083     if(!waitForState(ePS_DryRunning, 2000)) {
1084         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1085         return false;
1086     }
1087     return true;
1088 }
1089
1090 bool StreamProcessor::startRunning(int64_t t) {
1091     if(!scheduleStartRunning(t)) {
1092         debugError("Could not schedule transition\n");
1093         return false;
1094     }
1095     if(!waitForState(ePS_Running, 2000)) {
1096         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1097         return false;
1098     }
1099     return true;
1100 }
1101
1102 bool StreamProcessor::stopDryRunning(int64_t t) {
1103     if(!scheduleStopDryRunning(t)) {
1104         debugError("Could not schedule transition\n");
1105         return false;
1106     }
1107     if(!waitForState(ePS_Stopped, 2000)) {
1108         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1109         return false;
1110     }
1111     return true;
1112 }
1113
1114 bool StreamProcessor::stopRunning(int64_t t) {
1115     if(!scheduleStopRunning(t)) {
1116         debugError("Could not schedule transition\n");
1117         return false;
1118     }
1119     if(!waitForState(ePS_DryRunning, 2000)) {
1120         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1121         return false;
1122     }
1123     return true;
1124 }
1125
1126
1127 // internal state API
1128
1129 /**
1130  * @brief Enter the ePS_Stopped state
1131  * @return true if successful, false if not
1132  *
1133  * @pre none
1134  *
1135  * @post the buffer and the isostream are ready for use.
1136  * @post all dynamic structures have been allocated successfully
1137  * @post the buffer is transparent and empty, and all parameters are set
1138  *       to the correct initial/nominal values.
1139  *
1140  */
1141 bool
1142 StreamProcessor::doStop()
1143 {
1144     float ticks_per_frame;
1145     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1146
1147     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1148     bool result = true;
1149
1150     switch(m_state) {
1151         case ePS_Created:
1152             assert(m_data_buffer);
1153             // object just created
1154             result = m_data_buffer->init();
1155
1156             // prepare the framerate estimate
1157             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1158             m_ticks_per_frame = ticks_per_frame;
1159             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1160
1161             // initialize internal buffer
1162             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1163
1164             result &= m_data_buffer->setEventSize( getEventSize() );
1165             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1166             if(getType() == ePT_Receive) {
1167                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1168             } else {
1169                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1170             }
1171             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1172             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1173             result &= m_data_buffer->prepare(); // FIXME: the name
1174
1175             // set the parameters of ports we can:
1176             // we want the audio ports to be period buffered,
1177             // and the midi ports to be packet buffered
1178             for ( PortVectorIterator it = m_Ports.begin();
1179                 it != m_Ports.end();
1180                 ++it )
1181             {
1182                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1183                 if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1184                     debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1185                     return false;
1186                 }
1187                 switch ((*it)->getPortType()) {
1188                     case Port::E_Audio:
1189                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
1190                             debugFatal("Could not set signal type to PeriodSignalling");
1191                             return false;
1192                         }
1193                         // buffertype and datatype are dependant on the API
1194                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
1195                         // buffertype and datatype are dependant on the API
1196                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
1197                             debugFatal("Could not set buffer type");
1198                             return false;
1199                         }
1200                         if(!(*it)->useExternalBuffer(true)) {
1201                             debugFatal("Could not set external buffer usage");
1202                             return false;
1203                         }
1204                         if(!(*it)->setDataType(Port::E_Float)) {
1205                             debugFatal("Could not set data type");
1206                             return false;
1207                         }
1208                         break;
1209                     case Port::E_Midi:
1210                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1211                             debugFatal("Could not set signal type to PacketSignalling");
1212                             return false;
1213                         }
1214                         // buffertype and datatype are dependant on the API
1215                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1216                         // buffertype and datatype are dependant on the API
1217                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1218                             debugFatal("Could not set buffer type");
1219                             return false;
1220                         }
1221                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1222                             debugFatal("Could not set data type");
1223                             return false;
1224                         }
1225                         break;
1226                     default:
1227                         debugWarning("Unsupported port type specified\n");
1228                         break;
1229                 }
1230             }
1231             // the API specific settings of the ports should already be set,
1232             // as this is called from the processorManager->prepare()
1233             // so we can init the ports
1234             result &= PortManager::initPorts();
1235
1236             break;
1237         case ePS_DryRunning:
1238             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1239                 debugError("Could not stop handler for SP %p\n", this);
1240                 return false;
1241             }
1242             break;
1243         default:
1244             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1245             return false;
1246     }
1247
1248     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1249     // make the buffer transparent
1250     m_data_buffer->setTransparent(true);
1251
1252     // reset all ports
1253     result &= PortManager::preparePorts();
1254
1255     m_state = ePS_Stopped;
1256     #ifdef DEBUG
1257     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1258         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1259         dumpInfo();
1260     }
1261     #endif
1262     return result;
1263 }
1264
1265 /**
1266  * @brief Enter the ePS_WaitingForStream state
1267  * @return true if successful, false if not
1268  *
1269  * @pre all dynamic data structures are allocated successfully
1270  *
1271  * @post
1272  *
1273  */
1274 bool
1275 StreamProcessor::doWaitForRunningStream()
1276 {
1277     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1278     switch(m_state) {
1279         case ePS_Stopped:
1280             // we have to start waiting for an incoming stream
1281             // this basically means nothing, the state change will
1282             // be picked up by the packet iterator
1283             break;
1284         default:
1285             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1286             return false;
1287     }
1288     m_state = ePS_WaitingForStream;
1289     #ifdef DEBUG
1290     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1291         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1292         dumpInfo();
1293     }
1294     #endif
1295     return true;
1296 }
1297
1298 /**
1299  * @brief Enter the ePS_DryRunning state
1300  * @return true if successful, false if not
1301  *
1302  * @pre
1303  *
1304  * @post
1305  *
1306  */
1307 bool
1308 StreamProcessor::doDryRunning()
1309 {
1310     bool result = true;
1311     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1312     switch(m_state) {
1313         case ePS_WaitingForStream:
1314             // a running stream has been detected
1315             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1316             if (getType() == ePT_Receive) {
1317                 // this to ensure that there is no discontinuity when starting to
1318                 // update the DLL based upon the received packets
1319                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1320             } else {
1321                 // FIXME: PC=master mode will have to do something here I guess...
1322             }
1323             break;
1324         case ePS_WaitingForStreamEnable: // when xrunning at startup
1325             result &= m_data_buffer->clearBuffer();
1326             m_data_buffer->setTransparent(true);
1327             break;
1328         case ePS_WaitingForStreamDisable:
1329             result &= m_data_buffer->clearBuffer();
1330             m_data_buffer->setTransparent(true);
1331             break;
1332         default:
1333             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1334             return false;
1335     }
1336     m_state = ePS_DryRunning;
1337     #ifdef DEBUG
1338     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1339         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1340         dumpInfo();
1341     }
1342     #endif
1343     return result;
1344 }
1345
1346 /**
1347  * @brief Enter the ePS_WaitingForStreamEnable state
1348  * @return true if successful, false if not
1349  *
1350  * @pre
1351  *
1352  * @post
1353  *
1354  */
1355 bool
1356 StreamProcessor::doWaitForStreamEnable()
1357 {
1358     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1359     unsigned int ringbuffer_size_frames;
1360     switch(m_state) {
1361         case ePS_DryRunning:
1362             // we have to start waiting for an incoming stream
1363             // this basically means nothing, the state change will
1364             // be picked up by the packet iterator
1365
1366             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1367                 debugError("Could not reset data buffer\n");
1368                 return false;
1369             }
1370             if (getType() == ePT_Transmit) {
1371                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1372                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1373                 // prefill the buffer
1374                 if(!transferSilence(ringbuffer_size_frames)) {
1375                     debugFatal("Could not prefill transmit stream\n");
1376                     return false;
1377                 }
1378             }
1379
1380             break;
1381         default:
1382             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1383             return false;
1384     }
1385     m_state = ePS_WaitingForStreamEnable;
1386     #ifdef DEBUG
1387     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1388         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1389         dumpInfo();
1390     }
1391     #endif
1392     return true;
1393 }
1394
1395 /**
1396  * @brief Enter the ePS_Running state
1397  * @return true if successful, false if not
1398  *
1399  * @pre
1400  *
1401  * @post
1402  *
1403  */
1404 bool
1405 StreamProcessor::doRunning()
1406 {
1407     bool result = true;
1408     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1409     switch(m_state) {
1410         case ePS_WaitingForStreamEnable:
1411             // a running stream has been detected
1412             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1413                                              this, m_last_cycle);
1414             m_in_xrun = false;
1415             m_data_buffer->setTransparent(false);
1416             break;
1417         default:
1418             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1419             return false;
1420     }
1421     m_state = ePS_Running;
1422     #ifdef DEBUG
1423     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1424         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1425         dumpInfo();
1426     }
1427     #endif
1428     return result;
1429 }
1430
1431 /**
1432  * @brief Enter the ePS_WaitingForStreamDisable state
1433  * @return true if successful, false if not
1434  *
1435  * @pre
1436  *
1437  * @post
1438  *
1439  */
1440 bool
1441 StreamProcessor::doWaitForStreamDisable()
1442 {
1443     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1444     switch(m_state) {
1445         case ePS_Running:
1446             // the thread will do the transition
1447             break;
1448         default:
1449             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1450             return false;
1451     }
1452     m_state = ePS_WaitingForStreamDisable;
1453     #ifdef DEBUG
1454     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1455         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1456         dumpInfo();
1457     }
1458     #endif
1459     return true;
1460 }
1461
1462 /**
1463  * @brief Updates the state machine and calls the necessary transition functions
1464  * @return true if successful, false if not
1465  */
1466 bool StreamProcessor::updateState() {
1467     bool result = false;
1468     // copy the current state locally since it could change value,
1469     // and that's something we don't want to happen inbetween tests
1470     // if m_next_state changes during this routine, we know for sure
1471     // that the previous state change was at least attempted correctly.
1472     enum eProcessorState next_state = m_next_state;
1473
1474     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1475         ePSToString(m_state), ePSToString(next_state));
1476
1477     if (m_state == next_state) {
1478         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1479         return true;
1480     }
1481
1482     // after creation, only initialization is allowed
1483     if (m_state == ePS_Created) {
1484         if(next_state != ePS_Stopped) {
1485             goto updateState_exit_with_error;
1486         }
1487         // do init here
1488         result = doStop();
1489         if (result) return true;
1490         else goto updateState_exit_change_failed;
1491     }
1492
1493     // after initialization, only WaitingForRunningStream is allowed
1494     if (m_state == ePS_Stopped) {
1495         if(next_state != ePS_WaitingForStream) {
1496             goto updateState_exit_with_error;
1497         }
1498         result = doWaitForRunningStream();
1499         if (result) return true;
1500         else goto updateState_exit_change_failed;
1501     }
1502
1503     // after WaitingForStream, only ePS_DryRunning is allowed
1504     // this means that the stream started running
1505     if (m_state == ePS_WaitingForStream) {
1506         if(next_state != ePS_DryRunning) {
1507             goto updateState_exit_with_error;
1508         }
1509         result = doDryRunning();
1510         if (result) return true;
1511         else goto updateState_exit_change_failed;
1512     }
1513
1514     // from ePS_DryRunning we can go to:
1515     //   - ePS_Stopped if something went wrong during DryRunning
1516     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1517     if (m_state == ePS_DryRunning) {
1518         if((next_state != ePS_Stopped) &&
1519            (next_state != ePS_WaitingForStreamEnable)) {
1520             goto updateState_exit_with_error;
1521         }
1522         if (next_state == ePS_Stopped) {
1523             result = doStop();
1524         } else {
1525             result = doWaitForStreamEnable();
1526         }
1527         if (result) return true;
1528         else goto updateState_exit_change_failed;
1529     }
1530
1531     // from ePS_WaitingForStreamEnable we can go to:
1532     //   - ePS_DryRunning if something went wrong while waiting
1533     //   - ePS_Running if the stream enabled correctly
1534     if (m_state == ePS_WaitingForStreamEnable) {
1535         if((next_state != ePS_DryRunning) &&
1536            (next_state != ePS_Running)) {
1537             goto updateState_exit_with_error;
1538         }
1539         if (next_state == ePS_Stopped) {
1540             result = doDryRunning();
1541         } else {
1542             result = doRunning();
1543         }
1544         if (result) return true;
1545         else goto updateState_exit_change_failed;
1546     }
1547
1548     // from ePS_Running we can only start waiting for a disabled stream
1549     if (m_state == ePS_Running) {
1550         if(next_state != ePS_WaitingForStreamDisable) {
1551             goto updateState_exit_with_error;
1552         }
1553         result = doWaitForStreamDisable();
1554         if (result) return true;
1555         else goto updateState_exit_change_failed;
1556     }
1557
1558     // from ePS_WaitingForStreamDisable we can go to DryRunning
1559     if (m_state == ePS_WaitingForStreamDisable) {
1560         if(next_state != ePS_DryRunning) {
1561             goto updateState_exit_with_error;
1562         }
1563         result = doDryRunning();
1564         if (result) return true;
1565         else goto updateState_exit_change_failed;
1566     }
1567
1568     // if we arrive here there is an error
1569 updateState_exit_with_error:
1570     debugError("Invalid state transition: %s => %s\n",
1571         ePSToString(m_state), ePSToString(next_state));
1572     return false;
1573 updateState_exit_change_failed:
1574     debugError("State transition failed: %s => %s\n",
1575         ePSToString(m_state), ePSToString(next_state));
1576     return false;
1577 }
1578
1579 /***********************************************
1580  * Helper routines                             *
1581  ***********************************************/
1582 bool
1583 StreamProcessor::transferSilence(unsigned int nframes)
1584 {
1585     bool retval;
1586     signed int fc;
1587     ffado_timestamp_t ts_tail_tmp;
1588
1589     // prepare a buffer of silence
1590     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1591     transmitSilenceBlock(dummybuffer, nframes, 0);
1592
1593     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1594     if (fc != 0) {
1595         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1596     }
1597
1598     // add the silence data to the ringbuffer
1599     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1600         retval = true;
1601     } else {
1602         debugWarning("Could not write to event buffer\n");
1603         retval = false;
1604     }
1605     free(dummybuffer);
1606     return retval;
1607 }
1608
1609 /**
1610  * @brief convert a eProcessorState to a string
1611  * @param s the state
1612  * @return a char * describing the state
1613  */
1614 const char *
1615 StreamProcessor::ePSToString(enum eProcessorState s) {
1616     switch (s) {
1617         case ePS_Invalid: return "ePS_Invalid";
1618         case ePS_Created: return "ePS_Created";
1619         case ePS_Stopped: return "ePS_Stopped";
1620         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1621         case ePS_DryRunning: return "ePS_DryRunning";
1622         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1623         case ePS_Running: return "ePS_Running";
1624         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1625         default: return "error: unknown state";
1626     }
1627 }
1628
1629 /**
1630  * @brief convert a eProcessorType to a string
1631  * @param t the type
1632  * @return a char * describing the state
1633  */
1634 const char *
1635 StreamProcessor::ePTToString(enum eProcessorType t) {
1636     switch (t) {
1637         case ePT_Receive: return "Receive";
1638         case ePT_Transmit: return "Transmit";
1639         default: return "error: unknown type";
1640     }
1641 }
1642
1643 /***********************************************
1644  * Debug                                       *
1645  ***********************************************/
1646 void
1647 StreamProcessor::dumpInfo()
1648 {
1649     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p:\n", this);
1650     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1651     uint64_t now = m_1394service.getCycleTimerTicks();
1652     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1653                         now,
1654                         (unsigned int)TICKS_TO_SECS(now),
1655                         (unsigned int)TICKS_TO_CYCLES(now),
1656                         (unsigned int)TICKS_TO_OFFSET(now));
1657     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False"));
1658     if (m_state == m_next_state) {
1659         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n",
1660                                             ePSToString(m_state));
1661     } else {
1662         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s (Next: %s)\n",
1663                                               ePSToString(m_state), ePSToString(m_next_state));
1664         debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1665     }
1666     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1667     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Framerate             : Nominal: %u, Sync: %f, Buffer %f\n",
1668                                           m_StreamProcessorManager.getNominalRate(),
1669                                           24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1670                                           24576000.0/m_data_buffer->getRate());
1671     float d = getSyncDelay();
1672     debugOutputShort(DEBUG_LEVEL_NORMAL, "  Sync delay             : %f ticks (%f frames, %f cy)\n",
1673                                          d, d/getTicksPerFrame(),
1674                                          d/((float)TICKS_PER_CYCLE));
1675     m_data_buffer->dumpInfo();
1676 }
1677
1678 void
1679 StreamProcessor::setVerboseLevel(int l) {
1680     setDebugLevel(l);
1681     PortManager::setVerboseLevel(l);
1682     m_data_buffer->setVerboseLevel(l);
1683 }
1684
1685 } // end of namespace
Note: See TracBrowser for help on using the browser.