root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 776, 61.0 kB (checked in by ppalmers, 13 years ago)

try to fix deadlock / performance issues

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "StreamProcessor.h"
25 #include "../StreamProcessorManager.h"
26
27 #include "devicemanager.h"
28
29 #include "libieee1394/ieee1394service.h"
30 #include "libieee1394/IsoHandlerManager.h"
31 #include "libieee1394/cycletimer.h"
32
33 #include "libutil/Atomic.h"
34
35 #include <assert.h>
36 #include <math.h>
37
38 namespace Streaming {
39
// Instantiates the debug module used by the debugOutput/debugWarning/debugError
// calls in this file, with DEBUG_LEVEL_VERBOSE as the level argument.
IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
41
42 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
43     : m_processor_type ( type )
44     , m_state( ePS_Created )
45     , m_next_state( ePS_Invalid )
46     , m_cycle_to_switch_state( 0 )
47     , m_Parent( parent )
48     , m_1394service( parent.get1394Service() ) // local cache
49     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
50     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
51     , m_channel( -1 )
52     , m_dropped(0)
53     , m_last_timestamp(0)
54     , m_last_timestamp2(0)
55     , m_scratch_buffer( NULL )
56     , m_scratch_buffer_size_bytes( 0 )
57     , m_ticks_per_frame( 0 )
58     , m_last_cycle( -1 )
59     , m_sync_delay( 0 )
60     , m_in_xrun( false )
61 {
62     // create the timestamped buffer and register ourselves as its client
63     m_data_buffer = new Util::TimestampedBuffer(this);
64 }
65
66 StreamProcessor::~StreamProcessor() {
67     m_StreamProcessorManager.unregisterProcessor(this);
68     if(!m_IsoHandlerManager.unregisterStream(this)) {
69         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
70     }
71
72     if (m_data_buffer) delete m_data_buffer;
73     if (m_scratch_buffer) delete[] m_scratch_buffer;
74 }
75
76 uint64_t StreamProcessor::getTimeNow() {
77     return m_1394service.getCycleTimerTicks();
78 }
79
80 int StreamProcessor::getMaxFrameLatency() {
81     if (getType() == ePT_Receive) {
82         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
83     } else {
84         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
85     }
86 }
87
88 unsigned int
89 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
90 {
91     unsigned int nominal_frames_per_second
92                     = m_StreamProcessorManager.getNominalRate();
93     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
94     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
95     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
96     return nominal_packets;
97 }
98
99 unsigned int
100 StreamProcessor::getPacketsPerPeriod()
101 {
102     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
103 }
104
105 unsigned int
106 StreamProcessor::getNbPacketsIsoXmitBuffer()
107 {
108     // the target is to have all of the transmit buffer (at period transfer) as ISO packets
109     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames
110     // in the transmit buffer (the others are still to be put into the xmit frame buffer)
111     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
112    
113     // however we have to take into account the fact that there is some sync delay (unknown at this point)
114     packets_to_prebuffer -= 16; //FIXME: magic
115    
116     // only queue a part (80%) of the theoretical max in order not to have too much 'not ready' cycles
117     packets_to_prebuffer = (packets_to_prebuffer * 8000) / 10000;
118    
119     return packets_to_prebuffer;
120 }
121
122 /***********************************************
123  * Buffer management and manipulation          *
124  ***********************************************/
125 void StreamProcessor::flush() {
126     m_IsoHandlerManager.flushHandlerForStream(this);
127 }
128
129 int StreamProcessor::getBufferFill() {
130     return m_data_buffer->getBufferFill();
131 }
132
133 int64_t
134 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
135 {
136     uint64_t time_at_period=getTimeAtPeriod();
137
138     // we delay the period signal with the sync delay
139     // this makes that the period signals lag a little compared to reality
140     // ISO buffering causes the packets to be received at max
141     // m_handler->getWakeupInterval() later than the time they were received.
142     // hence their payload is available this amount of time later. However, the
143     // period boundary is predicted based upon earlier samples, and therefore can
144     // pass before these packets are processed. Adding this extra term makes that
145     // the period boundary is signalled later
146     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
147
148     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
149
150     // calculate the time until the next period
151     int32_t until_next=diffTicks(time_at_period,cycle_timer);
152
153     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
154         time_at_period, cycle_timer, until_next
155         );
156
157     // now convert to usecs
158     // don't use the mapping function because it only works
159     // for absolute times, not the relative time we are
160     // using here (which can also be negative).
161     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
162 }
163
164 void
165 StreamProcessor::setSyncDelay(int d) {
166     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d);
167     m_sync_delay = d;
168 }
169
170 uint64_t
171 StreamProcessor::getTimeAtPeriodUsecs()
172 {
173     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
174 }
175
176 uint64_t
177 StreamProcessor::getTimeAtPeriod()
178 {
179     if (getType() == ePT_Receive) {
180         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
181    
182         #ifdef DEBUG
183         ffado_timestamp_t ts;
184         signed int fc;
185         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
186    
187         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
188             next_period_boundary, ts, fc, getTicksPerFrame()
189             );
190         #endif
191         return (uint64_t)next_period_boundary;
192     } else {
193         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
194    
195         #ifdef DEBUG
196         ffado_timestamp_t ts;
197         signed int fc;
198         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
199    
200         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
201             next_period_boundary, ts, fc, getTicksPerFrame()
202             );
203         #endif
204         return (uint64_t)next_period_boundary;
205     }
206 }
207
208 float
209 StreamProcessor::getTicksPerFrame()
210 {
211     assert(m_data_buffer != NULL);
212     return m_data_buffer->getRate();
213 }
214
215 bool
216 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
217 {
218     bool can_transfer;
219     unsigned int fc = m_data_buffer->getFrameCounter();
220     if (getType() == ePT_Receive) {
221         can_transfer = (fc >= nbframes);
222     } else {
223         // there has to be enough space to put the frames in
224         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
225         // or the buffer is transparent
226         can_transfer |= m_data_buffer->isTransparent();
227     }
228    
229     #ifdef DEBUG
230     if (!can_transfer) {
231         debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
232             this, ePTToString(getType()), fc, nbframes);
233     }
234     #endif
235    
236     return can_transfer;
237 }
238
239 /***********************************************
240  * I/O API                                     *
241  ***********************************************/
242
243 // Packet transfer API
244 enum raw1394_iso_disposition
245 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
246                            unsigned char channel, unsigned char tag, unsigned char sy,
247                            unsigned int cycle, unsigned int dropped) {
248     if(m_last_cycle == -1) {
249         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
250     }
251
252     int dropped_cycles = 0;
253     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
254         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
255         if (dropped_cycles < 0) {
256             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
257                          this, dropped_cycles, cycle, m_last_cycle, dropped);
258         }
259         if (dropped_cycles > 0) {
260             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n",
261                 this, dropped_cycles, cycle, dropped, cycle, m_last_cycle);
262             m_dropped += dropped_cycles;
263             m_in_xrun = true;
264             //flushDebugOutput();
265             //assert(0);
266         }
267     }
268     m_last_cycle = cycle;
269
270     // bypass based upon state
271     if (m_state == ePS_Invalid) {
272         debugError("Should not have state %s\n", ePSToString(m_state) );
273         return RAW1394_ISO_ERROR;
274     }
275     if (m_state == ePS_Created) {
276         return RAW1394_ISO_DEFER;
277     }
278
279     // store the previous timestamp
280     m_last_timestamp2 = m_last_timestamp;
281
282     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
283     //       it happens on the first 'good' cycle for the wait condition
284     //       or on the first received cycle that is received afterwards (might be a problem)
285
286     // check whether we are waiting for a stream to be disabled
287     if(m_state == ePS_WaitingForStreamDisable) {
288         // we then check whether we have to switch on this cycle
289         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
290             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
291             m_next_state = ePS_DryRunning;
292             if (!updateState()) { // we are allowed to change the state directly
293                 debugError("Could not update state!\n");
294                 return RAW1394_ISO_ERROR;
295             }
296         } else {
297             // not time to disable yet
298         }
299         // the received data can be discarded while waiting for the stream
300         // to be disabled
301         return RAW1394_ISO_OK;
302     }
303
304     // check whether we are waiting for a stream to be enabled
305     else if(m_state == ePS_WaitingForStreamEnable) {
306         // we then check whether we have to switch on this cycle
307         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
308             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
309             m_next_state = ePS_Running;
310             if (!updateState()) { // we are allowed to change the state directly
311                 debugError("Could not update state!\n");
312                 return RAW1394_ISO_ERROR;
313             }
314         } else {
315             // not time to enable yet
316         }
317         // we are dryRunning hence data should be processed in any case
318     }
319
320     // check the packet header
321     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
322     if (result == eCRV_OK) {
323         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
324                 cycle, m_last_timestamp);
325         // update some accounting
326         m_last_good_cycle = cycle;
327         m_last_dropped = dropped_cycles;
328
329         // check whether we are waiting for a stream to startup
330         // this requires that the packet is good
331         if(m_state == ePS_WaitingForStream) {
332             // since we have a packet with an OK header,
333             // we can indicate that the stream started up
334
335             // we then check whether we have to switch on this cycle
336             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
337                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
338                 // hence go to the dryRunning state
339                 m_next_state = ePS_DryRunning;
340                 if (!updateState()) { // we are allowed to change the state directly
341                     debugError("Could not update state!\n");
342                     return RAW1394_ISO_ERROR;
343                 }
344             } else {
345                 // not time (yet) to switch state
346             }
347             // in both cases we don't want to process the data
348             return RAW1394_ISO_OK;
349         }
350
351         // check whether a state change has been requested
352         // note that only the wait state changes are synchronized with the cycles
353         else if(m_state != m_next_state) {
354             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
355                                              ePSToString(m_state), ePSToString(m_next_state));
356             // execute the requested change
357             if (!updateState()) { // we are allowed to change the state directly
358                 debugError("Could not update state!\n");
359                 return RAW1394_ISO_ERROR;
360             }
361         }
362
363         // handle dropped cycles
364         if(dropped_cycles) {
365             // they represent a discontinuity in the timestamps, and hence are
366             // to be dealt with
367             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
368             m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
369             if (m_state == ePS_Running) {
370                 // this is an xrun situation
371                 m_in_xrun = true;
372                 debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
373                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
374                 m_next_state = ePS_WaitingForStreamDisable;
375                 // execute the requested change
376                 if (!updateState()) { // we are allowed to change the state directly
377                     debugError("Could not update state!\n");
378                     return RAW1394_ISO_ERROR;
379                 }
380                 return RAW1394_ISO_DEFER;
381             }
382         }
383
384         // for all states that reach this we are allowed to
385         // do protocol specific data reception
386         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
387
388         // if an xrun occured, switch to the dryRunning state and
389         // allow for the xrun to be picked up
390         if (result2 == eCRV_XRun) {
391             debugWarning("processPacketData xrun\n");
392             m_in_xrun = true;
393             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
394             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
395             m_next_state = ePS_WaitingForStreamDisable;
396             // execute the requested change
397             if (!updateState()) { // we are allowed to change the state directly
398                 debugError("Could not update state!\n");
399                 return RAW1394_ISO_ERROR;
400             }
401             return RAW1394_ISO_DEFER;
402         } else if(result2 == eCRV_OK) {
403             // no problem here
404             return RAW1394_ISO_OK;
405         } else {
406             debugError("Invalid response\n");
407             return RAW1394_ISO_ERROR;
408         }
409     } else if(result == eCRV_Invalid) {
410         // apparently we don't have to do anything when the packets are not valid
411         return RAW1394_ISO_OK;
412     } else {
413         debugError("Invalid response\n");
414         return RAW1394_ISO_ERROR;
415     }
416     debugError("reached the unreachable\n");
417     return RAW1394_ISO_ERROR;
418 }
419
420 enum raw1394_iso_disposition
421 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
422                            unsigned char *tag, unsigned char *sy,
423                            int cycle, unsigned int dropped, unsigned int max_length) {
424     if (cycle<0) {
425         *tag = 0;
426         *sy = 0;
427         *length = 0;
428         return RAW1394_ISO_OK;
429     }
430
431     if(m_last_cycle == -1) {
432         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
433     }
434
435     int dropped_cycles = 0;
436     if (m_last_cycle != cycle && m_last_cycle != -1) {
437         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
438         if (dropped_cycles < 0) {
439             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
440                          this, dropped_cycles, cycle, m_last_cycle, dropped);
441         }
442         if (dropped_cycles > 0) {
443             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
444             m_dropped += dropped_cycles;
445 //             flushDebugOutput();
446 //             assert(0);
447         }
448     }
449     if (cycle >= 0) {
450         m_last_cycle = cycle;
451     }
452
453     // bypass based upon state
454     if (m_state == ePS_Invalid) {
455         debugError("Should not have state %s\n", ePSToString(m_state) );
456         return RAW1394_ISO_ERROR;
457     }
458     if (m_state == ePS_Created) {
459         *tag = 0;
460         *sy = 0;
461         *length = 0;
462         return RAW1394_ISO_DEFER;
463     }
464
465     // normal processing
466     // note that we can't use getCycleTimer directly here,
467     // because packets are queued in advance. This means that
468     // we the packet we are constructing will be sent out
469     // on 'cycle', not 'now'.
470     unsigned int ctr = m_1394service.getCycleTimer();
471     int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
472
473     // the difference between the cycle this
474     // packet is intended for and 'now'
475     int cycle_diff = diffCycles(cycle, now_cycles);
476
477     #ifdef DEBUG
478     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
479         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
480             cycle, now_cycles);
481         if(m_state == ePS_Running) {
482             debugShowBackLogLines(200);
483 //             flushDebugOutput();
484 //             assert(0);
485         }
486     }
487     #endif
488
489     // store the previous timestamp
490     m_last_timestamp2 = m_last_timestamp;
491
492     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
493     //       it happens on the first 'good' cycle for the wait condition
494     //       or on the first received cycle that is received afterwards (might be a problem)
495
496     // check whether we are waiting for a stream to be disabled
497     if(m_state == ePS_WaitingForStreamDisable) {
498         // we then check whether we have to switch on this cycle
499         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
500             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
501             m_next_state = ePS_DryRunning;
502             if (!updateState()) { // we are allowed to change the state directly
503                 debugError("Could not update state!\n");
504                 return RAW1394_ISO_ERROR;
505             }
506         } else {
507             // not time to disable yet
508         }
509     }
510     // check whether we are waiting for a stream to be enabled
511     else if(m_state == ePS_WaitingForStreamEnable) {
512         // we then check whether we have to switch on this cycle
513         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
514             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
515             m_next_state = ePS_Running;
516             if (!updateState()) { // we are allowed to change the state directly
517                 debugError("Could not update state!\n");
518                 return RAW1394_ISO_ERROR;
519             }
520         } else {
521             // not time to enable yet
522         }
523         // we are dryRunning hence data should be processed in any case
524     }
525     // check whether we are waiting for a stream to startup
526     else if(m_state == ePS_WaitingForStream) {
527         // as long as the cycle parameter is not in sync with
528         // the current time, the stream is considered not
529         // to be 'running'
530         // we then check whether we have to switch on this cycle
531         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
532             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
533             // hence go to the dryRunning state
534             m_next_state = ePS_DryRunning;
535             if (!updateState()) { // we are allowed to change the state directly
536                 debugError("Could not update state!\n");
537                 return RAW1394_ISO_ERROR;
538             }
539         } else {
540             // not time (yet) to switch state
541         }
542     }
543     else if(m_state == ePS_Running) {
544         // check the packet header
545         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
546         if (result == eCRV_Packet || result == eCRV_Defer) {
547             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
548                     cycle, m_last_timestamp);
549             // update some accounting
550             m_last_good_cycle = cycle;
551             m_last_dropped = dropped_cycles;
552
553             // check whether a state change has been requested
554             // note that only the wait state changes are synchronized with the cycles
555             if(m_state != m_next_state) {
556                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
557                                                 ePSToString(m_state), ePSToString(m_next_state));
558                 // execute the requested change
559                 if (!updateState()) { // we are allowed to change the state directly
560                     debugError("Could not update state!\n");
561                     return RAW1394_ISO_ERROR;
562                 }
563             }
564
565             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
566             // if an xrun occured, switch to the dryRunning state and
567             // allow for the xrun to be picked up
568             if (result2 == eCRV_XRun) {
569                 debugWarning("generatePacketData xrun\n");
570                 m_in_xrun = true;
571                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
572                 m_cycle_to_switch_state = cycle+1; // switch in the next cycle
573                 m_next_state = ePS_WaitingForStreamDisable;
574                 // execute the requested change
575                 if (!updateState()) { // we are allowed to change the state directly
576                     debugError("Could not update state!\n");
577                     return RAW1394_ISO_ERROR;
578                 }
579                 goto send_empty_packet;
580             }
581             // skip queueing packets if we detect that there are not enough frames
582             // available
583             if(result2 == eCRV_Defer || result == eCRV_Defer)
584                 return RAW1394_ISO_DEFER;
585             else
586                 return RAW1394_ISO_OK;
587         } else if (result == eCRV_XRun) { // pick up the possible xruns
588             debugWarning("generatePacketHeader xrun\n");
589             m_in_xrun = true;
590             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
591             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
592             m_next_state = ePS_WaitingForStreamDisable;
593             // execute the requested change
594             if (!updateState()) { // we are allowed to change the state directly
595                 debugError("Could not update state!\n");
596                 return RAW1394_ISO_ERROR;
597             }
598         } else if (result == eCRV_EmptyPacket) {
599             if(m_state != m_next_state) {
600                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
601                                                 ePSToString(m_state), ePSToString(m_next_state));
602                 // execute the requested change
603                 if (!updateState()) { // we are allowed to change the state directly
604                     debugError("Could not update state!\n");
605                     return RAW1394_ISO_ERROR;
606                 }
607             }
608             goto send_empty_packet;
609         } else if (result == eCRV_Again) {
610             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
611             if(m_state != m_next_state) {
612                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
613                                                 ePSToString(m_state), ePSToString(m_next_state));
614                 // execute the requested change
615                 if (!updateState()) { // we are allowed to change the state directly
616                     debugError("Could not update state!\n");
617                     return RAW1394_ISO_ERROR;
618                 }
619             }
620 //             return RAW1394_ISO_AGAIN;
621             generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
622             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
623             return RAW1394_ISO_DEFER;
624         } else {
625             debugError("Invalid return value: %d\n", result);
626             return RAW1394_ISO_ERROR;
627         }
628     }
629     // we are not running, so send an empty packet
630     // we should generate a valid packet any time
631 send_empty_packet:
632     // note that only the wait state changes are synchronized with the cycles
633     if(m_state != m_next_state) {
634         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
635                                         ePSToString(m_state), ePSToString(m_next_state));
636         // execute the requested change
637         if (!updateState()) { // we are allowed to change the state directly
638             debugError("Could not update state!\n");
639             return RAW1394_ISO_ERROR;
640         }
641     }
642
643     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
644     generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
645     generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
646     return RAW1394_ISO_OK;
647 }
648
649
650 // Frame Transfer API
651 /**
652  * Transfer a block of frames from the event buffer to the port buffers
653  * @param nbframes number of frames to transfer
654  * @param ts the timestamp that the LAST frame in the block should have
655  * @return
656  */
657 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
658     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
659     assert( getType() == ePT_Receive );
660     if(isDryRunning()) return getFramesDry(nbframes, ts);
661     else return getFramesWet(nbframes, ts);
662 }
663
664 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
665 // FIXME: this should be done somewhere else
666 #ifdef DEBUG
667     uint64_t ts_expected;
668     signed int fc;
669     int32_t lag_ticks;
670     float lag_frames;
671
672     // in order to sync up multiple received streams, we should
673     // use the ts parameter. It specifies the time of the block's
674     // last sample.
675     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
676     assert(srate != 0.0);
677     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
678
679     ffado_timestamp_t ts_head_tmp;
680     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
681     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
682
683     lag_ticks = diffTicks(ts, ts_expected);
684     lag_frames = (((float)lag_ticks) / srate);
685     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
686                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
687     if (lag_frames >= 1.0) {
688         // the stream lags
689         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
690                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
691     } else if (lag_frames <= -1.0) {
692         // the stream leads
693         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
694                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
695     }
696 #endif
697     // ask the buffer to process nbframes of frames
698     // using it's registered client's processReadBlock(),
699     // which should be ours
700     m_data_buffer->blockProcessReadFrames(nbframes);
701     return true;
702 }
703
704 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
705 {
706     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
707                  this, nbframes, ts);
708     // dry run on this side means that we put silence in all enabled ports
709     // since there is do data put into the ringbuffer in the dry-running state
710     return provideSilenceBlock(nbframes, 0);
711 }
712
713 bool
714 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
715 {
716     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
717     return m_data_buffer->dropFrames(nbframes);
718 }
719
720 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
721 {
722     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
723     assert( getType() == ePT_Transmit );
724     if(isDryRunning()) return putFramesDry(nbframes, ts);
725     else return putFramesWet(nbframes, ts);
726 }
727
728 bool
729 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
730 {
731     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
732     // transfer the data
733     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
734     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
735     return true; // FIXME: what about failure?
736 }
737
738 bool
739 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
740 {
741     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
742     // do nothing
743     return true;
744 }
745
746 bool
747 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
748 {
749     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
750
751     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
752     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
753
754     if (nbframes > scratch_buffer_size_frames) {
755         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
756                    nbframes, scratch_buffer_size_frames);
757     }
758
759     assert(m_scratch_buffer);
760     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
761         debugError("Could not prepare silent block\n");
762         return false;
763     }
764     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
765         debugError("Could not write silent block\n");
766         return false;
767     }
768     return true;
769 }
770
771 bool
772 StreamProcessor::shiftStream(int nbframes)
773 {
774     if(nbframes == 0) return true;
775     if(nbframes > 0) {
776         return m_data_buffer->dropFrames(nbframes);
777     } else {
778         bool result = true;
779         while(nbframes++) {
780             result &= m_data_buffer->writeDummyFrame();
781         }
782         return result;
783     }
784 }
785
786 /**
787  * @brief write silence events to the stream ringbuffers.
788  */
789 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
790 {
791     bool no_problem=true;
792     for ( PortVectorIterator it = m_PeriodPorts.begin();
793           it != m_PeriodPorts.end();
794           ++it ) {
795         if((*it)->isDisabled()) {continue;};
796
797         //FIXME: make this into a static_cast when not DEBUG?
798         Port *port=dynamic_cast<Port *>(*it);
799
800         switch(port->getPortType()) {
801
802         case Port::E_Audio:
803             if(provideSilenceToPort(static_cast<AudioPort *>(*it), offset, nevents)) {
804                 debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
805                 no_problem=false;
806             }
807             break;
808         // midi is a packet based port, don't process
809         //    case MotuPortInfo::E_Midi:
810         //        break;
811
812         default: // ignore
813             break;
814         }
815     }
816     return no_problem;
817 }
818
819 int
820 StreamProcessor::provideSilenceToPort(
821                        AudioPort *p, unsigned int offset, unsigned int nevents)
822 {
823     unsigned int j=0;
824     switch(p->getDataType()) {
825         default:
826         case Port::E_Int24:
827             {
828                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
829                 assert(nevents + offset <= p->getBufferSize());
830                 buffer+=offset;
831
832                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
833                     *(buffer)=0;
834                     buffer++;
835                 }
836             }
837             break;
838         case Port::E_Float:
839             {
840                 float *buffer=(float *)(p->getBufferAddress());
841                 assert(nevents + offset <= p->getBufferSize());
842                 buffer+=offset;
843
844                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
845                     *buffer = 0.0;
846                     buffer++;
847                 }
848             }
849             break;
850     }
851     return 0;
852 }
853
854 /***********************************************
855  * State related API                           *
856  ***********************************************/
857 bool StreamProcessor::init()
858 {
859     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
860
861     if(!m_IsoHandlerManager.registerStream(this)) {
862         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
863         return false;
864     }
865     if(!m_StreamProcessorManager.registerProcessor(this)) {
866         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
867         return false;
868     }
869
870     // initialization can be done without requesting it
871     // from the packet loop
872     m_next_state = ePS_Created;
873     return true;
874 }
875
876 bool StreamProcessor::prepare()
877 {
878     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
879
880     // make the scratch buffer one period of frames long
881     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
882     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
883     if(m_scratch_buffer) delete[] m_scratch_buffer;
884     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
885     if(m_scratch_buffer == NULL) {
886         debugFatal("Could not allocate scratch buffer\n");
887         return false;
888     }
889
890     if (!prepareChild()) {
891         debugFatal("Could not prepare child\n");
892         return false;
893     }
894
895     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
896     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
897              m_StreamProcessorManager.getNominalRate());
898     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
899              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
900     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
901              m_1394service.getPort(), m_channel);
902
903     // initialization can be done without requesting it
904     // from the packet loop
905     m_next_state = ePS_Stopped;
906     return updateState();
907 }
908
909 bool
910 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
911 {
912     // first set the time, since in the packet loop we first check m_state == m_next_state before
913     // using the time
914     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
915     m_next_state = state;
916     return true;
917 }
918
919 bool
920 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
921 {
922     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
923     int cnt = timeout_ms;
924     while (m_state != state && cnt) {
925         usleep(1000);
926         cnt--;
927     }
928     if(cnt==0) {
929         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
930         return false;
931     }
932     return true;
933 }
934
935 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
936     uint64_t tx;
937     if (t < 0) {
938         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
939     } else {
940         tx = t;
941     }
942     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
943
944     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
945     uint64_t now = m_1394service.getCycleTimerTicks();
946     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
947                           now,
948                           (unsigned int)TICKS_TO_SECS(now),
949                           (unsigned int)TICKS_TO_CYCLES(now),
950                           (unsigned int)TICKS_TO_OFFSET(now));
951     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
952                           tx,
953                           (unsigned int)TICKS_TO_SECS(tx),
954                           (unsigned int)TICKS_TO_CYCLES(tx),
955                           (unsigned int)TICKS_TO_OFFSET(tx));
956     if (m_state == ePS_Stopped) {
957         if(!m_IsoHandlerManager.startHandlerForStream(
958                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
959             debugError("Could not start handler for SP %p\n", this);
960             return false;
961         }
962         return scheduleStateTransition(ePS_WaitingForStream, tx);
963     } else if (m_state == ePS_Running) {
964         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
965     } else {
966         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
967         return false;
968     }
969 }
970
971 bool StreamProcessor::scheduleStartRunning(int64_t t) {
972     uint64_t tx;
973     if (t < 0) {
974         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
975     } else {
976         tx = t;
977     }
978     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
979     uint64_t now = m_1394service.getCycleTimerTicks();
980     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
981                           now,
982                           (unsigned int)TICKS_TO_SECS(now),
983                           (unsigned int)TICKS_TO_CYCLES(now),
984                           (unsigned int)TICKS_TO_OFFSET(now));
985     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
986                           tx,
987                           (unsigned int)TICKS_TO_SECS(tx),
988                           (unsigned int)TICKS_TO_CYCLES(tx),
989                           (unsigned int)TICKS_TO_OFFSET(tx));
990     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
991 }
992
993 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
994     uint64_t tx;
995     if (t < 0) {
996         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
997     } else {
998         tx = t;
999     }
1000     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1001     uint64_t now = m_1394service.getCycleTimerTicks();
1002     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1003                           now,
1004                           (unsigned int)TICKS_TO_SECS(now),
1005                           (unsigned int)TICKS_TO_CYCLES(now),
1006                           (unsigned int)TICKS_TO_OFFSET(now));
1007     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1008                           tx,
1009                           (unsigned int)TICKS_TO_SECS(tx),
1010                           (unsigned int)TICKS_TO_CYCLES(tx),
1011                           (unsigned int)TICKS_TO_OFFSET(tx));
1012
1013     return scheduleStateTransition(ePS_Stopped, tx);
1014 }
1015
1016 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1017     uint64_t tx;
1018     if (t < 0) {
1019         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1020     } else {
1021         tx = t;
1022     }
1023     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1024     uint64_t now = m_1394service.getCycleTimerTicks();
1025     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1026                           now,
1027                           (unsigned int)TICKS_TO_SECS(now),
1028                           (unsigned int)TICKS_TO_CYCLES(now),
1029                           (unsigned int)TICKS_TO_OFFSET(now));
1030     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1031                           tx,
1032                           (unsigned int)TICKS_TO_SECS(tx),
1033                           (unsigned int)TICKS_TO_CYCLES(tx),
1034                           (unsigned int)TICKS_TO_OFFSET(tx));
1035     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1036 }
1037
1038 bool StreamProcessor::startDryRunning(int64_t t) {
1039     if(!scheduleStartDryRunning(t)) {
1040         debugError("Could not schedule transition\n");
1041         return false;
1042     }
1043     if(!waitForState(ePS_DryRunning, 2000)) {
1044         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1045         return false;
1046     }
1047     return true;
1048 }
1049
1050 bool StreamProcessor::startRunning(int64_t t) {
1051     if(!scheduleStartRunning(t)) {
1052         debugError("Could not schedule transition\n");
1053         return false;
1054     }
1055     if(!waitForState(ePS_Running, 2000)) {
1056         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1057         return false;
1058     }
1059     return true;
1060 }
1061
1062 bool StreamProcessor::stopDryRunning(int64_t t) {
1063     if(!scheduleStopDryRunning(t)) {
1064         debugError("Could not schedule transition\n");
1065         return false;
1066     }
1067     if(!waitForState(ePS_Stopped, 2000)) {
1068         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1069         return false;
1070     }
1071     return true;
1072 }
1073
1074 bool StreamProcessor::stopRunning(int64_t t) {
1075     if(!scheduleStopRunning(t)) {
1076         debugError("Could not schedule transition\n");
1077         return false;
1078     }
1079     if(!waitForState(ePS_DryRunning, 2000)) {
1080         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1081         return false;
1082     }
1083     return true;
1084 }
1085
1086
1087 // internal state API
1088
1089 /**
1090  * @brief Enter the ePS_Stopped state
1091  * @return true if successful, false if not
1092  *
1093  * @pre none
1094  *
1095  * @post the buffer and the isostream are ready for use.
1096  * @post all dynamic structures have been allocated successfully
1097  * @post the buffer is transparent and empty, and all parameters are set
1098  *       to the correct initial/nominal values.
1099  *
1100  */
1101 bool
1102 StreamProcessor::doStop()
1103 {
1104     float ticks_per_frame;
1105     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1106
1107     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1108     bool result = true;
1109
1110     switch(m_state) {
1111         case ePS_Created:
1112             assert(m_data_buffer);
1113             // object just created
1114             result = m_data_buffer->init();
1115
1116             // prepare the framerate estimate
1117             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1118             m_ticks_per_frame = ticks_per_frame;
1119             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1120
1121             // initialize internal buffer
1122             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1123
1124             result &= m_data_buffer->setEventSize( getEventSize() );
1125             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1126             if(getType() == ePT_Receive) {
1127                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1128             } else {
1129                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1130             }
1131             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1132             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1133             result &= m_data_buffer->prepare(); // FIXME: the name
1134
1135             // set the parameters of ports we can:
1136             // we want the audio ports to be period buffered,
1137             // and the midi ports to be packet buffered
1138             for ( PortVectorIterator it = m_Ports.begin();
1139                 it != m_Ports.end();
1140                 ++it )
1141             {
1142                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1143                 if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1144                     debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1145                     return false;
1146                 }
1147                 switch ((*it)->getPortType()) {
1148                     case Port::E_Audio:
1149                         if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
1150                             debugFatal("Could not set signal type to PeriodSignalling");
1151                             return false;
1152                         }
1153                         // buffertype and datatype are dependant on the API
1154                         debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
1155                         // buffertype and datatype are dependant on the API
1156                         if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
1157                             debugFatal("Could not set buffer type");
1158                             return false;
1159                         }
1160                         if(!(*it)->useExternalBuffer(true)) {
1161                             debugFatal("Could not set external buffer usage");
1162                             return false;
1163                         }
1164                         if(!(*it)->setDataType(Port::E_Float)) {
1165                             debugFatal("Could not set data type");
1166                             return false;
1167                         }
1168                         break;
1169                     case Port::E_Midi:
1170                         if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1171                             debugFatal("Could not set signal type to PacketSignalling");
1172                             return false;
1173                         }
1174                         // buffertype and datatype are dependant on the API
1175                         debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1176                         // buffertype and datatype are dependant on the API
1177                         if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1178                             debugFatal("Could not set buffer type");
1179                             return false;
1180                         }
1181                         if(!(*it)->setDataType(Port::E_MidiEvent)) {
1182                             debugFatal("Could not set data type");
1183                             return false;
1184                         }
1185                         break;
1186                     default:
1187                         debugWarning("Unsupported port type specified\n");
1188                         break;
1189                 }
1190             }
1191             // the API specific settings of the ports should already be set,
1192             // as this is called from the processorManager->prepare()
1193             // so we can init the ports
1194             result &= PortManager::initPorts();
1195
1196             break;
1197         case ePS_DryRunning:
1198             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1199                 debugError("Could not stop handler for SP %p\n", this);
1200                 return false;
1201             }
1202             break;
1203         default:
1204             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1205             return false;
1206     }
1207
1208     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1209     // make the buffer transparent
1210     m_data_buffer->setTransparent(true);
1211
1212     // reset all ports
1213     result &= PortManager::preparePorts();
1214
1215     m_state = ePS_Stopped;
1216     #ifdef DEBUG
1217     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1218         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1219         dumpInfo();
1220     }
1221     #endif
1222     return result;
1223 }
1224
1225 /**
1226  * @brief Enter the ePS_WaitingForStream state
1227  * @return true if successful, false if not
1228  *
1229  * @pre all dynamic data structures are allocated successfully
1230  *
1231  * @post
1232  *
1233  */
1234 bool
1235 StreamProcessor::doWaitForRunningStream()
1236 {
1237     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1238     switch(m_state) {
1239         case ePS_Stopped:
1240             // we have to start waiting for an incoming stream
1241             // this basically means nothing, the state change will
1242             // be picked up by the packet iterator
1243             break;
1244         default:
1245             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1246             return false;
1247     }
1248     m_state = ePS_WaitingForStream;
1249     #ifdef DEBUG
1250     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1251         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1252         dumpInfo();
1253     }
1254     #endif
1255     return true;
1256 }
1257
1258 /**
1259  * @brief Enter the ePS_DryRunning state
1260  * @return true if successful, false if not
1261  *
1262  * @pre
1263  *
1264  * @post
1265  *
1266  */
1267 bool
1268 StreamProcessor::doDryRunning()
1269 {
1270     bool result = true;
1271     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1272     switch(m_state) {
1273         case ePS_WaitingForStream:
1274             // a running stream has been detected
1275             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1276             if (getType() == ePT_Receive) {
1277                 // this to ensure that there is no discontinuity when starting to
1278                 // update the DLL based upon the received packets
1279                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1280             } else {
1281                 // FIXME: PC=master mode will have to do something here I guess...
1282             }
1283             break;
1284         case ePS_WaitingForStreamDisable:
1285             result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1286             m_data_buffer->setTransparent(true);
1287             break;
1288         default:
1289             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1290             return false;
1291     }
1292     m_state = ePS_DryRunning;
1293     #ifdef DEBUG
1294     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1295         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1296         dumpInfo();
1297     }
1298     #endif
1299     return result;
1300 }
1301
1302 /**
1303  * @brief Enter the ePS_WaitingForStreamEnable state
1304  * @return true if successful, false if not
1305  *
1306  * @pre
1307  *
1308  * @post
1309  *
1310  */
1311 bool
1312 StreamProcessor::doWaitForStreamEnable()
1313 {
1314     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1315     unsigned int ringbuffer_size_frames;
1316     switch(m_state) {
1317         case ePS_DryRunning:
1318             // we have to start waiting for an incoming stream
1319             // this basically means nothing, the state change will
1320             // be picked up by the packet iterator
1321
1322             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name
1323                 debugError("Could not reset data buffer\n");
1324                 return false;
1325             }
1326             if (getType() == ePT_Transmit) {
1327                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1328                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1329                 // prefill the buffer
1330                 if(!transferSilence(ringbuffer_size_frames)) {
1331                     debugFatal("Could not prefill transmit stream\n");
1332                     return false;
1333                 }
1334             }
1335
1336             break;
1337         default:
1338             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1339             return false;
1340     }
1341     m_state = ePS_WaitingForStreamEnable;
1342     #ifdef DEBUG
1343     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1344         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1345         dumpInfo();
1346     }
1347     #endif
1348     return true;
1349 }
1350
1351 /**
1352  * @brief Enter the ePS_Running state
1353  * @return true if successful, false if not
1354  *
1355  * @pre
1356  *
1357  * @post
1358  *
1359  */
1360 bool
1361 StreamProcessor::doRunning()
1362 {
1363     bool result = true;
1364     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1365     switch(m_state) {
1366         case ePS_WaitingForStreamEnable:
1367             // a running stream has been detected
1368             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1369                                              this, m_last_cycle);
1370             m_in_xrun = false;
1371             m_data_buffer->setTransparent(false);
1372             break;
1373         default:
1374             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1375             return false;
1376     }
1377     m_state = ePS_Running;
1378     #ifdef DEBUG
1379     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1380         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1381         dumpInfo();
1382     }
1383     #endif
1384     return result;
1385 }
1386
1387 /**
1388  * @brief Enter the ePS_WaitingForStreamDisable state
1389  * @return true if successful, false if not
1390  *
1391  * @pre
1392  *
1393  * @post
1394  *
1395  */
1396 bool
1397 StreamProcessor::doWaitForStreamDisable()
1398 {
1399     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1400     switch(m_state) {
1401         case ePS_Running:
1402             // the thread will do the transition
1403             break;
1404         default:
1405             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1406             return false;
1407     }
1408     m_state = ePS_WaitingForStreamDisable;
1409     #ifdef DEBUG
1410     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1411         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1412         dumpInfo();
1413     }
1414     #endif
1415     return true;
1416 }
1417
1418 /**
1419  * @brief Updates the state machine and calls the necessary transition functions
1420  * @return true if successful, false if not
1421  */
1422 bool StreamProcessor::updateState() {
1423     bool result = false;
1424     // copy the current state locally since it could change value,
1425     // and that's something we don't want to happen inbetween tests
1426     // if m_next_state changes during this routine, we know for sure
1427     // that the previous state change was at least attempted correctly.
1428     enum eProcessorState next_state = m_next_state;
1429
1430     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1431         ePSToString(m_state), ePSToString(next_state));
1432
1433     if (m_state == next_state) {
1434         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1435         return true;
1436     }
1437
1438     // after creation, only initialization is allowed
1439     if (m_state == ePS_Created) {
1440         if(next_state != ePS_Stopped) {
1441             goto updateState_exit_with_error;
1442         }
1443         // do init here
1444         result = doStop();
1445         if (result) return true;
1446         else goto updateState_exit_change_failed;
1447     }
1448
1449     // after initialization, only WaitingForRunningStream is allowed
1450     if (m_state == ePS_Stopped) {
1451         if(next_state != ePS_WaitingForStream) {
1452             goto updateState_exit_with_error;
1453         }
1454         result = doWaitForRunningStream();
1455         if (result) return true;
1456         else goto updateState_exit_change_failed;
1457     }
1458
1459     // after WaitingForStream, only ePS_DryRunning is allowed
1460     // this means that the stream started running
1461     if (m_state == ePS_WaitingForStream) {
1462         if(next_state != ePS_DryRunning) {
1463             goto updateState_exit_with_error;
1464         }
1465         result = doDryRunning();
1466         if (result) return true;
1467         else goto updateState_exit_change_failed;
1468     }
1469
1470     // from ePS_DryRunning we can go to:
1471     //   - ePS_Stopped if something went wrong during DryRunning
1472     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1473     if (m_state == ePS_DryRunning) {
1474         if((next_state != ePS_Stopped) &&
1475            (next_state != ePS_WaitingForStreamEnable)) {
1476             goto updateState_exit_with_error;
1477         }
1478         if (next_state == ePS_Stopped) {
1479             result = doStop();
1480         } else {
1481             result = doWaitForStreamEnable();
1482         }
1483         if (result) return true;
1484         else goto updateState_exit_change_failed;
1485     }
1486
1487     // from ePS_WaitingForStreamEnable we can go to:
1488     //   - ePS_DryRunning if something went wrong while waiting
1489     //   - ePS_Running if the stream enabled correctly
1490     if (m_state == ePS_WaitingForStreamEnable) {
1491         if((next_state != ePS_DryRunning) &&
1492            (next_state != ePS_Running)) {
1493             goto updateState_exit_with_error;
1494         }
1495         if (next_state == ePS_Stopped) {
1496             result = doDryRunning();
1497         } else {
1498             result = doRunning();
1499         }
1500         if (result) return true;
1501         else goto updateState_exit_change_failed;
1502     }
1503
1504     // from ePS_Running we can only start waiting for a disabled stream
1505     if (m_state == ePS_Running) {
1506         if(next_state != ePS_WaitingForStreamDisable) {
1507             goto updateState_exit_with_error;
1508         }
1509         result = doWaitForStreamDisable();
1510         if (result) return true;
1511         else goto updateState_exit_change_failed;
1512     }
1513
1514     // from ePS_WaitingForStreamDisable we can go to DryRunning
1515     if (m_state == ePS_WaitingForStreamDisable) {
1516         if(next_state != ePS_DryRunning) {
1517             goto updateState_exit_with_error;
1518         }
1519         result = doDryRunning();
1520         if (result) return true;
1521         else goto updateState_exit_change_failed;
1522     }
1523
1524     // if we arrive here there is an error
1525 updateState_exit_with_error:
1526     debugError("Invalid state transition: %s => %s\n",
1527         ePSToString(m_state), ePSToString(next_state));
1528     return false;
1529 updateState_exit_change_failed:
1530     debugError("State transition failed: %s => %s\n",
1531         ePSToString(m_state), ePSToString(next_state));
1532     return false;
1533 }
1534
1535 /***********************************************
1536  * Helper routines                             *
1537  ***********************************************/
1538 bool
1539 StreamProcessor::transferSilence(unsigned int nframes)
1540 {
1541     bool retval;
1542     signed int fc;
1543     ffado_timestamp_t ts_tail_tmp;
1544
1545     // prepare a buffer of silence
1546     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1547     transmitSilenceBlock(dummybuffer, nframes, 0);
1548
1549     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1550     if (fc != 0) {
1551         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1552     }
1553
1554     // add the silence data to the ringbuffer
1555     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1556         retval = true;
1557     } else {
1558         debugWarning("Could not write to event buffer\n");
1559         retval = false;
1560     }
1561     free(dummybuffer);
1562     return retval;
1563 }
1564
1565 /**
1566  * @brief convert a eProcessorState to a string
1567  * @param s the state
1568  * @return a char * describing the state
1569  */
1570 const char *
1571 StreamProcessor::ePSToString(enum eProcessorState s) {
1572     switch (s) {
1573         case ePS_Invalid: return "ePS_Invalid";
1574         case ePS_Created: return "ePS_Created";
1575         case ePS_Stopped: return "ePS_Stopped";
1576         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1577         case ePS_DryRunning: return "ePS_DryRunning";
1578         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1579         case ePS_Running: return "ePS_Running";
1580         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1581         default: return "error: unknown state";
1582     }
1583 }
1584
1585 /**
1586  * @brief convert a eProcessorType to a string
1587  * @param t the type
1588  * @return a char * describing the state
1589  */
1590 const char *
1591 StreamProcessor::ePTToString(enum eProcessorType t) {
1592     switch (t) {
1593         case ePT_Receive: return "Receive";
1594         case ePT_Transmit: return "Transmit";
1595         default: return "error: unknown type";
1596     }
1597 }
1598
1599 /***********************************************
1600  * Debug                                       *
1601  ***********************************************/
1602 void
1603 StreamProcessor::dumpInfo()
1604 {
1605     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p information\n", this);
1606     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1607     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
1608     uint64_t now = m_1394service.getCycleTimerTicks();
1609     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1610                         now,
1611                         (unsigned int)TICKS_TO_SECS(now),
1612                         (unsigned int)TICKS_TO_CYCLES(now),
1613                         (unsigned int)TICKS_TO_OFFSET(now));
1614     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %s\n", (m_in_xrun ? "True":"False"));
1615     debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n", ePSToString(m_state));
1616     debugOutputShort( DEBUG_LEVEL_NORMAL, "   Next state           : %s\n", ePSToString(m_next_state));
1617     debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1618     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1619     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_StreamProcessorManager.getNominalRate());
1620     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n",
1621         24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1622         24576000.0/m_data_buffer->getRate()
1623         );
1624
1625     m_data_buffer->dumpInfo();
1626 }
1627
1628 void
1629 StreamProcessor::setVerboseLevel(int l) {
1630     setDebugLevel(l);
1631     PortManager::setVerboseLevel(l);
1632     m_data_buffer->setVerboseLevel(l);
1633 }
1634
1635 } // end of namespace
Note: See TracBrowser for help on using the browser.