root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 860, 69.6 kB (checked in by ppalmers, 13 years ago)

clean up synchronization in streamprocessor

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessor.h"
27 #include "../StreamProcessorManager.h"
28
29 #include "devicemanager.h"
30
31 #include "libieee1394/ieee1394service.h"
32 #include "libieee1394/IsoHandlerManager.h"
33 #include "libieee1394/cycletimer.h"
34
35 #include "libutil/Time.h"
36
37 #include "libutil/Atomic.h"
38
39 #include <assert.h>
40 #include <math.h>
41
42 #define SIGNAL_ACTIVITY { \
43     pthread_mutex_lock(&m_activity_cond_lock); \
44     pthread_cond_broadcast(&m_activity_cond); \
45     pthread_mutex_unlock(&m_activity_cond_lock); \
46 }
47
48 namespace Streaming {
49
50 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
51
52 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
53     : m_processor_type ( type )
54     , m_state( ePS_Created )
55     , m_next_state( ePS_Invalid )
56     , m_cycle_to_switch_state( 0 )
57     , m_Parent( parent )
58     , m_1394service( parent.get1394Service() ) // local cache
59     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
60     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
61     , m_local_node_id ( 0 ) // local cache
62     , m_channel( -1 )
63     , m_dropped( 0 )
64     , m_last_timestamp( 0 )
65     , m_last_timestamp2( 0 )
66     , m_correct_last_timestamp( false )
67     , m_scratch_buffer( NULL )
68     , m_scratch_buffer_size_bytes( 0 )
69     , m_ticks_per_frame( 0 )
70     , m_last_cycle( -1 )
71     , m_sync_delay( 0 )
72     , m_in_xrun( false )
73 {
74     // create the timestamped buffer and register ourselves as its client
75     m_data_buffer = new Util::TimestampedBuffer(this);
76     pthread_mutex_init(&m_activity_cond_lock, NULL);
77     pthread_cond_init(&m_activity_cond, NULL);
78 }
79
80 StreamProcessor::~StreamProcessor() {
81     m_StreamProcessorManager.unregisterProcessor(this);
82     if(!m_IsoHandlerManager.unregisterStream(this)) {
83         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
84     }
85
86     // lock the condition mutex to keep threads from blocking on
87     // the condition var while we destroy it
88     pthread_mutex_lock(&m_activity_cond_lock);
89     // now signal activity, releasing threads that
90     // are already blocking on the condition variable
91     pthread_cond_broadcast(&m_activity_cond);
92     // then destroy it
93     pthread_cond_destroy(&m_activity_cond);
94     pthread_mutex_unlock(&m_activity_cond_lock);
95
96     // destroy the mutexes
97     pthread_mutex_destroy(&m_activity_cond_lock);
98     if (m_data_buffer) delete m_data_buffer;
99     if (m_scratch_buffer) delete[] m_scratch_buffer;
100 }
101
102 uint64_t StreamProcessor::getTimeNow() {
103     return m_1394service.getCycleTimerTicks();
104 }
105
106 int StreamProcessor::getMaxFrameLatency() {
107     if (getType() == ePT_Receive) {
108         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
109     } else {
110         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
111     }
112 }
113
114 unsigned int
115 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
116 {
117     unsigned int nominal_frames_per_second
118                     = m_StreamProcessorManager.getNominalRate();
119     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
120     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
121     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
122     return nominal_packets;
123 }
124
125 unsigned int
126 StreamProcessor::getPacketsPerPeriod()
127 {
128     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
129 }
130
131 unsigned int
132 StreamProcessor::getNbPacketsIsoXmitBuffer()
133 {
134 #if ISOHANDLER_PER_HANDLER_THREAD
135     // if we use one thread per packet, we can put every frame directly into the ISO buffer
136     // the waitForClient in IsoHandler will take care of the fact that the frames are
137     // not present in time
138     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers())) + 10;
139     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
140     return packets_to_prebuffer;
141 #else
142     // the target is to have all of the transmit buffer (at period transfer) as ISO packets
143     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames
144     // in the transmit buffer (the others are still to be put into the xmit frame buffer)
145     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
146     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
147    
148     // however we have to take into account the fact that there is some sync delay
149     // we assume that the SPM has indicated
150     // HACK: this counts on the fact that the latency for this stream will be the same as the
151     //       latency for the receive sync source
152     unsigned int est_sync_delay = getPacketsPerPeriod() / MINIMUM_INTERRUPTS_PER_PERIOD;
153     est_sync_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS / TICKS_PER_CYCLE;
154     packets_to_prebuffer -= est_sync_delay;
155     debugOutput(DEBUG_LEVEL_VERBOSE, " correct for sync delay (%d): %u\n",
156                                      est_sync_delay,
157                                      packets_to_prebuffer);
158    
159     // only queue a part of the theoretical max in order not to have too much 'not ready' cycles
160     packets_to_prebuffer = (packets_to_prebuffer * MAX_ISO_XMIT_BUFFER_FILL_PCT * 1000) / 100000;
161     debugOutput(DEBUG_LEVEL_VERBOSE, " reduce to %d%%: %u\n",
162                                      MAX_ISO_XMIT_BUFFER_FILL_PCT, packets_to_prebuffer);
163    
164     return packets_to_prebuffer;
165 #endif
166 }
167
168 /***********************************************
169  * Buffer management and manipulation          *
170  ***********************************************/
171 void StreamProcessor::flush() {
172     m_IsoHandlerManager.flushHandlerForStream(this);
173 }
174
175 int StreamProcessor::getBufferFill() {
176     return m_data_buffer->getBufferFill();
177 }
178
179 int64_t
180 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
181 {
182     uint64_t time_at_period=getTimeAtPeriod();
183
184     // we delay the period signal with the sync delay
185     // this makes that the period signals lag a little compared to reality
186     // ISO buffering causes the packets to be received at max
187     // m_handler->getWakeupInterval() later than the time they were received.
188     // hence their payload is available this amount of time later. However, the
189     // period boundary is predicted based upon earlier samples, and therefore can
190     // pass before these packets are processed. Adding this extra term makes that
191     // the period boundary is signalled later
192     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
193
194     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
195
196     // calculate the time until the next period
197     int32_t until_next=diffTicks(time_at_period,cycle_timer);
198
199     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
200         time_at_period, cycle_timer, until_next
201         );
202
203     // now convert to usecs
204     // don't use the mapping function because it only works
205     // for absolute times, not the relative time we are
206     // using here (which can also be negative).
207     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
208 }
209
210 void
211 StreamProcessor::setSyncDelay(unsigned int d) {
212     unsigned int frames = d / getTicksPerFrame();
213     debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %u ticks, %u frames\n", this, d, frames);
214     m_sync_delay = d; // FIXME: sync delay not necessary anymore
215 }
216
217 uint64_t
218 StreamProcessor::getTimeAtPeriodUsecs()
219 {
220     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
221 }
222
223 uint64_t
224 StreamProcessor::getTimeAtPeriod()
225 {
226     if (getType() == ePT_Receive) {
227         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
228    
229         #ifdef DEBUG
230         ffado_timestamp_t ts;
231         signed int fc;
232         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
233    
234         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
235             next_period_boundary, ts, fc, getTicksPerFrame()
236             );
237         #endif
238         return (uint64_t)next_period_boundary;
239     } else {
240         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
241    
242         #ifdef DEBUG
243         ffado_timestamp_t ts;
244         signed int fc;
245         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
246    
247         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
248             next_period_boundary, ts, fc, getTicksPerFrame()
249             );
250         #endif
251         return (uint64_t)next_period_boundary;
252     }
253 }
254
255 float
256 StreamProcessor::getTicksPerFrame()
257 {
258     assert(m_data_buffer != NULL);
259     return m_data_buffer->getRate();
260 }
261
262 bool
263 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
264 {
265     bool can_transfer;
266     unsigned int fc = m_data_buffer->getFrameCounter();
267     if (getType() == ePT_Receive) {
268         can_transfer = (fc >= nbframes);
269     } else {
270         // there has to be enough space to put the frames in
271         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
272         // or the buffer is transparent
273         can_transfer |= m_data_buffer->isTransparent();
274     }
275    
276     #ifdef DEBUG
277     if (!can_transfer) {
278         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
279             this, ePTToString(getType()), fc, nbframes);
280     }
281     #endif
282    
283     return can_transfer;
284 }
285
286 /***********************************************
287  * I/O API                                     *
288  ***********************************************/
289
290 // Packet transfer API
291 enum raw1394_iso_disposition
292 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
293                            unsigned char channel, unsigned char tag, unsigned char sy,
294                            unsigned int cycle, unsigned int dropped) {
295 #ifdef DEBUG
296     if(m_last_cycle == -1) {
297         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
298     }
299 #endif
300
301     int dropped_cycles = 0;
302     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
303         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
304         if (dropped_cycles < 0) {
305             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
306                          this, dropped_cycles, cycle, m_last_cycle, dropped);
307         }
308         if (dropped_cycles > 0) {
309             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n",
310                 this, dropped_cycles, cycle, dropped, cycle, m_last_cycle);
311             m_dropped += dropped_cycles;
312             m_last_cycle = cycle;
313         }
314     }
315     m_last_cycle = cycle;
316
317     // bypass based upon state
318     if (m_state == ePS_Invalid) {
319         debugError("Should not have state %s\n", ePSToString(m_state) );
320         return RAW1394_ISO_ERROR;
321     }
322     if (m_state == ePS_Created) {
323         return RAW1394_ISO_DEFER;
324     }
325
326     // store the previous timestamp
327     m_last_timestamp2 = m_last_timestamp;
328
329     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
330     //       it happens on the first 'good' cycle for the wait condition
331     //       or on the first received cycle that is received afterwards (might be a problem)
332
333     // check whether we are waiting for a stream to be disabled
334     if(m_state == ePS_WaitingForStreamDisable) {
335         // we then check whether we have to switch on this cycle
336         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
337             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
338             m_next_state = ePS_DryRunning;
339             if (!updateState()) { // we are allowed to change the state directly
340                 debugError("Could not update state!\n");
341                 return RAW1394_ISO_ERROR;
342             }
343         } else {
344             // not time to disable yet
345         }
346         // the received data can be discarded while waiting for the stream
347         // to be disabled
348         // similarly for dropped packets
349         return RAW1394_ISO_OK;
350     }
351
352     // check whether we are waiting for a stream to be enabled
353     else if(m_state == ePS_WaitingForStreamEnable) {
354         // we then check whether we have to switch on this cycle
355         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
356             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
357             m_next_state = ePS_Running;
358             if (!updateState()) { // we are allowed to change the state directly
359                 debugError("Could not update state!\n");
360                 return RAW1394_ISO_ERROR;
361             }
362         } else {
363             // not time to enable yet
364         }
365         // we are dryRunning hence data should be processed in any case
366     }
367
368     // check the packet header
369     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
370
371     // handle dropped cycles
372     if(dropped_cycles) {
373         // make sure the last_timestamp is corrected
374         m_correct_last_timestamp = true;
375         if (m_state == ePS_Running) {
376             // this is an xrun situation
377             m_in_xrun = true;
378             debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
379             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
380             m_next_state = ePS_WaitingForStreamDisable;
381             // execute the requested change
382             if (!updateState()) { // we are allowed to change the state directly
383                 debugError("Could not update state!\n");
384                 return RAW1394_ISO_ERROR;
385             }
386         }
387     }
388
389     if (result == eCRV_OK) {
390         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
391                 cycle, m_last_timestamp);
392         // update some accounting
393         m_last_good_cycle = cycle;
394         m_last_dropped = dropped_cycles;
395
396         if(m_correct_last_timestamp) {
397             // they represent a discontinuity in the timestamps, and hence are
398             // to be dealt with
399             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
400             m_data_buffer->setBufferTailTimestamp(substractTicks(m_last_timestamp, getNominalFramesPerPacket() * getTicksPerFrame()));
401             m_correct_last_timestamp = false;
402         }
403
404         // check whether we are waiting for a stream to startup
405         // this requires that the packet is good
406         if(m_state == ePS_WaitingForStream) {
407             // since we have a packet with an OK header,
408             // we can indicate that the stream started up
409
410             // we then check whether we have to switch on this cycle
411             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
412                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
413                 // hence go to the dryRunning state
414                 m_next_state = ePS_DryRunning;
415                 if (!updateState()) { // we are allowed to change the state directly
416                     debugError("Could not update state!\n");
417                     return RAW1394_ISO_ERROR;
418                 }
419             } else {
420                 // not time (yet) to switch state
421             }
422             // in both cases we don't want to process the data
423             return RAW1394_ISO_OK;
424         }
425
426         // check whether a state change has been requested
427         // note that only the wait state changes are synchronized with the cycles
428         else if(m_state != m_next_state) {
429             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
430                                              ePSToString(m_state), ePSToString(m_next_state));
431             // execute the requested change
432             if (!updateState()) { // we are allowed to change the state directly
433                 debugError("Could not update state!\n");
434                 return RAW1394_ISO_ERROR;
435             }
436         }
437
438         // for all states that reach this we are allowed to
439         // do protocol specific data reception
440         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
441
442         // if an xrun occured, switch to the dryRunning state and
443         // allow for the xrun to be picked up
444         if (result2 == eCRV_XRun) {
445             debugWarning("processPacketData xrun\n");
446             m_in_xrun = true;
447             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
448             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
449             m_next_state = ePS_WaitingForStreamDisable;
450             // execute the requested change
451             if (!updateState()) { // we are allowed to change the state directly
452                 debugError("Could not update state!\n");
453                 return RAW1394_ISO_ERROR;
454             }
455             return RAW1394_ISO_DEFER;
456         } else if(result2 == eCRV_OK) {
457             // no problem here
458             SIGNAL_ACTIVITY;
459             return RAW1394_ISO_OK;
460         } else {
461             debugError("Invalid response\n");
462             return RAW1394_ISO_ERROR;
463         }
464     } else if(result == eCRV_Invalid) {
465         // apparently we don't have to do anything when the packets are not valid
466         return RAW1394_ISO_OK;
467     } else {
468         debugError("Invalid response\n");
469         return RAW1394_ISO_ERROR;
470     }
471     debugError("reached the unreachable\n");
472     return RAW1394_ISO_ERROR;
473 }
474
475 enum raw1394_iso_disposition
476 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
477                            unsigned char *tag, unsigned char *sy,
478                            int cycle, unsigned int dropped, unsigned int max_length) {
479     if (cycle<0) {
480         *tag = 0;
481         *sy = 0;
482         *length = 0;
483         return RAW1394_ISO_OK;
484     }
485
486     unsigned int ctr;
487     int now_cycles;
488     int cycle_diff;
489
490 #ifdef DEBUG
491     if(m_last_cycle == -1) {
492         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
493     }
494 #endif
495
496     int dropped_cycles = 0;
497     if (m_last_cycle != cycle && m_last_cycle != -1) {
498         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
499         if (dropped_cycles < 0) {
500             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
501                          this, dropped_cycles, cycle, m_last_cycle, dropped);
502         }
503         if (dropped_cycles > 0) {
504             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
505             m_dropped += dropped_cycles;
506             // HACK: this should not be necessary, since the header generation functions should trigger the xrun.
507             //       but apparently there are some issues with the 1394 stack
508             m_in_xrun = true;
509             if(m_state == ePS_Running) {
510                 debugShowBackLogLines(200);
511                 debugWarning("dropped packets xrun\n");
512                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packets xrun\n");
513                 m_cycle_to_switch_state = cycle + 1;
514                 m_next_state = ePS_WaitingForStreamDisable;
515                 // execute the requested change
516                 if (!updateState()) { // we are allowed to change the state directly
517                     debugError("Could not update state!\n");
518                     return RAW1394_ISO_ERROR;
519                 }
520                 goto send_empty_packet;
521             }
522         }
523     }
524     if (cycle >= 0) {
525         m_last_cycle = cycle;
526     }
527
528 #ifdef DEBUG
529     // bypass based upon state
530     if (m_state == ePS_Invalid) {
531         debugError("Should not have state %s\n", ePSToString(m_state) );
532         return RAW1394_ISO_ERROR;
533     }
534 #endif
535
536     if (m_state == ePS_Created) {
537         *tag = 0;
538         *sy = 0;
539         *length = 0;
540         return RAW1394_ISO_DEFER;
541     }
542
543     // normal processing
544     // note that we can't use getCycleTimer directly here,
545     // because packets are queued in advance. This means that
546     // we the packet we are constructing will be sent out
547     // on 'cycle', not 'now'.
548     ctr = m_1394service.getCycleTimer();
549     now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
550
551     // the difference between the cycle this
552     // packet is intended for and 'now'
553     cycle_diff = diffCycles(cycle, now_cycles);
554
555     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
556         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
557             cycle, now_cycles);
558         if(m_state == ePS_Running) {
559             debugShowBackLogLines(200);
560 //             flushDebugOutput();
561 //             assert(0);
562             debugWarning("generatePacketData xrun\n");
563             m_in_xrun = true;
564             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
565             m_cycle_to_switch_state = cycle + 1;
566             m_next_state = ePS_WaitingForStreamDisable;
567             // execute the requested change
568             if (!updateState()) { // we are allowed to change the state directly
569                 debugError("Could not update state!\n");
570                 return RAW1394_ISO_ERROR;
571             }
572             goto send_empty_packet;
573         }
574     }
575
576     // store the previous timestamp
577     m_last_timestamp2 = m_last_timestamp;
578
579     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
580     //       it happens on the first 'good' cycle for the wait condition
581     //       or on the first received cycle that is received afterwards (might be a problem)
582
583     // check whether we are waiting for a stream to be disabled
584     if(m_state == ePS_WaitingForStreamDisable) {
585         // we then check whether we have to switch on this cycle
586         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
587             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
588             m_next_state = ePS_DryRunning;
589             if (!updateState()) { // we are allowed to change the state directly
590                 debugError("Could not update state!\n");
591                 return RAW1394_ISO_ERROR;
592             }
593         }
594         // generate the silent packet header
595         enum eChildReturnValue result = generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
596         if (result == eCRV_Packet) {
597             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT SILENT: CY=%04u TS=%011llu\n",
598                     cycle, m_last_timestamp);
599             // update some accounting
600             m_last_good_cycle = cycle;
601             m_last_dropped = dropped_cycles;
602
603             // assumed not to xrun
604             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
605             return RAW1394_ISO_OK;
606         } else {
607             debugError("Invalid return value: %d\n", result);
608             return RAW1394_ISO_ERROR;
609         }
610     }
611     // check whether we are waiting for a stream to be enabled
612     else if(m_state == ePS_WaitingForStreamEnable) {
613         // we then check whether we have to switch on this cycle
614         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
615             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
616             m_next_state = ePS_Running;
617             if (!updateState()) { // we are allowed to change the state directly
618                 debugError("Could not update state!\n");
619                 return RAW1394_ISO_ERROR;
620             }
621         } else {
622             // not time to enable yet
623         }
624         // we are dryRunning hence data should be processed in any case
625     }
626     // check whether we are waiting for a stream to startup
627     else if(m_state == ePS_WaitingForStream) {
628         // as long as the cycle parameter is not in sync with
629         // the current time, the stream is considered not
630         // to be 'running'
631         // we then check whether we have to switch on this cycle
632         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
633             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
634             // hence go to the dryRunning state
635             m_next_state = ePS_DryRunning;
636             if (!updateState()) { // we are allowed to change the state directly
637                 debugError("Could not update state!\n");
638                 return RAW1394_ISO_ERROR;
639             }
640         } else {
641             // not time (yet) to switch state
642         }
643     }
644     else if(m_state == ePS_Running) {
645         // check the packet header
646         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
647         if (result == eCRV_Packet || result == eCRV_Defer) {
648             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
649                     cycle, m_last_timestamp);
650             // update some accounting
651             m_last_good_cycle = cycle;
652             m_last_dropped = dropped_cycles;
653
654             // check whether a state change has been requested
655             // note that only the wait state changes are synchronized with the cycles
656             if(m_state != m_next_state) {
657                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
658                                                 ePSToString(m_state), ePSToString(m_next_state));
659                 // execute the requested change
660                 if (!updateState()) { // we are allowed to change the state directly
661                     debugError("Could not update state!\n");
662                     return RAW1394_ISO_ERROR;
663                 }
664             }
665
666             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
667             // if an xrun occured, switch to the dryRunning state and
668             // allow for the xrun to be picked up
669             if (result2 == eCRV_XRun) {
670                 debugWarning("generatePacketData xrun\n");
671                 m_in_xrun = true;
672                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
673                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
674                 m_next_state = ePS_WaitingForStreamDisable;
675                 // execute the requested change
676                 if (!updateState()) { // we are allowed to change the state directly
677                     debugError("Could not update state!\n");
678                     return RAW1394_ISO_ERROR;
679                 }
680                 goto send_empty_packet;
681             }
682             // skip queueing packets if we detect that there are not enough frames
683             // available
684             if(result2 == eCRV_Defer || result == eCRV_Defer)
685                 return RAW1394_ISO_DEFER;
686             else
687                 return RAW1394_ISO_OK;
688         } else if (result == eCRV_XRun) { // pick up the possible xruns
689             debugWarning("generatePacketHeader xrun\n");
690             m_in_xrun = true;
691             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
692             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
693             m_next_state = ePS_WaitingForStreamDisable;
694             // execute the requested change
695             if (!updateState()) { // we are allowed to change the state directly
696                 debugError("Could not update state!\n");
697                 return RAW1394_ISO_ERROR;
698             }
699         } else if (result == eCRV_EmptyPacket) {
700             if(m_state != m_next_state) {
701                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
702                                                 ePSToString(m_state), ePSToString(m_next_state));
703                 // execute the requested change
704                 if (!updateState()) { // we are allowed to change the state directly
705                     debugError("Could not update state!\n");
706                     return RAW1394_ISO_ERROR;
707                 }
708             }
709             goto send_empty_packet;
710         } else if (result == eCRV_Again) {
711             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
712             if(m_state != m_next_state) {
713                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
714                                                 ePSToString(m_state), ePSToString(m_next_state));
715                 // execute the requested change
716                 if (!updateState()) { // we are allowed to change the state directly
717                     debugError("Could not update state!\n");
718                     return RAW1394_ISO_ERROR;
719                 }
720             }
721 //             usleep(125); // only when using thread-per-handler
722 //             return RAW1394_ISO_AGAIN;
723             generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
724             generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
725             return RAW1394_ISO_DEFER;
726         } else {
727             debugError("Invalid return value: %d\n", result);
728             return RAW1394_ISO_ERROR;
729         }
730     }
731     // we are not running, so send an empty packet
732     // we should generate a valid packet any time
733 send_empty_packet:
734     // note that only the wait state changes are synchronized with the cycles
735     if(m_state != m_next_state) {
736         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
737                                         ePSToString(m_state), ePSToString(m_next_state));
738         // execute the requested change
739         if (!updateState()) { // we are allowed to change the state directly
740             debugError("Could not update state!\n");
741             return RAW1394_ISO_ERROR;
742         }
743     }
744
745     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
746     generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
747     generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
748     return RAW1394_ISO_OK;
749 }
750
751
752 // Frame Transfer API
753 /**
754  * Transfer a block of frames from the event buffer to the port buffers
755  * @param nbframes number of frames to transfer
756  * @param ts the timestamp that the LAST frame in the block should have
757  * @return
758  */
759 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
760     bool result;
761     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
762     assert( getType() == ePT_Receive );
763     if(isDryRunning()) result = getFramesDry(nbframes, ts);
764     else result = getFramesWet(nbframes, ts);
765     SIGNAL_ACTIVITY;
766     return result;
767 }
768
769 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
770 // FIXME: this should be done somewhere else
771 #ifdef DEBUG
772     uint64_t ts_expected;
773     signed int fc;
774     int32_t lag_ticks;
775     float lag_frames;
776
777     // in order to sync up multiple received streams, we should
778     // use the ts parameter. It specifies the time of the block's
779     // last sample.
780     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
781     assert(srate != 0.0);
782     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
783
784     ffado_timestamp_t ts_head_tmp;
785     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
786     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
787
788     lag_ticks = diffTicks(ts, ts_expected);
789     lag_frames = (((float)lag_ticks) / srate);
790     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
791                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
792     if (lag_frames >= 1.0) {
793         // the stream lags
794         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
795                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
796     } else if (lag_frames <= -1.0) {
797         // the stream leads
798         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
799                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
800     }
801 #endif
802     // ask the buffer to process nbframes of frames
803     // using it's registered client's processReadBlock(),
804     // which should be ours
805     m_data_buffer->blockProcessReadFrames(nbframes);
806     return true;
807 }
808
809 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
810 {
811     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
812                  this, nbframes, ts);
813     // dry run on this side means that we put silence in all enabled ports
814     // since there is do data put into the ringbuffer in the dry-running state
815     return provideSilenceBlock(nbframes, 0);
816 }
817
818 bool
819 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
820 {
821     bool result;
822     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
823     result = m_data_buffer->dropFrames(nbframes);
824     SIGNAL_ACTIVITY;
825     return result;
826 }
827
828 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
829 {
830     bool result;
831     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
832     assert( getType() == ePT_Transmit );
833     if(isDryRunning()) result = putFramesDry(nbframes, ts);
834     else result = putFramesWet(nbframes, ts);
835     SIGNAL_ACTIVITY;
836     return result;
837 }
838
839 bool
840 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
841 {
842     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
843     // transfer the data
844     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
845     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
846     return true; // FIXME: what about failure?
847 }
848
849 bool
850 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
851 {
852     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
853     // do nothing
854     return true;
855 }
856
857 bool
858 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
859 {
860     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
861
862     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
863     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
864
865     if (nbframes > scratch_buffer_size_frames) {
866         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
867                    nbframes, scratch_buffer_size_frames);
868     }
869
870     assert(m_scratch_buffer);
871     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
872         debugError("Could not prepare silent block\n");
873         return false;
874     }
875     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
876         debugError("Could not write silent block\n");
877         return false;
878     }
879
880     SIGNAL_ACTIVITY;
881     return true;
882 }
883
884 bool
885 StreamProcessor::shiftStream(int nbframes)
886 {
887     bool result;
888     if(nbframes == 0) return true;
889     if(nbframes > 0) {
890         result = m_data_buffer->dropFrames(nbframes);
891         SIGNAL_ACTIVITY;
892         return result;
893     } else {
894         result = true;
895         while(nbframes++) {
896             result &= m_data_buffer->writeDummyFrame();
897         }
898         SIGNAL_ACTIVITY;
899         return result;
900     }
901 }
902
903 /**
904  * @brief write silence events to the stream ringbuffers.
905  */
906 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
907 {
908     bool no_problem=true;
909     for ( PortVectorIterator it = m_Ports.begin();
910           it != m_Ports.end();
911           ++it ) {
912         if((*it)->isDisabled()) {continue;};
913
914         if(provideSilenceToPort((*it), offset, nevents)) {
915             debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
916             no_problem=false;
917         }
918     }
919     return no_problem;
920 }
921
922 int
923 StreamProcessor::provideSilenceToPort(Port *p, unsigned int offset, unsigned int nevents)
924 {
925     unsigned int j=0;
926     switch(p->getPortType()) {
927         default:
928             debugError("Invalid port type: %d\n", p->getPortType());
929             return -1;
930         case Port::E_Midi:
931         case Port::E_Control:
932             {
933                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
934                 assert(nevents + offset <= p->getBufferSize());
935                 buffer+=offset;
936
937                 for(j = 0; j < nevents; j += 1) {
938                     *(buffer)=0;
939                     buffer++;
940                 }
941             }
942             break;
943         case Port::E_Audio:
944             switch(m_StreamProcessorManager.getAudioDataType()) {
945             case StreamProcessorManager::eADT_Int24:
946                 {
947                     quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
948                     assert(nevents + offset <= p->getBufferSize());
949                     buffer+=offset;
950    
951                     for(j = 0; j < nevents; j += 1) {
952                         *(buffer)=0;
953                         buffer++;
954                     }
955                 }
956                 break;
957             case StreamProcessorManager::eADT_Float:
958                 {
959                     float *buffer=(float *)(p->getBufferAddress());
960                     assert(nevents + offset <= p->getBufferSize());
961                     buffer+=offset;
962    
963                     for(j = 0; j < nevents; j += 1) {
964                         *buffer = 0.0;
965                         buffer++;
966                     }
967                 }
968                 break;
969             }
970             break;
971     }
972     return 0;
973 }
974
975 /***********************************************
976  * State related API                           *
977  ***********************************************/
978 bool StreamProcessor::init()
979 {
980     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
981
982     if(!m_IsoHandlerManager.registerStream(this)) {
983         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
984         return false;
985     }
986     if(!m_StreamProcessorManager.registerProcessor(this)) {
987         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
988         return false;
989     }
990
991     // initialization can be done without requesting it
992     // from the packet loop
993     m_next_state = ePS_Created;
994     return true;
995 }
996
997 bool StreamProcessor::prepare()
998 {
999     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
1000
1001     // make the scratch buffer one period of frames long
1002     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
1003     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
1004     if(m_scratch_buffer) delete[] m_scratch_buffer;
1005     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
1006     if(m_scratch_buffer == NULL) {
1007         debugFatal("Could not allocate scratch buffer\n");
1008         return false;
1009     }
1010
1011     // set the parameters of ports we can:
1012     // we want the audio ports to be period buffered,
1013     // and the midi ports to be packet buffered
1014     for ( PortVectorIterator it = m_Ports.begin();
1015         it != m_Ports.end();
1016         ++it )
1017     {
1018         debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1019         if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1020             debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1021             return false;
1022         }
1023     }
1024     // the API specific settings of the ports should already be set,
1025     // as this is called from the processorManager->prepare()
1026     // so we can init the ports
1027     if(!PortManager::initPorts()) {
1028         debugFatal("Could not initialize ports\n");
1029         return false;
1030     }
1031
1032     if (!prepareChild()) {
1033         debugFatal("Could not prepare child\n");
1034         return false;
1035     }
1036
1037     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
1038     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
1039              m_StreamProcessorManager.getNominalRate());
1040     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
1041              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
1042     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
1043              m_1394service.getPort(), m_channel);
1044
1045     // initialization can be done without requesting it
1046     // from the packet loop
1047     m_next_state = ePS_Stopped;
1048     return updateState();
1049 }
1050
1051 bool
1052 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
1053 {
1054     // first set the time, since in the packet loop we first check m_state == m_next_state before
1055     // using the time
1056     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
1057     m_next_state = state;
1058     // wake up any threads that might be waiting on data in the buffers
1059     // since a state transition can cause data to become available
1060     SIGNAL_ACTIVITY;
1061     return true;
1062 }
1063
1064 bool
1065 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
1066 {
1067     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
1068     int cnt = timeout_ms;
1069     while (m_state != state && cnt) {
1070         SleepRelativeUsec(1000);
1071         cnt--;
1072     }
1073     if(cnt==0) {
1074         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
1075         return false;
1076     }
1077     return true;
1078 }
1079
1080 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
1081     uint64_t tx;
1082     if (t < 0) {
1083         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1084     } else {
1085         tx = t;
1086     }
1087     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
1088
1089     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1090     uint64_t now = m_1394service.getCycleTimerTicks();
1091     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1092                           now,
1093                           (unsigned int)TICKS_TO_SECS(now),
1094                           (unsigned int)TICKS_TO_CYCLES(now),
1095                           (unsigned int)TICKS_TO_OFFSET(now));
1096     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1097                           tx,
1098                           (unsigned int)TICKS_TO_SECS(tx),
1099                           (unsigned int)TICKS_TO_CYCLES(tx),
1100                           (unsigned int)TICKS_TO_OFFSET(tx));
1101     if (m_state == ePS_Stopped) {
1102         if(!m_IsoHandlerManager.startHandlerForStream(
1103                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
1104             debugError("Could not start handler for SP %p\n", this);
1105             return false;
1106         }
1107         return scheduleStateTransition(ePS_WaitingForStream, tx);
1108     } else if (m_state == ePS_Running) {
1109         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1110     } else {
1111         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
1112         return false;
1113     }
1114 }
1115
1116 bool StreamProcessor::scheduleStartRunning(int64_t t) {
1117     uint64_t tx;
1118     if (t < 0) {
1119         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1120     } else {
1121         tx = t;
1122     }
1123     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1124     uint64_t now = m_1394service.getCycleTimerTicks();
1125     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1126                           now,
1127                           (unsigned int)TICKS_TO_SECS(now),
1128                           (unsigned int)TICKS_TO_CYCLES(now),
1129                           (unsigned int)TICKS_TO_OFFSET(now));
1130     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1131                           tx,
1132                           (unsigned int)TICKS_TO_SECS(tx),
1133                           (unsigned int)TICKS_TO_CYCLES(tx),
1134                           (unsigned int)TICKS_TO_OFFSET(tx));
1135     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
1136 }
1137
1138 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
1139     uint64_t tx;
1140     if (t < 0) {
1141         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1142     } else {
1143         tx = t;
1144     }
1145     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1146     uint64_t now = m_1394service.getCycleTimerTicks();
1147     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1148                           now,
1149                           (unsigned int)TICKS_TO_SECS(now),
1150                           (unsigned int)TICKS_TO_CYCLES(now),
1151                           (unsigned int)TICKS_TO_OFFSET(now));
1152     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1153                           tx,
1154                           (unsigned int)TICKS_TO_SECS(tx),
1155                           (unsigned int)TICKS_TO_CYCLES(tx),
1156                           (unsigned int)TICKS_TO_OFFSET(tx));
1157
1158     return scheduleStateTransition(ePS_Stopped, tx);
1159 }
1160
1161 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1162     uint64_t tx;
1163     if (t < 0) {
1164         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1165     } else {
1166         tx = t;
1167     }
1168     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1169     uint64_t now = m_1394service.getCycleTimerTicks();
1170     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1171                           now,
1172                           (unsigned int)TICKS_TO_SECS(now),
1173                           (unsigned int)TICKS_TO_CYCLES(now),
1174                           (unsigned int)TICKS_TO_OFFSET(now));
1175     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1176                           tx,
1177                           (unsigned int)TICKS_TO_SECS(tx),
1178                           (unsigned int)TICKS_TO_CYCLES(tx),
1179                           (unsigned int)TICKS_TO_OFFSET(tx));
1180     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1181 }
1182
1183 bool StreamProcessor::startDryRunning(int64_t t) {
1184     if(!scheduleStartDryRunning(t)) {
1185         debugError("Could not schedule transition\n");
1186         return false;
1187     }
1188     if(!waitForState(ePS_DryRunning, 2000)) {
1189         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1190         return false;
1191     }
1192     return true;
1193 }
1194
1195 bool StreamProcessor::startRunning(int64_t t) {
1196     if(!scheduleStartRunning(t)) {
1197         debugError("Could not schedule transition\n");
1198         return false;
1199     }
1200     if(!waitForState(ePS_Running, 2000)) {
1201         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1202         return false;
1203     }
1204     return true;
1205 }
1206
1207 bool StreamProcessor::stopDryRunning(int64_t t) {
1208     if(!scheduleStopDryRunning(t)) {
1209         debugError("Could not schedule transition\n");
1210         return false;
1211     }
1212     if(!waitForState(ePS_Stopped, 2000)) {
1213         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1214         return false;
1215     }
1216     return true;
1217 }
1218
1219 bool StreamProcessor::stopRunning(int64_t t) {
1220     if(!scheduleStopRunning(t)) {
1221         debugError("Could not schedule transition\n");
1222         return false;
1223     }
1224     if(!waitForState(ePS_DryRunning, 2000)) {
1225         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1226         return false;
1227     }
1228     return true;
1229 }
1230
1231
1232 // internal state API
1233
1234 /**
1235  * @brief Enter the ePS_Stopped state
1236  * @return true if successful, false if not
1237  *
1238  * @pre none
1239  *
1240  * @post the buffer and the isostream are ready for use.
1241  * @post all dynamic structures have been allocated successfully
1242  * @post the buffer is transparent and empty, and all parameters are set
1243  *       to the correct initial/nominal values.
1244  *
1245  */
1246 bool
1247 StreamProcessor::doStop()
1248 {
1249     float ticks_per_frame;
1250     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1251
1252     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1253     bool result = true;
1254
1255     switch(m_state) {
1256         case ePS_Created:
1257             assert(m_data_buffer);
1258
1259             // prepare the framerate estimate
1260             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1261             m_ticks_per_frame = ticks_per_frame;
1262             m_local_node_id= m_1394service.getLocalNodeId() & 0x3f;
1263             m_correct_last_timestamp = false;
1264
1265             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1266
1267             // initialize internal buffer
1268             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1269
1270             result &= m_data_buffer->setEventSize( getEventSize() );
1271             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1272             if(getType() == ePT_Receive) {
1273                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1274             } else {
1275                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1276             }
1277             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1278             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1279             result &= m_data_buffer->prepare(); // FIXME: the name
1280
1281             break;
1282         case ePS_DryRunning:
1283             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1284                 debugError("Could not stop handler for SP %p\n", this);
1285                 return false;
1286             }
1287             break;
1288         default:
1289             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1290             return false;
1291     }
1292
1293     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1294     // make the buffer transparent
1295     m_data_buffer->setTransparent(true);
1296
1297     // reset all ports
1298     result &= PortManager::preparePorts();
1299
1300     m_state = ePS_Stopped;
1301     #ifdef DEBUG
1302     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1303         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1304         dumpInfo();
1305     }
1306     #endif
1307     SIGNAL_ACTIVITY;
1308     return result;
1309 }
1310
1311 /**
1312  * @brief Enter the ePS_WaitingForStream state
1313  * @return true if successful, false if not
1314  *
1315  * @pre all dynamic data structures are allocated successfully
1316  *
1317  * @post
1318  *
1319  */
1320 bool
1321 StreamProcessor::doWaitForRunningStream()
1322 {
1323     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1324     switch(m_state) {
1325         case ePS_Stopped:
1326             // we have to start waiting for an incoming stream
1327             // this basically means nothing, the state change will
1328             // be picked up by the packet iterator
1329             break;
1330         default:
1331             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1332             return false;
1333     }
1334     m_state = ePS_WaitingForStream;
1335     #ifdef DEBUG
1336     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1337         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1338         dumpInfo();
1339     }
1340     #endif
1341     SIGNAL_ACTIVITY;
1342     return true;
1343 }
1344
1345 /**
1346  * @brief Enter the ePS_DryRunning state
1347  * @return true if successful, false if not
1348  *
1349  * @pre
1350  *
1351  * @post
1352  *
1353  */
1354 bool
1355 StreamProcessor::doDryRunning()
1356 {
1357     bool result = true;
1358     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1359     switch(m_state) {
1360         case ePS_WaitingForStream:
1361             // a running stream has been detected
1362             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1363             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1364             if (getType() == ePT_Receive) {
1365                 // this to ensure that there is no discontinuity when starting to
1366                 // update the DLL based upon the received packets
1367                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1368             } else {
1369                 // FIXME: PC=master mode will have to do something here I guess...
1370             }
1371             break;
1372         case ePS_WaitingForStreamEnable: // when xrunning at startup
1373             result &= m_data_buffer->clearBuffer();
1374             m_data_buffer->setTransparent(true);
1375             break;
1376         case ePS_WaitingForStreamDisable:
1377             result &= m_data_buffer->clearBuffer();
1378             m_data_buffer->setTransparent(true);
1379             break;
1380         default:
1381             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1382             return false;
1383     }
1384     m_state = ePS_DryRunning;
1385     #ifdef DEBUG
1386     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1387         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1388         dumpInfo();
1389     }
1390     #endif
1391     SIGNAL_ACTIVITY;
1392     return result;
1393 }
1394
1395 /**
1396  * @brief Enter the ePS_WaitingForStreamEnable state
1397  * @return true if successful, false if not
1398  *
1399  * @pre
1400  *
1401  * @post
1402  *
1403  */
1404 bool
1405 StreamProcessor::doWaitForStreamEnable()
1406 {
1407     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1408     unsigned int ringbuffer_size_frames;
1409     switch(m_state) {
1410         case ePS_DryRunning:
1411             // we have to start waiting for an incoming stream
1412             // this basically means nothing, the state change will
1413             // be picked up by the packet iterator
1414
1415             if(!m_data_buffer->clearBuffer()) {
1416                 debugError("Could not reset data buffer\n");
1417                 return false;
1418             }
1419             if (getType() == ePT_Transmit) {
1420                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1421                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1422                 // prefill the buffer
1423                 if(!transferSilence(ringbuffer_size_frames)) {
1424                     debugFatal("Could not prefill transmit stream\n");
1425                     return false;
1426                 }
1427             }
1428             break;
1429         default:
1430             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1431             return false;
1432     }
1433     m_state = ePS_WaitingForStreamEnable;
1434     #ifdef DEBUG
1435     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1436         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1437         dumpInfo();
1438     }
1439     #endif
1440     SIGNAL_ACTIVITY;
1441     return true;
1442 }
1443
1444 /**
1445  * @brief Enter the ePS_Running state
1446  * @return true if successful, false if not
1447  *
1448  * @pre
1449  *
1450  * @post
1451  *
1452  */
1453 bool
1454 StreamProcessor::doRunning()
1455 {
1456     bool result = true;
1457     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1458     switch(m_state) {
1459         case ePS_WaitingForStreamEnable:
1460             // a running stream has been detected
1461             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1462                                              this, m_last_cycle);
1463             m_in_xrun = false;
1464             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1465             m_data_buffer->setTransparent(false);
1466             break;
1467         default:
1468             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1469             return false;
1470     }
1471     m_state = ePS_Running;
1472     #ifdef DEBUG
1473     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1474         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1475         dumpInfo();
1476     }
1477     #endif
1478     SIGNAL_ACTIVITY;
1479     return result;
1480 }
1481
1482 /**
1483  * @brief Enter the ePS_WaitingForStreamDisable state
1484  * @return true if successful, false if not
1485  *
1486  * @pre
1487  *
1488  * @post
1489  *
1490  */
1491 bool
1492 StreamProcessor::doWaitForStreamDisable()
1493 {
1494     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1495     switch(m_state) {
1496         case ePS_Running:
1497             // the thread will do the transition
1498             break;
1499         default:
1500             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1501             return false;
1502     }
1503     m_state = ePS_WaitingForStreamDisable;
1504     #ifdef DEBUG
1505     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1506         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1507         dumpInfo();
1508     }
1509     #endif
1510     SIGNAL_ACTIVITY;
1511     return true;
1512 }
1513
1514 /**
1515  * @brief Updates the state machine and calls the necessary transition functions
1516  * @return true if successful, false if not
1517  */
1518 bool StreamProcessor::updateState() {
1519     bool result = false;
1520     // copy the current state locally since it could change value,
1521     // and that's something we don't want to happen inbetween tests
1522     // if m_next_state changes during this routine, we know for sure
1523     // that the previous state change was at least attempted correctly.
1524     enum eProcessorState next_state = m_next_state;
1525
1526     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1527         ePSToString(m_state), ePSToString(next_state));
1528
1529     if (m_state == next_state) {
1530         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1531         return true;
1532     }
1533     // after creation, only initialization is allowed
1534     if (m_state == ePS_Created) {
1535         if(next_state != ePS_Stopped) {
1536             goto updateState_exit_with_error;
1537         }
1538         // do init here
1539         result = doStop();
1540         if (result) {return true;}
1541         else goto updateState_exit_change_failed;
1542     }
1543
1544     // after initialization, only WaitingForRunningStream is allowed
1545     if (m_state == ePS_Stopped) {
1546         if(next_state != ePS_WaitingForStream) {
1547             goto updateState_exit_with_error;
1548         }
1549         result = doWaitForRunningStream();
1550         if (result) {return true;}
1551         else goto updateState_exit_change_failed;
1552     }
1553
1554     // after WaitingForStream, only ePS_DryRunning is allowed
1555     // this means that the stream started running
1556     if (m_state == ePS_WaitingForStream) {
1557         if(next_state != ePS_DryRunning) {
1558             goto updateState_exit_with_error;
1559         }
1560         result = doDryRunning();
1561         if (result) {return true;}
1562         else goto updateState_exit_change_failed;
1563     }
1564
1565     // from ePS_DryRunning we can go to:
1566     //   - ePS_Stopped if something went wrong during DryRunning
1567     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1568     if (m_state == ePS_DryRunning) {
1569         if((next_state != ePS_Stopped) &&
1570            (next_state != ePS_WaitingForStreamEnable)) {
1571             goto updateState_exit_with_error;
1572         }
1573         if (next_state == ePS_Stopped) {
1574             result = doStop();
1575         } else {
1576             result = doWaitForStreamEnable();
1577         }
1578         if (result) {return true;}
1579         else goto updateState_exit_change_failed;
1580     }
1581
1582     // from ePS_WaitingForStreamEnable we can go to:
1583     //   - ePS_DryRunning if something went wrong while waiting
1584     //   - ePS_Running if the stream enabled correctly
1585     if (m_state == ePS_WaitingForStreamEnable) {
1586         if((next_state != ePS_DryRunning) &&
1587            (next_state != ePS_Running)) {
1588             goto updateState_exit_with_error;
1589         }
1590         if (next_state == ePS_Stopped) {
1591             result = doDryRunning();
1592         } else {
1593             result = doRunning();
1594         }
1595         if (result) {return true;}
1596         else goto updateState_exit_change_failed;
1597     }
1598
1599     // from ePS_Running we can only start waiting for a disabled stream
1600     if (m_state == ePS_Running) {
1601         if(next_state != ePS_WaitingForStreamDisable) {
1602             goto updateState_exit_with_error;
1603         }
1604         result = doWaitForStreamDisable();
1605         if (result) {return true;}
1606         else goto updateState_exit_change_failed;
1607     }
1608
1609     // from ePS_WaitingForStreamDisable we can go to DryRunning
1610     if (m_state == ePS_WaitingForStreamDisable) {
1611         if(next_state != ePS_DryRunning) {
1612             goto updateState_exit_with_error;
1613         }
1614         result = doDryRunning();
1615         if (result) {return true;}
1616         else goto updateState_exit_change_failed;
1617     }
1618
1619     // if we arrive here there is an error
1620 updateState_exit_with_error:
1621     debugError("Invalid state transition: %s => %s\n",
1622         ePSToString(m_state), ePSToString(next_state));
1623     SIGNAL_ACTIVITY;
1624     return false;
1625 updateState_exit_change_failed:
1626     debugError("State transition failed: %s => %s\n",
1627         ePSToString(m_state), ePSToString(next_state));
1628     SIGNAL_ACTIVITY;
1629     return false;
1630 }
1631
1632 bool StreamProcessor::waitForProducePacket()
1633 {
1634     return waitForProduce(getNominalFramesPerPacket());
1635 }
1636 bool StreamProcessor::waitForProducePeriod()
1637 {
1638     return waitForProduce(m_StreamProcessorManager.getPeriodSize());
1639 }
1640 bool StreamProcessor::waitForProduce(unsigned int nframes)
1641 {
1642     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this);
1643     struct timespec ts;
1644     int result;
1645
1646     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
1647         debugError("clock_gettime failed\n");
1648         return false;
1649     }
1650
1651     // FIXME: hardcoded timeout of 0.1 sec
1652     ts.tv_nsec += 100 * 1000000LL;
1653     if (ts.tv_nsec > 1000000000LL) {
1654         ts.tv_sec += 1;
1655         ts.tv_nsec -= 1000000000LL;
1656     }
1657
1658     pthread_mutex_lock(&m_activity_cond_lock);
1659     while(!canProduce(nframes)) {
1660         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);
1661    
1662         if (result == -1) {
1663             if (errno == ETIMEDOUT) {
1664                 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this);
1665                 pthread_mutex_unlock(&m_activity_cond_lock);
1666                 return false;
1667             } else {
1668                 debugError("(%p) pthread_cond_timedwait error\n", this);
1669                 pthread_mutex_unlock(&m_activity_cond_lock);
1670                 return false;
1671             }
1672         }
1673     }
1674     pthread_mutex_unlock(&m_activity_cond_lock);
1675     return true;
1676 }
1677
1678 bool StreamProcessor::waitForConsumePacket()
1679 {
1680     return waitForConsume(getNominalFramesPerPacket());
1681 }
1682 bool StreamProcessor::waitForConsumePeriod()
1683 {
1684     return waitForConsume(m_StreamProcessorManager.getPeriodSize());
1685 }
1686 bool StreamProcessor::waitForConsume(unsigned int nframes)
1687 {
1688     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this);
1689     struct timespec ts;
1690     int result;
1691
1692     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
1693         debugError("clock_gettime failed\n");
1694         return false;
1695     }
1696
1697     // FIXME: hardcoded timeout of 0.1 sec
1698     ts.tv_nsec += 100 * 1000000LL;
1699     if (ts.tv_nsec > 1000000000LL) {
1700         ts.tv_sec += 1;
1701         ts.tv_nsec -= 1000000000LL;
1702     }
1703
1704     pthread_mutex_lock(&m_activity_cond_lock);
1705     while(!canConsume(nframes)) {
1706         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);
1707    
1708         if (result == -1) {
1709             if (errno == ETIMEDOUT) {
1710                 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this);
1711                 pthread_mutex_unlock(&m_activity_cond_lock);
1712                 return false;
1713             } else {
1714                 debugError("(%p) pthread_cond_timedwait error\n", this);
1715                 pthread_mutex_unlock(&m_activity_cond_lock);
1716                 return false;
1717             }
1718         }
1719     }
1720     pthread_mutex_unlock(&m_activity_cond_lock);
1721     return true;
1722 }
1723
1724 bool StreamProcessor::canProducePacket()
1725 {
1726     return canProduce(getNominalFramesPerPacket());
1727 }
1728 bool StreamProcessor::canProducePeriod()
1729 {
1730     return canProduce(m_StreamProcessorManager.getPeriodSize());
1731 }
1732 bool StreamProcessor::canProduce(unsigned int nframes)
1733 {
1734     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1735         // check whether we already fullfil the criterion
1736         unsigned int bufferspace = m_data_buffer->getBufferSpace();
1737         if(bufferspace >= nframes) {
1738 //             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough space (%u)...\n", bufferspace);
1739             return true;
1740         } else return false;
1741     } else {
1742         if(getType() == ePT_Transmit) {
1743             // if we are an xmit SP, we cannot accept frames
1744             // when not running
1745             return false;
1746         } else {
1747             // if we are a receive SP, we can always accept frames
1748             // when not running
1749             return true;
1750         }
1751     }
1752 }
1753
1754 bool StreamProcessor::canConsumePacket()
1755 {
1756     return canConsume(getNominalFramesPerPacket());
1757 }
1758 bool StreamProcessor::canConsumePeriod()
1759 {
1760     return canConsume(m_StreamProcessorManager.getPeriodSize());
1761 }
1762 bool StreamProcessor::canConsume(unsigned int nframes)
1763 {
1764     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1765         // check whether we already fullfil the criterion
1766         unsigned int bufferfill = m_data_buffer->getBufferFill();
1767         if(bufferfill >= nframes) {
1768 //             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough items (%u)...\n", bufferfill);
1769             return true;
1770         } else return false;
1771     } else {
1772         if(getType() == ePT_Transmit) {
1773             // if we are an xmit SP, and we're not running,
1774             // we can always provide frames
1775             return true;
1776         } else {
1777             // if we are a receive SP, we can't provide frames
1778             // when not running
1779             return false;
1780         }
1781     }
1782 }
1783
1784 /***********************************************
1785  * Helper routines                             *
1786  ***********************************************/
1787 bool
1788 StreamProcessor::transferSilence(unsigned int nframes)
1789 {
1790     bool retval;
1791     signed int fc;
1792     ffado_timestamp_t ts_tail_tmp;
1793
1794     // prepare a buffer of silence
1795     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1796     transmitSilenceBlock(dummybuffer, nframes, 0);
1797
1798     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1799     if (fc != 0) {
1800         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1801     }
1802
1803     // add the silence data to the ringbuffer
1804     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1805         retval = true;
1806     } else {
1807         debugWarning("Could not write to event buffer\n");
1808         retval = false;
1809     }
1810     free(dummybuffer);
1811     return retval;
1812 }
1813
1814 /**
1815  * @brief convert a eProcessorState to a string
1816  * @param s the state
1817  * @return a char * describing the state
1818  */
1819 const char *
1820 StreamProcessor::ePSToString(enum eProcessorState s) {
1821     switch (s) {
1822         case ePS_Invalid: return "ePS_Invalid";
1823         case ePS_Created: return "ePS_Created";
1824         case ePS_Stopped: return "ePS_Stopped";
1825         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1826         case ePS_DryRunning: return "ePS_DryRunning";
1827         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1828         case ePS_Running: return "ePS_Running";
1829         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1830         default: return "error: unknown state";
1831     }
1832 }
1833
1834 /**
1835  * @brief convert a eProcessorType to a string
1836  * @param t the type
1837  * @return a char * describing the state
1838  */
1839 const char *
1840 StreamProcessor::ePTToString(enum eProcessorType t) {
1841     switch (t) {
1842         case ePT_Receive: return "Receive";
1843         case ePT_Transmit: return "Transmit";
1844         default: return "error: unknown type";
1845     }
1846 }
1847
1848 /***********************************************
1849  * Debug                                       *
1850  ***********************************************/
1851 void
1852 StreamProcessor::dumpInfo()
1853 {
1854     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p, %s:\n", this, ePTToString(m_processor_type));
1855     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1856     uint64_t now = m_1394service.getCycleTimerTicks();
1857     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1858                         now,
1859                         (unsigned int)TICKS_TO_SECS(now),
1860                         (unsigned int)TICKS_TO_CYCLES(now),
1861                         (unsigned int)TICKS_TO_OFFSET(now));
1862     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False"));
1863     if (m_state == m_next_state) {
1864         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n",
1865                                             ePSToString(m_state));
1866     } else {
1867         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s (Next: %s)\n",
1868                                               ePSToString(m_state), ePSToString(m_next_state));
1869         debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1870     }
1871     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1872     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Framerate             : Nominal: %u, Sync: %f, Buffer %f\n",
1873                                           m_StreamProcessorManager.getNominalRate(),
1874                                           24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1875                                           24576000.0/m_data_buffer->getRate());
1876     float d = getSyncDelay();
1877     debugOutputShort(DEBUG_LEVEL_NORMAL, "  Sync delay             : %f ticks (%f frames, %f cy)\n",
1878                                          d, d/getTicksPerFrame(),
1879                                          d/((float)TICKS_PER_CYCLE));
1880     m_data_buffer->dumpInfo();
1881 }
1882
1883 void
1884 StreamProcessor::setVerboseLevel(int l) {
1885     setDebugLevel(l);
1886     PortManager::setVerboseLevel(l);
1887     m_data_buffer->setVerboseLevel(l);
1888 }
1889
1890 } // end of namespace
Note: See TracBrowser for help on using the browser.