root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 864, 69.8 kB (checked in by ppalmers, 15 years ago)

update license to GPLv2 or GPLv3 instead of GPLv2 or any later version. Update copyrights to reflect the new year

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessor.h"
27 #include "../StreamProcessorManager.h"
28
29 #include "devicemanager.h"
30
31 #include "libieee1394/ieee1394service.h"
32 #include "libieee1394/IsoHandlerManager.h"
33 #include "libieee1394/cycletimer.h"
34
35 #include "libutil/Time.h"
36
37 #include "libutil/Atomic.h"
38
39 #include <assert.h>
40 #include <math.h>
41
// Wake every thread blocked on the activity condition variable.
// The broadcast is issued while holding m_activity_cond_lock.
//
// The body is wrapped in do { ... } while (0) so that the macro expands to
// exactly one statement: "SIGNAL_ACTIVITY;" is then safe inside an unbraced
// if/else, where a bare { ... } block followed by ';' would orphan the else.
// Callers must terminate the macro invocation with a semicolon.
#define SIGNAL_ACTIVITY do { \
    pthread_mutex_lock(&m_activity_cond_lock); \
    pthread_cond_broadcast(&m_activity_cond); \
    pthread_mutex_unlock(&m_activity_cond_lock); \
} while (0)
47
48 namespace Streaming {
49
50 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
51
52 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
53     : m_processor_type ( type )
54     , m_state( ePS_Created )
55     , m_next_state( ePS_Invalid )
56     , m_cycle_to_switch_state( 0 )
57     , m_Parent( parent )
58     , m_1394service( parent.get1394Service() ) // local cache
59     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
60     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
61     , m_local_node_id ( 0 ) // local cache
62     , m_channel( -1 )
63     , m_dropped( 0 )
64     , m_last_timestamp( 0 )
65     , m_last_timestamp2( 0 )
66     , m_correct_last_timestamp( false )
67     , m_scratch_buffer( NULL )
68     , m_scratch_buffer_size_bytes( 0 )
69     , m_ticks_per_frame( 0 )
70     , m_last_cycle( -1 )
71     , m_sync_delay( 0 )
72     , m_in_xrun( false )
73 {
74     // create the timestamped buffer and register ourselves as its client
75     m_data_buffer = new Util::TimestampedBuffer(this);
76     pthread_mutex_init(&m_activity_cond_lock, NULL);
77     pthread_cond_init(&m_activity_cond, NULL);
78 }
79
80 StreamProcessor::~StreamProcessor() {
81     m_StreamProcessorManager.unregisterProcessor(this);
82     if(!m_IsoHandlerManager.unregisterStream(this)) {
83         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
84     }
85
86     // lock the condition mutex to keep threads from blocking on
87     // the condition var while we destroy it
88     pthread_mutex_lock(&m_activity_cond_lock);
89     // now signal activity, releasing threads that
90     // are already blocking on the condition variable
91     pthread_cond_broadcast(&m_activity_cond);
92     // then destroy it
93     pthread_cond_destroy(&m_activity_cond);
94     pthread_mutex_unlock(&m_activity_cond_lock);
95
96     // destroy the mutexes
97     pthread_mutex_destroy(&m_activity_cond_lock);
98     if (m_data_buffer) delete m_data_buffer;
99     if (m_scratch_buffer) delete[] m_scratch_buffer;
100 }
101
102 uint64_t StreamProcessor::getTimeNow() {
103     return m_1394service.getCycleTimerTicks();
104 }
105
106 int StreamProcessor::getMaxFrameLatency() {
107     if (getType() == ePT_Receive) {
108         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
109     } else {
110         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
111     }
112 }
113
114 unsigned int
115 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
116 {
117     unsigned int nominal_frames_per_second
118                     = m_StreamProcessorManager.getNominalRate();
119     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
120     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
121     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
122     return nominal_packets;
123 }
124
125 unsigned int
126 StreamProcessor::getPacketsPerPeriod()
127 {
128     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
129 }
130
131 unsigned int
132 StreamProcessor::getNbPacketsIsoXmitBuffer()
133 {
134 #if ISOHANDLER_PER_HANDLER_THREAD
135     // if we use one thread per packet, we can put every frame directly into the ISO buffer
136     // the waitForClient in IsoHandler will take care of the fact that the frames are
137     // not present in time
138     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers())) + 10;
139     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
140     return packets_to_prebuffer;
141 #else
142     // the target is to have all of the transmit buffer (at period transfer) as ISO packets
143     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames
144     // in the transmit buffer (the others are still to be put into the xmit frame buffer)
145     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1));
146     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
147    
148     // however we have to take into account the fact that there is some sync delay
149     // we assume that the SPM has indicated
150     // HACK: this counts on the fact that the latency for this stream will be the same as the
151     //       latency for the receive sync source
152     unsigned int est_sync_delay = getPacketsPerPeriod() / MINIMUM_INTERRUPTS_PER_PERIOD;
153     est_sync_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS / TICKS_PER_CYCLE;
154     packets_to_prebuffer -= est_sync_delay;
155     debugOutput(DEBUG_LEVEL_VERBOSE, " correct for sync delay (%d): %u\n",
156                                      est_sync_delay,
157                                      packets_to_prebuffer);
158    
159     // only queue a part of the theoretical max in order not to have too much 'not ready' cycles
160     packets_to_prebuffer = (packets_to_prebuffer * MAX_ISO_XMIT_BUFFER_FILL_PCT * 1000) / 100000;
161     debugOutput(DEBUG_LEVEL_VERBOSE, " reduce to %d%%: %u\n",
162                                      MAX_ISO_XMIT_BUFFER_FILL_PCT, packets_to_prebuffer);
163    
164     return packets_to_prebuffer;
165 #endif
166 }
167
168 /***********************************************
169  * Buffer management and manipulation          *
170  ***********************************************/
171 void StreamProcessor::flush() {
172     m_IsoHandlerManager.flushHandlerForStream(this);
173 }
174
175 int StreamProcessor::getBufferFill() {
176     return m_data_buffer->getBufferFill();
177 }
178
179 int64_t
180 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
181 {
182     uint64_t time_at_period=getTimeAtPeriod();
183
184     // we delay the period signal with the sync delay
185     // this makes that the period signals lag a little compared to reality
186     // ISO buffering causes the packets to be received at max
187     // m_handler->getWakeupInterval() later than the time they were received.
188     // hence their payload is available this amount of time later. However, the
189     // period boundary is predicted based upon earlier samples, and therefore can
190     // pass before these packets are processed. Adding this extra term makes that
191     // the period boundary is signalled later
192     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
193
194     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
195
196     // calculate the time until the next period
197     int32_t until_next=diffTicks(time_at_period,cycle_timer);
198
199     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
200         time_at_period, cycle_timer, until_next
201         );
202
203     // now convert to usecs
204     // don't use the mapping function because it only works
205     // for absolute times, not the relative time we are
206     // using here (which can also be negative).
207     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
208 }
209
210 void
211 StreamProcessor::setSyncDelay(unsigned int d) {
212     unsigned int frames = d / getTicksPerFrame();
213     debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %u ticks, %u frames\n", this, d, frames);
214     m_sync_delay = d; // FIXME: sync delay not necessary anymore
215 }
216
217 uint64_t
218 StreamProcessor::getTimeAtPeriodUsecs()
219 {
220     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
221 }
222
223 uint64_t
224 StreamProcessor::getTimeAtPeriod()
225 {
226     if (getType() == ePT_Receive) {
227         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
228    
229         #ifdef DEBUG
230         ffado_timestamp_t ts;
231         signed int fc;
232         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
233    
234         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
235             next_period_boundary, ts, fc, getTicksPerFrame()
236             );
237         #endif
238         return (uint64_t)next_period_boundary;
239     } else {
240         ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
241    
242         #ifdef DEBUG
243         ffado_timestamp_t ts;
244         signed int fc;
245         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
246    
247         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
248             next_period_boundary, ts, fc, getTicksPerFrame()
249             );
250         #endif
251         return (uint64_t)next_period_boundary;
252     }
253 }
254
255 float
256 StreamProcessor::getTicksPerFrame()
257 {
258     assert(m_data_buffer != NULL);
259     return m_data_buffer->getRate();
260 }
261
262 bool
263 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
264 {
265     bool can_transfer;
266     unsigned int fc = m_data_buffer->getFrameCounter();
267     if (getType() == ePT_Receive) {
268         can_transfer = (fc >= nbframes);
269     } else {
270         // there has to be enough space to put the frames in
271         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
272         // or the buffer is transparent
273         can_transfer |= m_data_buffer->isTransparent();
274     }
275    
276     #ifdef DEBUG
277     if (!can_transfer) {
278         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
279             this, ePTToString(getType()), fc, nbframes);
280     }
281     #endif
282    
283     return can_transfer;
284 }
285
286 /***********************************************
287  * I/O API                                     *
288  ***********************************************/
289
290 // Packet transfer API
291 enum raw1394_iso_disposition
292 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
293                            unsigned char channel, unsigned char tag, unsigned char sy,
294                            unsigned int cycle, unsigned int dropped) {
295 #ifdef DEBUG
296     if(m_last_cycle == -1) {
297         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
298     }
299 #endif
300
301     int dropped_cycles = 0;
302     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
303         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
304         if (dropped_cycles < 0) {
305             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
306                          this, dropped_cycles, cycle, m_last_cycle, dropped);
307         }
308         if (dropped_cycles > 0) {
309             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n",
310                 this, dropped_cycles, cycle, dropped, cycle, m_last_cycle);
311             m_dropped += dropped_cycles;
312             m_last_cycle = cycle;
313         }
314     }
315     m_last_cycle = cycle;
316
317     // bypass based upon state
318     if (m_state == ePS_Invalid) {
319         debugError("Should not have state %s\n", ePSToString(m_state) );
320         return RAW1394_ISO_ERROR;
321     }
322     if (m_state == ePS_Created) {
323         return RAW1394_ISO_DEFER;
324     }
325
326     // store the previous timestamp
327     m_last_timestamp2 = m_last_timestamp;
328
329     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
330     //       it happens on the first 'good' cycle for the wait condition
331     //       or on the first received cycle that is received afterwards (might be a problem)
332
333     // check whether we are waiting for a stream to be disabled
334     if(m_state == ePS_WaitingForStreamDisable) {
335         // we then check whether we have to switch on this cycle
336         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
337             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
338             m_next_state = ePS_DryRunning;
339             if (!updateState()) { // we are allowed to change the state directly
340                 debugError("Could not update state!\n");
341                 return RAW1394_ISO_ERROR;
342             }
343         } else {
344             // not time to disable yet
345         }
346         // the received data can be discarded while waiting for the stream
347         // to be disabled
348         // similarly for dropped packets
349         return RAW1394_ISO_OK;
350     }
351
352     // check whether we are waiting for a stream to be enabled
353     else if(m_state == ePS_WaitingForStreamEnable) {
354         // we then check whether we have to switch on this cycle
355         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
356             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
357             m_next_state = ePS_Running;
358             if (!updateState()) { // we are allowed to change the state directly
359                 debugError("Could not update state!\n");
360                 return RAW1394_ISO_ERROR;
361             }
362         } else {
363             // not time to enable yet
364         }
365         // we are dryRunning hence data should be processed in any case
366     }
367
368     // check the packet header
369     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
370
371     // handle dropped cycles
372     if(dropped_cycles) {
373         // make sure the last_timestamp is corrected
374         m_correct_last_timestamp = true;
375         if (m_state == ePS_Running) {
376             // this is an xrun situation
377             m_in_xrun = true;
378             debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
379             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
380             m_next_state = ePS_WaitingForStreamDisable;
381             // execute the requested change
382             if (!updateState()) { // we are allowed to change the state directly
383                 debugError("Could not update state!\n");
384                 return RAW1394_ISO_ERROR;
385             }
386         }
387     }
388
389     if (result == eCRV_OK) {
390         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n",
391                 cycle, m_last_timestamp);
392         // update some accounting
393         m_last_good_cycle = cycle;
394         m_last_dropped = dropped_cycles;
395
396         if(m_correct_last_timestamp) {
397             // they represent a discontinuity in the timestamps, and hence are
398             // to be dealt with
399             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
400             m_data_buffer->setBufferTailTimestamp(substractTicks(m_last_timestamp, getNominalFramesPerPacket() * getTicksPerFrame()));
401             m_correct_last_timestamp = false;
402         }
403
404         // check whether we are waiting for a stream to startup
405         // this requires that the packet is good
406         if(m_state == ePS_WaitingForStream) {
407             // since we have a packet with an OK header,
408             // we can indicate that the stream started up
409
410             // we then check whether we have to switch on this cycle
411             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
412                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
413                 // hence go to the dryRunning state
414                 m_next_state = ePS_DryRunning;
415                 if (!updateState()) { // we are allowed to change the state directly
416                     debugError("Could not update state!\n");
417                     return RAW1394_ISO_ERROR;
418                 }
419             } else {
420                 // not time (yet) to switch state
421             }
422             // in both cases we don't want to process the data
423             return RAW1394_ISO_OK;
424         }
425
426         // check whether a state change has been requested
427         // note that only the wait state changes are synchronized with the cycles
428         else if(m_state != m_next_state) {
429             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
430                                              ePSToString(m_state), ePSToString(m_next_state));
431             // execute the requested change
432             if (!updateState()) { // we are allowed to change the state directly
433                 debugError("Could not update state!\n");
434                 return RAW1394_ISO_ERROR;
435             }
436         }
437
438         // for all states that reach this we are allowed to
439         // do protocol specific data reception
440         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
441
442         // if an xrun occured, switch to the dryRunning state and
443         // allow for the xrun to be picked up
444         if (result2 == eCRV_XRun) {
445             debugWarning("processPacketData xrun\n");
446             m_in_xrun = true;
447             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
448             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
449             m_next_state = ePS_WaitingForStreamDisable;
450             // execute the requested change
451             if (!updateState()) { // we are allowed to change the state directly
452                 debugError("Could not update state!\n");
453                 return RAW1394_ISO_ERROR;
454             }
455             return RAW1394_ISO_DEFER;
456         } else if(result2 == eCRV_OK) {
457             // no problem here
458             SIGNAL_ACTIVITY;
459             return RAW1394_ISO_OK;
460         } else {
461             debugError("Invalid response\n");
462             return RAW1394_ISO_ERROR;
463         }
464     } else if(result == eCRV_Invalid) {
465         // apparently we don't have to do anything when the packets are not valid
466         return RAW1394_ISO_OK;
467     } else {
468         debugError("Invalid response\n");
469         return RAW1394_ISO_ERROR;
470     }
471     debugError("reached the unreachable\n");
472     return RAW1394_ISO_ERROR;
473 }
474
475 enum raw1394_iso_disposition
476 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
477                            unsigned char *tag, unsigned char *sy,
478                            int cycle, unsigned int dropped, unsigned int max_length) {
479     if (cycle<0) {
480         *tag = 0;
481         *sy = 0;
482         *length = 0;
483         return RAW1394_ISO_OK;
484     }
485
486     unsigned int ctr;
487     int now_cycles;
488     int cycle_diff;
489
490 #ifdef DEBUG
491     if(m_last_cycle == -1) {
492         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
493     }
494 #endif
495
496     int dropped_cycles = 0;
497     if (m_last_cycle != cycle && m_last_cycle != -1) {
498         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
499         if (dropped_cycles < 0) {
500             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",
501                          this, dropped_cycles, cycle, m_last_cycle, dropped);
502         }
503         if (dropped_cycles > 0) {
504             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped);
505             m_dropped += dropped_cycles;
506             // HACK: this should not be necessary, since the header generation functions should trigger the xrun.
507             //       but apparently there are some issues with the 1394 stack
508             m_in_xrun = true;
509             if(m_state == ePS_Running) {
510                 debugShowBackLogLines(200);
511                 debugWarning("dropped packets xrun\n");
512                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packets xrun\n");
513                 m_cycle_to_switch_state = cycle + 1;
514                 m_next_state = ePS_WaitingForStreamDisable;
515                 // execute the requested change
516                 if (!updateState()) { // we are allowed to change the state directly
517                     debugError("Could not update state!\n");
518                     return RAW1394_ISO_ERROR;
519                 }
520                 goto send_empty_packet;
521             }
522         }
523     }
524     if (cycle >= 0) {
525         m_last_cycle = cycle;
526     }
527
528 #ifdef DEBUG
529     // bypass based upon state
530     if (m_state == ePS_Invalid) {
531         debugError("Should not have state %s\n", ePSToString(m_state) );
532         return RAW1394_ISO_ERROR;
533     }
534 #endif
535
536     if (m_state == ePS_Created) {
537         *tag = 0;
538         *sy = 0;
539         *length = 0;
540         return RAW1394_ISO_DEFER;
541     }
542
543     // normal processing
544     // note that we can't use getCycleTimer directly here,
545     // because packets are queued in advance. This means that
546     // we the packet we are constructing will be sent out
547     // on 'cycle', not 'now'.
548     ctr = m_1394service.getCycleTimer();
549     now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
550
551     // the difference between the cycle this
552     // packet is intended for and 'now'
553     cycle_diff = diffCycles(cycle, now_cycles);
554
555     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
556         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
557             cycle, now_cycles);
558         if(m_state == ePS_Running) {
559             debugShowBackLogLines(200);
560 //             flushDebugOutput();
561 //             assert(0);
562             debugWarning("generatePacketData xrun\n");
563             m_in_xrun = true;
564             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
565             m_cycle_to_switch_state = cycle + 1;
566             m_next_state = ePS_WaitingForStreamDisable;
567             // execute the requested change
568             if (!updateState()) { // we are allowed to change the state directly
569                 debugError("Could not update state!\n");
570                 return RAW1394_ISO_ERROR;
571             }
572             goto send_empty_packet;
573         }
574     }
575
576     // store the previous timestamp
577     m_last_timestamp2 = m_last_timestamp;
578
579     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
580     //       it happens on the first 'good' cycle for the wait condition
581     //       or on the first received cycle that is received afterwards (might be a problem)
582
583     // check whether we are waiting for a stream to be disabled
584     if(m_state == ePS_WaitingForStreamDisable) {
585         // we then check whether we have to switch on this cycle
586         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
587             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
588             m_next_state = ePS_DryRunning;
589             if (!updateState()) { // we are allowed to change the state directly
590                 debugError("Could not update state!\n");
591                 return RAW1394_ISO_ERROR;
592             }
593         }
594         // generate the silent packet header
595         enum eChildReturnValue result = generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
596         if (result == eCRV_Packet) {
597             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT SILENT: CY=%04u TS=%011llu\n",
598                     cycle, m_last_timestamp);
599             // update some accounting
600             m_last_good_cycle = cycle;
601             m_last_dropped = dropped_cycles;
602
603             // assumed not to xrun
604             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
605             return RAW1394_ISO_OK;
606         } else {
607             debugError("Invalid return value: %d\n", result);
608             return RAW1394_ISO_ERROR;
609         }
610     }
611     // check whether we are waiting for a stream to be enabled
612     else if(m_state == ePS_WaitingForStreamEnable) {
613         // we then check whether we have to switch on this cycle
614         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
615             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
616             m_next_state = ePS_Running;
617             if (!updateState()) { // we are allowed to change the state directly
618                 debugError("Could not update state!\n");
619                 return RAW1394_ISO_ERROR;
620             }
621         } else {
622             // not time to enable yet
623         }
624         // we are dryRunning hence data should be processed in any case
625     }
626     // check whether we are waiting for a stream to startup
627     else if(m_state == ePS_WaitingForStream) {
628         // as long as the cycle parameter is not in sync with
629         // the current time, the stream is considered not
630         // to be 'running'
631         // we then check whether we have to switch on this cycle
632         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
633             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
634             // hence go to the dryRunning state
635             m_next_state = ePS_DryRunning;
636             if (!updateState()) { // we are allowed to change the state directly
637                 debugError("Could not update state!\n");
638                 return RAW1394_ISO_ERROR;
639             }
640         } else {
641             // not time (yet) to switch state
642         }
643     }
644     else if(m_state == ePS_Running) {
645         // check the packet header
646         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
647         if (result == eCRV_Packet || result == eCRV_Defer) {
648             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n",
649                     cycle, m_last_timestamp);
650             // update some accounting
651             m_last_good_cycle = cycle;
652             m_last_dropped = dropped_cycles;
653
654             // check whether a state change has been requested
655             // note that only the wait state changes are synchronized with the cycles
656             if(m_state != m_next_state) {
657                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
658                                                 ePSToString(m_state), ePSToString(m_next_state));
659                 // execute the requested change
660                 if (!updateState()) { // we are allowed to change the state directly
661                     debugError("Could not update state!\n");
662                     return RAW1394_ISO_ERROR;
663                 }
664             }
665
666             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
667             // if an xrun occured, switch to the dryRunning state and
668             // allow for the xrun to be picked up
669             if (result2 == eCRV_XRun) {
670                 debugWarning("generatePacketData xrun\n");
671                 m_in_xrun = true;
672                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
673                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
674                 m_next_state = ePS_WaitingForStreamDisable;
675                 // execute the requested change
676                 if (!updateState()) { // we are allowed to change the state directly
677                     debugError("Could not update state!\n");
678                     return RAW1394_ISO_ERROR;
679                 }
680                 goto send_empty_packet;
681             }
682             // skip queueing packets if we detect that there are not enough frames
683             // available
684             if(result2 == eCRV_Defer || result == eCRV_Defer)
685                 return RAW1394_ISO_DEFER;
686             else
687                 return RAW1394_ISO_OK;
688         } else if (result == eCRV_XRun) { // pick up the possible xruns
689             debugWarning("generatePacketHeader xrun\n");
690             m_in_xrun = true;
691             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
692             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
693             m_next_state = ePS_WaitingForStreamDisable;
694             // execute the requested change
695             if (!updateState()) { // we are allowed to change the state directly
696                 debugError("Could not update state!\n");
697                 return RAW1394_ISO_ERROR;
698             }
699         } else if (result == eCRV_EmptyPacket) {
700             if(m_state != m_next_state) {
701                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
702                                                 ePSToString(m_state), ePSToString(m_next_state));
703                 // execute the requested change
704                 if (!updateState()) { // we are allowed to change the state directly
705                     debugError("Could not update state!\n");
706                     return RAW1394_ISO_ERROR;
707                 }
708             }
709             goto send_empty_packet;
710         } else if (result == eCRV_Again) {
711             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
712             if(m_state != m_next_state) {
713                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
714                                                 ePSToString(m_state), ePSToString(m_next_state));
715                 // execute the requested change
716                 if (!updateState()) { // we are allowed to change the state directly
717                     debugError("Could not update state!\n");
718                     return RAW1394_ISO_ERROR;
719                 }
720             }
721 //             usleep(125); // only when using thread-per-handler
722 //             return RAW1394_ISO_AGAIN;
723             generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
724             generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
725             return RAW1394_ISO_DEFER;
726         } else {
727             debugError("Invalid return value: %d\n", result);
728             return RAW1394_ISO_ERROR;
729         }
730     }
731     // we are not running, so send an empty packet
732     // we should generate a valid packet any time
733 send_empty_packet:
734     // note that only the wait state changes are synchronized with the cycles
735     if(m_state != m_next_state) {
736         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
737                                         ePSToString(m_state), ePSToString(m_next_state));
738         // execute the requested change
739         if (!updateState()) { // we are allowed to change the state directly
740             debugError("Could not update state!\n");
741             return RAW1394_ISO_ERROR;
742         }
743     }
744
745     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle);
746     generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
747     generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
748     return RAW1394_ISO_OK;
749 }
750
751
752 // Frame Transfer API
753 /**
754  * Transfer a block of frames from the event buffer to the port buffers
755  * @param nbframes number of frames to transfer
756  * @param ts the timestamp that the LAST frame in the block should have
757  * @return
758  */
759 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
760     bool result;
761     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts);
762     assert( getType() == ePT_Receive );
763     if(isDryRunning()) result = getFramesDry(nbframes, ts);
764     else result = getFramesWet(nbframes, ts);
765     SIGNAL_ACTIVITY;
766     return result;
767 }
768
769 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
770 // FIXME: this should be done somewhere else
771 #ifdef DEBUG
772     uint64_t ts_expected;
773     signed int fc;
774     int32_t lag_ticks;
775     float lag_frames;
776
777     // in order to sync up multiple received streams, we should
778     // use the ts parameter. It specifies the time of the block's
779     // last sample.
780     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
781     assert(srate != 0.0);
782     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
783
784     ffado_timestamp_t ts_head_tmp;
785     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
786     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
787
788     lag_ticks = diffTicks(ts, ts_expected);
789     lag_frames = (((float)lag_ticks) / srate);
790     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
791                  this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
792     if (lag_frames >= 1.0) {
793         // the stream lags
794         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
795                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
796     } else if (lag_frames <= -1.0) {
797         // the stream leads
798         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
799                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
800     }
801 #endif
802     // ask the buffer to process nbframes of frames
803     // using it's registered client's processReadBlock(),
804     // which should be ours
805     m_data_buffer->blockProcessReadFrames(nbframes);
806     return true;
807 }
808
809 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
810 {
811     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n",
812                  this, nbframes, ts);
813     // dry run on this side means that we put silence in all enabled ports
814     // since there is do data put into the ringbuffer in the dry-running state
815     return provideSilenceBlock(nbframes, 0);
816 }
817
818 bool
819 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
820 {
821     bool result;
822     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
823     result = m_data_buffer->dropFrames(nbframes);
824     SIGNAL_ACTIVITY;
825     return result;
826 }
827
828 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
829 {
830     bool result;
831     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts);
832     assert( getType() == ePT_Transmit );
833     if(isDryRunning()) result = putFramesDry(nbframes, ts);
834     else result = putFramesWet(nbframes, ts);
835     SIGNAL_ACTIVITY;
836     return result;
837 }
838
839 bool
840 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
841 {
842     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts);
843     // transfer the data
844     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
845     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts);
846     return true; // FIXME: what about failure?
847 }
848
849 bool
850 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
851 {
852     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts);
853     // do nothing
854     return true;
855 }
856
857 bool
858 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
859 {
860     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts);
861
862     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
863     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
864
865     if (nbframes > scratch_buffer_size_frames) {
866         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
867                    nbframes, scratch_buffer_size_frames);
868     }
869
870     assert(m_scratch_buffer);
871     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
872         debugError("Could not prepare silent block\n");
873         return false;
874     }
875     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
876         debugError("Could not write silent block\n");
877         return false;
878     }
879
880     SIGNAL_ACTIVITY;
881     return true;
882 }
883
884 bool
885 StreamProcessor::shiftStream(int nbframes)
886 {
887     bool result;
888     if(nbframes == 0) return true;
889     if(nbframes > 0) {
890         result = m_data_buffer->dropFrames(nbframes);
891         SIGNAL_ACTIVITY;
892         return result;
893     } else {
894         result = true;
895         while(nbframes++) {
896             result &= m_data_buffer->writeDummyFrame();
897         }
898         SIGNAL_ACTIVITY;
899         return result;
900     }
901 }
902
903 /**
904  * @brief write silence events to the stream ringbuffers.
905  */
906 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
907 {
908     bool no_problem=true;
909     for ( PortVectorIterator it = m_Ports.begin();
910           it != m_Ports.end();
911           ++it ) {
912         if((*it)->isDisabled()) {continue;};
913
914         if(provideSilenceToPort((*it), offset, nevents)) {
915             debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
916             no_problem=false;
917         }
918     }
919     return no_problem;
920 }
921
922 int
923 StreamProcessor::provideSilenceToPort(Port *p, unsigned int offset, unsigned int nevents)
924 {
925     unsigned int j=0;
926     switch(p->getPortType()) {
927         default:
928             debugError("Invalid port type: %d\n", p->getPortType());
929             return -1;
930         case Port::E_Midi:
931         case Port::E_Control:
932             {
933                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
934                 assert(nevents + offset <= p->getBufferSize());
935                 buffer+=offset;
936
937                 for(j = 0; j < nevents; j += 1) {
938                     *(buffer)=0;
939                     buffer++;
940                 }
941             }
942             break;
943         case Port::E_Audio:
944             switch(m_StreamProcessorManager.getAudioDataType()) {
945             case StreamProcessorManager::eADT_Int24:
946                 {
947                     quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
948                     assert(nevents + offset <= p->getBufferSize());
949                     buffer+=offset;
950    
951                     for(j = 0; j < nevents; j += 1) {
952                         *(buffer)=0;
953                         buffer++;
954                     }
955                 }
956                 break;
957             case StreamProcessorManager::eADT_Float:
958                 {
959                     float *buffer=(float *)(p->getBufferAddress());
960                     assert(nevents + offset <= p->getBufferSize());
961                     buffer+=offset;
962    
963                     for(j = 0; j < nevents; j += 1) {
964                         *buffer = 0.0;
965                         buffer++;
966                     }
967                 }
968                 break;
969             }
970             break;
971     }
972     return 0;
973 }
974
975 /***********************************************
976  * State related API                           *
977  ***********************************************/
978 bool StreamProcessor::init()
979 {
980     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
981
982     if(!m_IsoHandlerManager.registerStream(this)) {
983         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
984         return false;
985     }
986     if(!m_StreamProcessorManager.registerProcessor(this)) {
987         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
988         return false;
989     }
990
991     // initialization can be done without requesting it
992     // from the packet loop
993     m_next_state = ePS_Created;
994     return true;
995 }
996
997 bool StreamProcessor::prepare()
998 {
999     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
1000
1001     // make the scratch buffer one period of frames long
1002     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
1003     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
1004     if(m_scratch_buffer) delete[] m_scratch_buffer;
1005     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
1006     if(m_scratch_buffer == NULL) {
1007         debugFatal("Could not allocate scratch buffer\n");
1008         return false;
1009     }
1010
1011     // set the parameters of ports we can:
1012     // we want the audio ports to be period buffered,
1013     // and the midi ports to be packet buffered
1014     for ( PortVectorIterator it = m_Ports.begin();
1015         it != m_Ports.end();
1016         ++it )
1017     {
1018         debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1019         if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1020             debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1021             return false;
1022         }
1023     }
1024     // the API specific settings of the ports should already be set,
1025     // as this is called from the processorManager->prepare()
1026     // so we can init the ports
1027     if(!PortManager::initPorts()) {
1028         debugFatal("Could not initialize ports\n");
1029         return false;
1030     }
1031
1032     if (!prepareChild()) {
1033         debugFatal("Could not prepare child\n");
1034         return false;
1035     }
1036
1037     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
1038     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
1039              m_StreamProcessorManager.getNominalRate());
1040     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
1041              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
1042     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
1043              m_1394service.getPort(), m_channel);
1044
1045     // initialization can be done without requesting it
1046     // from the packet loop
1047     m_next_state = ePS_Stopped;
1048     return updateState();
1049 }
1050
1051 bool
1052 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
1053 {
1054     // first set the time, since in the packet loop we first check m_state == m_next_state before
1055     // using the time
1056     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
1057     m_next_state = state;
1058     // wake up any threads that might be waiting on data in the buffers
1059     // since a state transition can cause data to become available
1060     SIGNAL_ACTIVITY;
1061     return true;
1062 }
1063
1064 bool
1065 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
1066 {
1067     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
1068     int cnt = timeout_ms;
1069     while (m_state != state && cnt) {
1070         SleepRelativeUsec(1000);
1071         cnt--;
1072     }
1073     if(cnt==0) {
1074         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
1075         return false;
1076     }
1077     return true;
1078 }
1079
1080 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
1081     uint64_t tx;
1082     if (t < 0) {
1083         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1084     } else {
1085         tx = t;
1086     }
1087     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
1088
1089     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1090     uint64_t now = m_1394service.getCycleTimerTicks();
1091     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1092                           now,
1093                           (unsigned int)TICKS_TO_SECS(now),
1094                           (unsigned int)TICKS_TO_CYCLES(now),
1095                           (unsigned int)TICKS_TO_OFFSET(now));
1096     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1097                           tx,
1098                           (unsigned int)TICKS_TO_SECS(tx),
1099                           (unsigned int)TICKS_TO_CYCLES(tx),
1100                           (unsigned int)TICKS_TO_OFFSET(tx));
1101     if (m_state == ePS_Stopped) {
1102         if(!m_IsoHandlerManager.startHandlerForStream(
1103                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
1104             debugError("Could not start handler for SP %p\n", this);
1105             return false;
1106         }
1107         return scheduleStateTransition(ePS_WaitingForStream, tx);
1108     } else if (m_state == ePS_Running) {
1109         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1110     } else {
1111         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
1112         return false;
1113     }
1114 }
1115
1116 bool StreamProcessor::scheduleStartRunning(int64_t t) {
1117     uint64_t tx;
1118     if (t < 0) {
1119         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1120     } else {
1121         tx = t;
1122     }
1123     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1124     uint64_t now = m_1394service.getCycleTimerTicks();
1125     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1126                           now,
1127                           (unsigned int)TICKS_TO_SECS(now),
1128                           (unsigned int)TICKS_TO_CYCLES(now),
1129                           (unsigned int)TICKS_TO_OFFSET(now));
1130     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1131                           tx,
1132                           (unsigned int)TICKS_TO_SECS(tx),
1133                           (unsigned int)TICKS_TO_CYCLES(tx),
1134                           (unsigned int)TICKS_TO_OFFSET(tx));
1135     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
1136 }
1137
1138 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
1139     uint64_t tx;
1140     if (t < 0) {
1141         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1142     } else {
1143         tx = t;
1144     }
1145     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1146     uint64_t now = m_1394service.getCycleTimerTicks();
1147     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1148                           now,
1149                           (unsigned int)TICKS_TO_SECS(now),
1150                           (unsigned int)TICKS_TO_CYCLES(now),
1151                           (unsigned int)TICKS_TO_OFFSET(now));
1152     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1153                           tx,
1154                           (unsigned int)TICKS_TO_SECS(tx),
1155                           (unsigned int)TICKS_TO_CYCLES(tx),
1156                           (unsigned int)TICKS_TO_OFFSET(tx));
1157
1158     return scheduleStateTransition(ePS_Stopped, tx);
1159 }
1160
1161 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1162     uint64_t tx;
1163     if (t < 0) {
1164         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1165     } else {
1166         tx = t;
1167     }
1168     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1169     uint64_t now = m_1394service.getCycleTimerTicks();
1170     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1171                           now,
1172                           (unsigned int)TICKS_TO_SECS(now),
1173                           (unsigned int)TICKS_TO_CYCLES(now),
1174                           (unsigned int)TICKS_TO_OFFSET(now));
1175     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%03us %04uc %04ut)\n",
1176                           tx,
1177                           (unsigned int)TICKS_TO_SECS(tx),
1178                           (unsigned int)TICKS_TO_CYCLES(tx),
1179                           (unsigned int)TICKS_TO_OFFSET(tx));
1180     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1181 }
1182
1183 bool StreamProcessor::startDryRunning(int64_t t) {
1184     if(!scheduleStartDryRunning(t)) {
1185         debugError("Could not schedule transition\n");
1186         return false;
1187     }
1188     if(!waitForState(ePS_DryRunning, 2000)) {
1189         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1190         return false;
1191     }
1192     return true;
1193 }
1194
1195 bool StreamProcessor::startRunning(int64_t t) {
1196     if(!scheduleStartRunning(t)) {
1197         debugError("Could not schedule transition\n");
1198         return false;
1199     }
1200     if(!waitForState(ePS_Running, 2000)) {
1201         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1202         return false;
1203     }
1204     return true;
1205 }
1206
1207 bool StreamProcessor::stopDryRunning(int64_t t) {
1208     if(!scheduleStopDryRunning(t)) {
1209         debugError("Could not schedule transition\n");
1210         return false;
1211     }
1212     if(!waitForState(ePS_Stopped, 2000)) {
1213         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1214         return false;
1215     }
1216     return true;
1217 }
1218
1219 bool StreamProcessor::stopRunning(int64_t t) {
1220     if(!scheduleStopRunning(t)) {
1221         debugError("Could not schedule transition\n");
1222         return false;
1223     }
1224     if(!waitForState(ePS_DryRunning, 2000)) {
1225         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1226         return false;
1227     }
1228     return true;
1229 }
1230
1231
1232 // internal state API
1233
1234 /**
1235  * @brief Enter the ePS_Stopped state
1236  * @return true if successful, false if not
1237  *
1238  * @pre none
1239  *
1240  * @post the buffer and the isostream are ready for use.
1241  * @post all dynamic structures have been allocated successfully
1242  * @post the buffer is transparent and empty, and all parameters are set
1243  *       to the correct initial/nominal values.
1244  *
1245  */
1246 bool
1247 StreamProcessor::doStop()
1248 {
1249     float ticks_per_frame;
1250     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1251
1252     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1253     bool result = true;
1254
1255     switch(m_state) {
1256         case ePS_Created:
1257             assert(m_data_buffer);
1258
1259             // prepare the framerate estimate
1260             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1261             m_ticks_per_frame = ticks_per_frame;
1262             m_local_node_id= m_1394service.getLocalNodeId() & 0x3f;
1263             m_correct_last_timestamp = false;
1264
1265             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1266
1267             // initialize internal buffer
1268             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1269
1270             result &= m_data_buffer->setEventSize( getEventSize() );
1271             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1272             if(getType() == ePT_Receive) {
1273                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1274             } else {
1275                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1276             }
1277             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1278             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1279             result &= m_data_buffer->prepare(); // FIXME: the name
1280
1281             break;
1282         case ePS_DryRunning:
1283             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1284                 debugError("Could not stop handler for SP %p\n", this);
1285                 return false;
1286             }
1287             break;
1288         default:
1289             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1290             return false;
1291     }
1292
1293     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1294     // make the buffer transparent
1295     m_data_buffer->setTransparent(true);
1296
1297     // reset all ports
1298     result &= PortManager::preparePorts();
1299
1300     m_state = ePS_Stopped;
1301     #ifdef DEBUG
1302     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1303         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1304         dumpInfo();
1305     }
1306     #endif
1307     SIGNAL_ACTIVITY;
1308     return result;
1309 }
1310
1311 /**
1312  * @brief Enter the ePS_WaitingForStream state
1313  * @return true if successful, false if not
1314  *
1315  * @pre all dynamic data structures are allocated successfully
1316  *
1317  * @post
1318  *
1319  */
1320 bool
1321 StreamProcessor::doWaitForRunningStream()
1322 {
1323     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1324     switch(m_state) {
1325         case ePS_Stopped:
1326             // we have to start waiting for an incoming stream
1327             // this basically means nothing, the state change will
1328             // be picked up by the packet iterator
1329             break;
1330         default:
1331             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1332             return false;
1333     }
1334     m_state = ePS_WaitingForStream;
1335     #ifdef DEBUG
1336     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1337         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1338         dumpInfo();
1339     }
1340     #endif
1341     SIGNAL_ACTIVITY;
1342     return true;
1343 }
1344
1345 /**
1346  * @brief Enter the ePS_DryRunning state
1347  * @return true if successful, false if not
1348  *
1349  * @pre
1350  *
1351  * @post
1352  *
1353  */
1354 bool
1355 StreamProcessor::doDryRunning()
1356 {
1357     bool result = true;
1358     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1359     switch(m_state) {
1360         case ePS_WaitingForStream:
1361             // a running stream has been detected
1362             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1363             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1364             if (getType() == ePT_Receive) {
1365                 // this to ensure that there is no discontinuity when starting to
1366                 // update the DLL based upon the received packets
1367                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1368             } else {
1369                 // FIXME: PC=master mode will have to do something here I guess...
1370             }
1371             break;
1372         case ePS_WaitingForStreamEnable: // when xrunning at startup
1373             result &= m_data_buffer->clearBuffer();
1374             m_data_buffer->setTransparent(true);
1375             break;
1376         case ePS_WaitingForStreamDisable:
1377             result &= m_data_buffer->clearBuffer();
1378             m_data_buffer->setTransparent(true);
1379             break;
1380         default:
1381             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1382             return false;
1383     }
1384     m_state = ePS_DryRunning;
1385     #ifdef DEBUG
1386     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1387         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1388         dumpInfo();
1389     }
1390     #endif
1391     SIGNAL_ACTIVITY;
1392     return result;
1393 }
1394
1395 /**
1396  * @brief Enter the ePS_WaitingForStreamEnable state
1397  * @return true if successful, false if not
1398  *
1399  * @pre
1400  *
1401  * @post
1402  *
1403  */
1404 bool
1405 StreamProcessor::doWaitForStreamEnable()
1406 {
1407     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1408     unsigned int ringbuffer_size_frames;
1409     switch(m_state) {
1410         case ePS_DryRunning:
1411             // we have to start waiting for an incoming stream
1412             // this basically means nothing, the state change will
1413             // be picked up by the packet iterator
1414
1415             if(!m_data_buffer->clearBuffer()) {
1416                 debugError("Could not reset data buffer\n");
1417                 return false;
1418             }
1419             if (getType() == ePT_Transmit) {
1420                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1421                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1422                 // prefill the buffer
1423                 if(!transferSilence(ringbuffer_size_frames)) {
1424                     debugFatal("Could not prefill transmit stream\n");
1425                     return false;
1426                 }
1427             }
1428             break;
1429         default:
1430             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1431             return false;
1432     }
1433     m_state = ePS_WaitingForStreamEnable;
1434     #ifdef DEBUG
1435     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1436         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1437         dumpInfo();
1438     }
1439     #endif
1440     SIGNAL_ACTIVITY;
1441     return true;
1442 }
1443
1444 /**
1445  * @brief Enter the ePS_Running state
1446  * @return true if successful, false if not
1447  *
1448  * @pre
1449  *
1450  * @post
1451  *
1452  */
1453 bool
1454 StreamProcessor::doRunning()
1455 {
1456     bool result = true;
1457     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1458     switch(m_state) {
1459         case ePS_WaitingForStreamEnable:
1460             // a running stream has been detected
1461             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1462                                              this, m_last_cycle);
1463             m_in_xrun = false;
1464             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1465             m_data_buffer->setTransparent(false);
1466             break;
1467         default:
1468             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1469             return false;
1470     }
1471     m_state = ePS_Running;
1472     #ifdef DEBUG
1473     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1474         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1475         dumpInfo();
1476     }
1477     #endif
1478     SIGNAL_ACTIVITY;
1479     return result;
1480 }
1481
1482 /**
1483  * @brief Enter the ePS_WaitingForStreamDisable state
1484  * @return true if successful, false if not
1485  *
1486  * @pre
1487  *
1488  * @post
1489  *
1490  */
1491 bool
1492 StreamProcessor::doWaitForStreamDisable()
1493 {
1494     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1495     switch(m_state) {
1496         case ePS_Running:
1497             // the thread will do the transition
1498             break;
1499         default:
1500             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1501             return false;
1502     }
1503     m_state = ePS_WaitingForStreamDisable;
1504     #ifdef DEBUG
1505     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1506         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1507         dumpInfo();
1508     }
1509     #endif
1510     SIGNAL_ACTIVITY;
1511     return true;
1512 }
1513
1514 /**
1515  * @brief Updates the state machine and calls the necessary transition functions
1516  * @return true if successful, false if not
1517  */
1518 bool StreamProcessor::updateState() {
1519     bool result = false;
1520     // copy the current state locally since it could change value,
1521     // and that's something we don't want to happen inbetween tests
1522     // if m_next_state changes during this routine, we know for sure
1523     // that the previous state change was at least attempted correctly.
1524     enum eProcessorState next_state = m_next_state;
1525
1526     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1527         ePSToString(m_state), ePSToString(next_state));
1528
1529     if (m_state == next_state) {
1530         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1531         return true;
1532     }
1533     // after creation, only initialization is allowed
1534     if (m_state == ePS_Created) {
1535         if(next_state != ePS_Stopped) {
1536             goto updateState_exit_with_error;
1537         }
1538         // do init here
1539         result = doStop();
1540         if (result) {return true;}
1541         else goto updateState_exit_change_failed;
1542     }
1543
1544     // after initialization, only WaitingForRunningStream is allowed
1545     if (m_state == ePS_Stopped) {
1546         if(next_state != ePS_WaitingForStream) {
1547             goto updateState_exit_with_error;
1548         }
1549         result = doWaitForRunningStream();
1550         if (result) {return true;}
1551         else goto updateState_exit_change_failed;
1552     }
1553
1554     // after WaitingForStream, only ePS_DryRunning is allowed
1555     // this means that the stream started running
1556     if (m_state == ePS_WaitingForStream) {
1557         if(next_state != ePS_DryRunning) {
1558             goto updateState_exit_with_error;
1559         }
1560         result = doDryRunning();
1561         if (result) {return true;}
1562         else goto updateState_exit_change_failed;
1563     }
1564
1565     // from ePS_DryRunning we can go to:
1566     //   - ePS_Stopped if something went wrong during DryRunning
1567     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1568     if (m_state == ePS_DryRunning) {
1569         if((next_state != ePS_Stopped) &&
1570            (next_state != ePS_WaitingForStreamEnable)) {
1571             goto updateState_exit_with_error;
1572         }
1573         if (next_state == ePS_Stopped) {
1574             result = doStop();
1575         } else {
1576             result = doWaitForStreamEnable();
1577         }
1578         if (result) {return true;}
1579         else goto updateState_exit_change_failed;
1580     }
1581
1582     // from ePS_WaitingForStreamEnable we can go to:
1583     //   - ePS_DryRunning if something went wrong while waiting
1584     //   - ePS_Running if the stream enabled correctly
1585     if (m_state == ePS_WaitingForStreamEnable) {
1586         if((next_state != ePS_DryRunning) &&
1587            (next_state != ePS_Running)) {
1588             goto updateState_exit_with_error;
1589         }
1590         if (next_state == ePS_Stopped) {
1591             result = doDryRunning();
1592         } else {
1593             result = doRunning();
1594         }
1595         if (result) {return true;}
1596         else goto updateState_exit_change_failed;
1597     }
1598
1599     // from ePS_Running we can only start waiting for a disabled stream
1600     if (m_state == ePS_Running) {
1601         if(next_state != ePS_WaitingForStreamDisable) {
1602             goto updateState_exit_with_error;
1603         }
1604         result = doWaitForStreamDisable();
1605         if (result) {return true;}
1606         else goto updateState_exit_change_failed;
1607     }
1608
1609     // from ePS_WaitingForStreamDisable we can go to DryRunning
1610     if (m_state == ePS_WaitingForStreamDisable) {
1611         if(next_state != ePS_DryRunning) {
1612             goto updateState_exit_with_error;
1613         }
1614         result = doDryRunning();
1615         if (result) {return true;}
1616         else goto updateState_exit_change_failed;
1617     }
1618
1619     // if we arrive here there is an error
1620 updateState_exit_with_error:
1621     debugError("Invalid state transition: %s => %s\n",
1622         ePSToString(m_state), ePSToString(next_state));
1623     SIGNAL_ACTIVITY;
1624     return false;
1625 updateState_exit_change_failed:
1626     debugError("State transition failed: %s => %s\n",
1627         ePSToString(m_state), ePSToString(next_state));
1628     SIGNAL_ACTIVITY;
1629     return false;
1630 }
1631
1632 bool StreamProcessor::waitForProducePacket()
1633 {
1634     return waitForProduce(getNominalFramesPerPacket());
1635 }
1636 bool StreamProcessor::waitForProducePeriod()
1637 {
1638     return waitForProduce(m_StreamProcessorManager.getPeriodSize());
1639 }
1640 bool StreamProcessor::waitForProduce(unsigned int nframes)
1641 {
1642     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) wait ...\n", this, getTypeString());
1643     struct timespec ts;
1644     int result;
1645
1646     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
1647         debugError("clock_gettime failed\n");
1648         return false;
1649     }
1650
1651     // FIXME: hardcoded timeout of 0.1 sec
1652     ts.tv_nsec += 100 * 1000000LL;
1653     if (ts.tv_nsec > 1000000000LL) {
1654         ts.tv_sec += 1;
1655         ts.tv_nsec -= 1000000000LL;
1656     }
1657
1658     pthread_mutex_lock(&m_activity_cond_lock);
1659     while(!canProduce(nframes)) {
1660         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);
1661    
1662         if (result == -1) {
1663             if (errno == ETIMEDOUT) {
1664                 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) pthread_cond_timedwait() timed out\n", this, getTypeString());
1665                 pthread_mutex_unlock(&m_activity_cond_lock);
1666                 return false;
1667             } else {
1668                 debugError("(%p, %s) pthread_cond_timedwait error\n", this, getTypeString());
1669                 pthread_mutex_unlock(&m_activity_cond_lock);
1670                 return false;
1671             }
1672         }
1673     }
1674     pthread_mutex_unlock(&m_activity_cond_lock);
1675     return true;
1676 }
1677
1678 bool StreamProcessor::waitForConsumePacket()
1679 {
1680     return waitForConsume(getNominalFramesPerPacket());
1681 }
1682 bool StreamProcessor::waitForConsumePeriod()
1683 {
1684     return waitForConsume(m_StreamProcessorManager.getPeriodSize());
1685 }
1686 bool StreamProcessor::waitForConsume(unsigned int nframes)
1687 {
1688     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) wait ...\n", this, getTypeString());
1689     struct timespec ts;
1690     int result;
1691
1692     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
1693         debugError("clock_gettime failed\n");
1694         return false;
1695     }
1696
1697     // FIXME: hardcoded timeout of 0.1 sec
1698     ts.tv_nsec += 100 * 1000000LL;
1699     if (ts.tv_nsec > 1000000000LL) {
1700         ts.tv_sec += 1;
1701         ts.tv_nsec -= 1000000000LL;
1702     }
1703
1704     pthread_mutex_lock(&m_activity_cond_lock);
1705     while(!canConsume(nframes)) {
1706         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);
1707         if (result == -1) {
1708             if (errno == ETIMEDOUT) {
1709                 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) pthread_cond_timedwait() timed out\n", this, getTypeString());
1710                 pthread_mutex_unlock(&m_activity_cond_lock);
1711                 return false;
1712             } else {
1713                 debugError("(%p, %s) pthread_cond_timedwait error\n", this, getTypeString());
1714                 pthread_mutex_unlock(&m_activity_cond_lock);
1715                 return false;
1716             }
1717         }
1718     }
1719     pthread_mutex_unlock(&m_activity_cond_lock);
1720     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) leave ...\n", this, getTypeString());
1721     return true;
1722 }
1723
1724 bool StreamProcessor::canProducePacket()
1725 {
1726     return canProduce(getNominalFramesPerPacket());
1727 }
1728 bool StreamProcessor::canProducePeriod()
1729 {
1730     return canProduce(m_StreamProcessorManager.getPeriodSize());
1731 }
1732 bool StreamProcessor::canProduce(unsigned int nframes)
1733 {
1734     if(m_in_xrun) return true;
1735     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1736         // check whether we already fullfil the criterion
1737         unsigned int bufferspace = m_data_buffer->getBufferSpace();
1738         if(bufferspace >= nframes) {
1739 //             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough space (%u)...\n", bufferspace);
1740             return true;
1741         } else return false;
1742     } else {
1743         if(getType() == ePT_Transmit) {
1744             // if we are an xmit SP, we cannot accept frames
1745             // when not running
1746             return false;
1747         } else {
1748             // if we are a receive SP, we can always accept frames
1749             // when not running
1750             return true;
1751         }
1752     }
1753 }
1754
1755 bool StreamProcessor::canConsumePacket()
1756 {
1757     return canConsume(getNominalFramesPerPacket());
1758 }
1759 bool StreamProcessor::canConsumePeriod()
1760 {
1761     return canConsume(m_StreamProcessorManager.getPeriodSize());
1762 }
1763 bool StreamProcessor::canConsume(unsigned int nframes)
1764 {
1765     if(m_in_xrun) return true;
1766     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1767         // check whether we already fullfil the criterion
1768         unsigned int bufferfill = m_data_buffer->getBufferFill();
1769         if(bufferfill >= nframes) {
1770 //             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough items (%u)...\n", bufferfill);
1771             return true;
1772         } else return false;
1773     } else {
1774         if(getType() == ePT_Transmit) {
1775             // if we are an xmit SP, and we're not running,
1776             // we can always provide frames
1777             return true;
1778         } else {
1779             // if we are a receive SP, we can't provide frames
1780             // when not running
1781             return false;
1782         }
1783     }
1784 }
1785
1786 /***********************************************
1787  * Helper routines                             *
1788  ***********************************************/
1789 bool
1790 StreamProcessor::transferSilence(unsigned int nframes)
1791 {
1792     bool retval;
1793     signed int fc;
1794     ffado_timestamp_t ts_tail_tmp;
1795
1796     // prepare a buffer of silence
1797     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1798     transmitSilenceBlock(dummybuffer, nframes, 0);
1799
1800     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1801     if (fc != 0) {
1802         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1803     }
1804
1805     // add the silence data to the ringbuffer
1806     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1807         retval = true;
1808     } else {
1809         debugWarning("Could not write to event buffer\n");
1810         retval = false;
1811     }
1812     free(dummybuffer);
1813     return retval;
1814 }
1815
1816 /**
1817  * @brief convert a eProcessorState to a string
1818  * @param s the state
1819  * @return a char * describing the state
1820  */
1821 const char *
1822 StreamProcessor::ePSToString(enum eProcessorState s) {
1823     switch (s) {
1824         case ePS_Invalid: return "ePS_Invalid";
1825         case ePS_Created: return "ePS_Created";
1826         case ePS_Stopped: return "ePS_Stopped";
1827         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1828         case ePS_DryRunning: return "ePS_DryRunning";
1829         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1830         case ePS_Running: return "ePS_Running";
1831         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1832         default: return "error: unknown state";
1833     }
1834 }
1835
1836 /**
1837  * @brief convert a eProcessorType to a string
1838  * @param t the type
1839  * @return a char * describing the state
1840  */
1841 const char *
1842 StreamProcessor::ePTToString(enum eProcessorType t) {
1843     switch (t) {
1844         case ePT_Receive: return "Receive";
1845         case ePT_Transmit: return "Transmit";
1846         default: return "error: unknown type";
1847     }
1848 }
1849
1850 /***********************************************
1851  * Debug                                       *
1852  ***********************************************/
1853 void
1854 StreamProcessor::dumpInfo()
1855 {
1856     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p, %s:\n", this, ePTToString(m_processor_type));
1857     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1858     uint64_t now = m_1394service.getCycleTimerTicks();
1859     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1860                         now,
1861                         (unsigned int)TICKS_TO_SECS(now),
1862                         (unsigned int)TICKS_TO_CYCLES(now),
1863                         (unsigned int)TICKS_TO_OFFSET(now));
1864     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False"));
1865     if (m_state == m_next_state) {
1866         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n",
1867                                             ePSToString(m_state));
1868     } else {
1869         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s (Next: %s)\n",
1870                                               ePSToString(m_state), ePSToString(m_next_state));
1871         debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1872     }
1873     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1874     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Framerate             : Nominal: %u, Sync: %f, Buffer %f\n",
1875                                           m_StreamProcessorManager.getNominalRate(),
1876                                           24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1877                                           24576000.0/m_data_buffer->getRate());
1878     float d = getSyncDelay();
1879     debugOutputShort(DEBUG_LEVEL_NORMAL, "  Sync delay             : %f ticks (%f frames, %f cy)\n",
1880                                          d, d/getTicksPerFrame(),
1881                                          d/((float)TICKS_PER_CYCLE));
1882     m_data_buffer->dumpInfo();
1883 }
1884
1885 void
1886 StreamProcessor::setVerboseLevel(int l) {
1887     setDebugLevel(l);
1888     PortManager::setVerboseLevel(l);
1889     m_data_buffer->setVerboseLevel(l);
1890 }
1891
1892 } // end of namespace
Note: See TracBrowser for help on using the browser.