root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 1005, 71.3 kB (checked in by ppalmers, 13 years ago)

Improve thread synchronisation. Switch back to separate threads for transmit and
receive since it is not possible to statically schedule things properly. One
of the threads (i.e. the client thread) is out of our control, hence its
execution can't be controlled. Using separate threads and correct priorities
will shift this problem to the OS. Note that the priority of the packet
receive thread should be lower than the client thread (such that the client
thread is woken ASAP), and the priority of the transmit thread should be
higher than the client thread (such that packets are queued ASAP).
Extra benefit: multi-cores are used.

Some other startup improvements.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessor.h"
27 #include "../StreamProcessorManager.h"
28
29 #include "devicemanager.h"
30
31 #include "libieee1394/ieee1394service.h"
32 #include "libieee1394/IsoHandlerManager.h"
33 #include "libieee1394/cycletimer.h"
34
35 #include "libutil/Time.h"
36
37 #include "libutil/Atomic.h"
38
39 #include <assert.h>
40 #include <math.h>
41
// Wake-up helper macros. Each one expands to a brace-enclosed block and relies
// on the m_StreamProcessorManager / m_IsoHandlerManager members being in scope
// at the point of use.

// Notify the stream processor manager only.
#define SIGNAL_ACTIVITY_SPM \
    { m_StreamProcessorManager.signalActivity(); }

// Notify the transmit side of the ISO handler manager only.
#define SIGNAL_ACTIVITY_ISO_XMIT \
    { m_IsoHandlerManager.signalActivityTransmit(); }

// Notify the receive side of the ISO handler manager only.
#define SIGNAL_ACTIVITY_ISO_RECV \
    { m_IsoHandlerManager.signalActivityReceive(); }

// Notify the stream processor manager and both ISO directions, in that order.
#define SIGNAL_ACTIVITY_ALL \
    { \
        m_StreamProcessorManager.signalActivity(); \
        m_IsoHandlerManager.signalActivityTransmit(); \
        m_IsoHandlerManager.signalActivityReceive(); \
    }
56
57 namespace Streaming {
58
// Debug-module definition for the StreamProcessor class, produced by the
// project's IMPL_DEBUG_MODULE macro. Presumably the last argument is the
// initial verbosity (here DEBUG_LEVEL_VERBOSE) -- confirm against the macro
// definition, which is not visible in this file.
59 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
60
61 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
62     : m_processor_type ( type )
63     , m_state( ePS_Created )
64     , m_next_state( ePS_Invalid )
65     , m_cycle_to_switch_state( 0 )
66     , m_Parent( parent )
67     , m_1394service( parent.get1394Service() ) // local cache
68     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
69     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
70     , m_local_node_id ( 0 ) // local cache
71     , m_channel( -1 )
72     , m_dropped( 0 )
73     , m_last_timestamp( 0 )
74     , m_last_timestamp2( 0 )
75     , m_correct_last_timestamp( false )
76     , m_scratch_buffer( NULL )
77     , m_scratch_buffer_size_bytes( 0 )
78     , m_ticks_per_frame( 0 )
79     , m_last_cycle( -1 )
80     , m_sync_delay( 0 )
81     , m_in_xrun( false )
82     , m_min_ahead( 7999 )
83 {
84     // create the timestamped buffer and register ourselves as its client
85     m_data_buffer = new Util::TimestampedBuffer(this);
86 }
87
88 StreamProcessor::~StreamProcessor() {
89     m_StreamProcessorManager.unregisterProcessor(this);
90     if(!m_IsoHandlerManager.unregisterStream(this)) {
91         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
92     }
93
94     if (m_data_buffer) delete m_data_buffer;
95     if (m_scratch_buffer) delete[] m_scratch_buffer;
96 }
97
98 void
99 StreamProcessor::handleBusReset()
100 {
101     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) handling busreset\n", this);
102     // for now, we try and make sure everything is cleanly shutdown
103     if(!stopRunning(-1)) {
104         debugError("Failed to stop SP\n");
105     }
106     SIGNAL_ACTIVITY_ALL;
107 }
108
109 void StreamProcessor::handlerDied()
110 {
111     debugWarning("Handler died for %p\n", this);
112     m_state = ePS_Stopped;
113     m_in_xrun = true;
114 }
115
116 uint64_t StreamProcessor::getTimeNow() {
117     return m_1394service.getCycleTimerTicks();
118 }
119
120 int StreamProcessor::getMaxFrameLatency() {
121     if (getType() == ePT_Receive) {
122         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
123     } else {
124         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
125     }
126 }
127
128 unsigned int
129 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
130 {
131     unsigned int nominal_frames_per_second
132                     = m_StreamProcessorManager.getNominalRate();
133     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
134     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
135     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
136     return nominal_packets;
137 }
138
139 unsigned int
140 StreamProcessor::getPacketsPerPeriod()
141 {
142     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
143 }
144
145 unsigned int
146 StreamProcessor::getNbPacketsIsoXmitBuffer()
147 {
148     // if we use one thread per packet, we can put every frame directly into the ISO buffer
149     // the waitForClient in IsoHandler will take care of the fact that the frames are
150     // not present in time
151     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers())) + 10;
152     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
153     return packets_to_prebuffer;
154 }
155
156 /***********************************************
157  * Buffer management and manipulation          *
158  ***********************************************/
159 void StreamProcessor::flush() {
160     m_IsoHandlerManager.flushHandlerForStream(this);
161 }
162
163 int StreamProcessor::getBufferFill() {
164     return m_data_buffer->getBufferFill();
165 }
166
167 int64_t
168 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
169 {
170     uint64_t time_at_period=getTimeAtPeriod();
171
172     // we delay the period signal with the sync delay
173     // this makes that the period signals lag a little compared to reality
174     // ISO buffering causes the packets to be received at max
175     // m_handler->getWakeupInterval() later than the time they were received.
176     // hence their payload is available this amount of time later. However, the
177     // period boundary is predicted based upon earlier samples, and therefore can
178     // pass before these packets are processed. Adding this extra term makes that
179     // the period boundary is signalled later
180     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
181
182     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
183
184     // calculate the time until the next period
185     int32_t until_next=diffTicks(time_at_period,cycle_timer);
186
187     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
188         time_at_period, cycle_timer, until_next
189         );
190
191     // now convert to usecs
192     // don't use the mapping function because it only works
193     // for absolute times, not the relative time we are
194     // using here (which can also be negative).
195     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
196 }
197
198 void
199 StreamProcessor::setSyncDelay(unsigned int d) {
200     #ifdef DEBUG
201     unsigned int frames = (unsigned int)((float)d / getTicksPerFrame());
202     debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %u ticks, %u frames\n", this, d, frames);
203     #endif
204     m_sync_delay = d; // FIXME: sync delay not necessary anymore
205 }
206
207 uint64_t
208 StreamProcessor::getTimeAtPeriodUsecs()
209 {
210     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
211 }
212
213 uint64_t
214 StreamProcessor::getTimeAtPeriod()
215 {
216     if (getType() == ePT_Receive) {
217         ffado_timestamp_t next_period_boundary = m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
218    
219         #ifdef DEBUG
220         ffado_timestamp_t ts;
221         signed int fc;
222         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
223    
224         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
225             next_period_boundary, ts, fc, getTicksPerFrame()
226             );
227         #endif
228         return (uint64_t)next_period_boundary;
229     } else {
230         ffado_timestamp_t next_period_boundary = m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
231    
232         #ifdef DEBUG
233         ffado_timestamp_t ts;
234         signed int fc;
235         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
236    
237         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
238             next_period_boundary, ts, fc, getTicksPerFrame()
239             );
240         #endif
241         return (uint64_t)next_period_boundary;
242     }
243 }
244
245 float
246 StreamProcessor::getTicksPerFrame()
247 {
248     assert(m_data_buffer != NULL);
249     return m_data_buffer->getRate();
250 }
251
252 void
253 StreamProcessor::setTicksPerFrame(float tpf)
254 {
255     assert(m_data_buffer != NULL);
256     m_data_buffer->setRate(tpf);
257 }
258
259 bool
260 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
261 {
262     bool can_transfer;
263     unsigned int fc = m_data_buffer->getFrameCounter();
264     if (getType() == ePT_Receive) {
265         can_transfer = (fc >= nbframes);
266     } else {
267         // there has to be enough space to put the frames in
268         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
269         // or the buffer is transparent
270         can_transfer |= m_data_buffer->isTransparent();
271     }
272    
273     #ifdef DEBUG
274     if (!can_transfer) {
275         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
276             this, ePTToString(getType()), fc, nbframes);
277     }
278     #endif
279    
280     return can_transfer;
281 }
282
283 /***********************************************
284  * I/O API                                     *
285  ***********************************************/
286
287 // Packet transfer API
288 enum raw1394_iso_disposition
289 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
290                            unsigned char channel, unsigned char tag, unsigned char sy,
291                            unsigned int cycle, unsigned int dropped,
292                            unsigned int skipped) {
293 #ifdef DEBUG
294     if(m_last_cycle == -1) {
295         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
296     }
297 #endif
298
299     int dropped_cycles = 0;
300     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
301         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
302         if (dropped_cycles < 0) {
303             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d, 'skipped'=%u\n",
304                          this, dropped_cycles, cycle, m_last_cycle, dropped, skipped);
305         }
306         if (dropped_cycles > 0) {
307             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, 'skipped'=%u, cycle=%d, m_last_cycle=%d\n",
308                 this, dropped_cycles, cycle, dropped, skipped, cycle, m_last_cycle);
309             m_dropped += dropped_cycles;
310             m_last_cycle = cycle;
311             dumpInfo();
312         }
313     }
314     m_last_cycle = cycle;
315
316     // bypass based upon state
317     if (m_state == ePS_Invalid) {
318         debugError("Should not have state %s\n", ePSToString(m_state) );
319         return RAW1394_ISO_ERROR;
320     }
321     if (m_state == ePS_Created) {
322         return RAW1394_ISO_DEFER;
323     }
324
325     // store the previous timestamp
326     m_last_timestamp2 = m_last_timestamp;
327
328     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
329     //       it happens on the first 'good' cycle for the wait condition
330     //       or on the first received cycle that is received afterwards (might be a problem)
331
332     // check whether we are waiting for a stream to be disabled
333     if(m_state == ePS_WaitingForStreamDisable) {
334         // we then check whether we have to switch on this cycle
335         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
336             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
337             m_next_state = ePS_DryRunning;
338             if (!updateState()) { // we are allowed to change the state directly
339                 debugError("Could not update state!\n");
340                 return RAW1394_ISO_ERROR;
341             }
342         } else {
343             // not time to disable yet
344         }
345         // the received data can be discarded while waiting for the stream
346         // to be disabled
347         // similarly for dropped packets
348         return RAW1394_ISO_OK;
349     }
350
351     // check whether we are waiting for a stream to be enabled
352     else if(m_state == ePS_WaitingForStreamEnable) {
353         // we then check whether we have to switch on this cycle
354         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
355             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
356             m_next_state = ePS_Running;
357             if (!updateState()) { // we are allowed to change the state directly
358                 debugError("Could not update state!\n");
359                 return RAW1394_ISO_ERROR;
360             }
361         } else {
362             // not time to enable yet
363         }
364         // we are dryRunning hence data should be processed in any case
365     }
366
367     // check the packet header
368     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
369
370     // handle dropped cycles
371     if(dropped_cycles) {
372         // make sure the last_timestamp is corrected
373         m_correct_last_timestamp = true;
374         if (m_state == ePS_Running) {
375             // this is an xrun situation
376             m_in_xrun = true;
377             debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
378             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
379             m_next_state = ePS_WaitingForStreamDisable;
380             // execute the requested change
381             if (!updateState()) { // we are allowed to change the state directly
382                 debugError("Could not update state!\n");
383                 return RAW1394_ISO_ERROR;
384             }
385         }
386     }
387
388     if (result == eCRV_OK) {
389         #ifdef DEBUG
390         int ticks_per_packet = (int)(getTicksPerFrame() * getNominalFramesPerPacket());
391         int diff=diffTicks(m_last_timestamp, m_last_timestamp2);
392         // display message if the difference between two successive tick
393         // values is more than 50 ticks. 1 sample at 48k is 512 ticks
394         // so 50 ticks = 10%, which is a rather large jitter value.
395         if(diff-ticks_per_packet > 50 || diff-ticks_per_packet < -50) {
396             debugOutput(DEBUG_LEVEL_VERBOSE,
397                         "cy %04u rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
398                         cycle, m_last_timestamp2, m_last_timestamp, diff, ticks_per_packet);
399         }
400         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
401                            "%04u %011llu %011llu %d %d\n",
402                            cycle, m_last_timestamp2, m_last_timestamp,
403                            diff, ticks_per_packet);
404         #endif
405
406         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
407                           "RECV: CY=%04u TS=%011llu\n",
408                           cycle, m_last_timestamp);
409         // update some accounting
410         m_last_good_cycle = cycle;
411         m_last_dropped = dropped_cycles;
412
413         if(m_correct_last_timestamp) {
414             // they represent a discontinuity in the timestamps, and hence are
415             // to be dealt with
416             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
417             m_data_buffer->setBufferTailTimestamp(substractTicks(m_last_timestamp,
418                                                                  (uint64_t)(getNominalFramesPerPacket()
419                                                                             * getTicksPerFrame())));
420             m_correct_last_timestamp = false;
421         }
422
423         // check whether we are waiting for a stream to startup
424         // this requires that the packet is good
425         if(m_state == ePS_WaitingForStream) {
426             // since we have a packet with an OK header,
427             // we can indicate that the stream started up
428
429             // we then check whether we have to switch on this cycle
430             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
431                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
432                 // hence go to the dryRunning state
433                 m_next_state = ePS_DryRunning;
434                 if (!updateState()) { // we are allowed to change the state directly
435                     debugError("Could not update state!\n");
436                     return RAW1394_ISO_ERROR;
437                 }
438             } else {
439                 // not time (yet) to switch state
440             }
441             // in both cases we don't want to process the data
442             return RAW1394_ISO_OK;
443         }
444
445         // check whether a state change has been requested
446         // note that only the wait state changes are synchronized with the cycles
447         else if(m_state != m_next_state) {
448             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
449                                              ePSToString(m_state), ePSToString(m_next_state));
450             // execute the requested change
451             if (!updateState()) { // we are allowed to change the state directly
452                 debugError("Could not update state!\n");
453                 return RAW1394_ISO_ERROR;
454             }
455         }
456
457         // for all states that reach this we are allowed to
458         // do protocol specific data reception
459         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
460
461         // if an xrun occured, switch to the dryRunning state and
462         // allow for the xrun to be picked up
463         if (result2 == eCRV_XRun) {
464             debugWarning("processPacketData xrun\n");
465             m_in_xrun = true;
466             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
467             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
468             m_next_state = ePS_WaitingForStreamDisable;
469             // execute the requested change
470             if (!updateState()) { // we are allowed to change the state directly
471                 debugError("Could not update state!\n");
472                 return RAW1394_ISO_ERROR;
473             }
474             return RAW1394_ISO_DEFER;
475         } else if(result2 == eCRV_OK) {
476             // no problem here
477             // FIXME: cache the period size?
478             unsigned int periodsize = m_StreamProcessorManager.getPeriodSize();
479             unsigned int bufferfill = m_data_buffer->getBufferFill();
480             if(bufferfill >= periodsize) {
481                 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "signal activity, %d>%d\n", bufferfill, periodsize);
482                 SIGNAL_ACTIVITY_SPM;
483                 return RAW1394_ISO_DEFER;
484             }
485             return RAW1394_ISO_OK;
486         } else {
487             debugError("Invalid response\n");
488             return RAW1394_ISO_ERROR;
489         }
490     } else if(result == eCRV_Invalid) {
491         // apparently we don't have to do anything when the packets are not valid
492         return RAW1394_ISO_OK;
493     } else {
494         debugError("Invalid response\n");
495         return RAW1394_ISO_ERROR;
496     }
497     debugError("reached the unreachable\n");
498     return RAW1394_ISO_ERROR;
499 }
500
501 enum raw1394_iso_disposition
502 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
503                            unsigned char *tag, unsigned char *sy,
504                            int cycle, unsigned int dropped,
505                            unsigned int skipped, unsigned int max_length) {
506     if (cycle<0) {
507         *tag = 0;
508         *sy = 0;
509         *length = 0;
510         return RAW1394_ISO_OK;
511     }
512
513     unsigned int ctr;
514     uint64_t prev_timestamp;
515     int now_cycles;
516     int cycle_diff;
517
518 #ifdef DEBUG
519     if(m_last_cycle == -1) {
520         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
521     }
522 #endif
523
524     int dropped_cycles = 0;
525     if (m_last_cycle != cycle && m_last_cycle != -1) {
526         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
527         // correct for skipped packets
528         // since those are not dropped, but only delayed
529         dropped_cycles =- skipped;
530         if(skipped) {
531             debugWarning("(%p) skipped %d cycles, cycle: %d, last_cycle: %d, dropped: %d\n",
532                          this, skipped, cycle, m_last_cycle, dropped);
533         }
534         if (dropped_cycles < 0) {
535             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d, skipped: %d\n",
536                          this, dropped_cycles, cycle, m_last_cycle, dropped, skipped);
537         }
538         if (dropped_cycles > 0) {
539             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d, skipped: %d)\n",
540                          this, dropped_cycles, cycle, m_last_cycle, dropped, skipped);
541             m_dropped += dropped_cycles;
542             // HACK: this should not be necessary, since the header generation functions should trigger the xrun.
543             //       but apparently there are some issues with the 1394 stack
544             m_in_xrun = true;
545             if(m_state == ePS_Running) {
546                 debugShowBackLogLines(200);
547                 debugWarning("dropped packets xrun\n");
548                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packets xrun\n");
549                 m_cycle_to_switch_state = cycle + 1;
550                 m_next_state = ePS_WaitingForStreamDisable;
551                 // execute the requested change
552                 if (!updateState()) { // we are allowed to change the state directly
553                     debugError("Could not update state!\n");
554                     return RAW1394_ISO_ERROR;
555                 }
556                 goto send_empty_packet;
557             }
558         }
559     }
560     if (cycle >= 0) {
561         m_last_cycle = cycle;
562     }
563
564 #ifdef DEBUG
565     // bypass based upon state
566     if (m_state == ePS_Invalid) {
567         debugError("Should not have state %s\n", ePSToString(m_state) );
568         return RAW1394_ISO_ERROR;
569     }
570 #endif
571
572     if (m_state == ePS_Created) {
573         *tag = 0;
574         *sy = 0;
575         *length = 0;
576         return RAW1394_ISO_DEFER;
577     }
578
579     // normal processing
580     // note that we can't use getCycleTimer directly here,
581     // because packets are queued in advance. This means that
582     // we the packet we are constructing will be sent out
583     // on 'cycle', not 'now'.
584     ctr = m_1394service.getCycleTimer();
585     now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
586
587     // the difference between the cycle this
588     // packet is intended for and 'now'
589     cycle_diff = diffCycles(cycle, now_cycles);
590
591     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
592         unsigned int fc = m_data_buffer->getBufferFill();
593         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy, fill=%u)\n",
594             cycle, now_cycles, fc);
595         if(m_state == ePS_Running) {
596             debugShowBackLogLines(200);
597 //             flushDebugOutput();
598 //             assert(0);
599             debugWarning("generatePacketData xrun\n");
600             m_in_xrun = true;
601             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
602             m_cycle_to_switch_state = cycle + 1;
603             m_next_state = ePS_WaitingForStreamDisable;
604             // execute the requested change
605             if (!updateState()) { // we are allowed to change the state directly
606                 debugError("Could not update state!\n");
607                 return RAW1394_ISO_ERROR;
608             }
609             goto send_empty_packet;
610         }
611     }
612
613     // store the previous timestamp
614     // keep the old value here, update m_last_timestamp2 only when
615     // a valid packet will be sent
616     prev_timestamp = m_last_timestamp;
617    
618     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
619     //       it happens on the first 'good' cycle for the wait condition
620     //       or on the first received cycle that is received afterwards (might be a problem)
621
622     // check whether we are waiting for a stream to be disabled
623     if(m_state == ePS_WaitingForStreamDisable) {
624         // we then check whether we have to switch on this cycle
625         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
626             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
627             m_next_state = ePS_DryRunning;
628             if (!updateState()) { // we are allowed to change the state directly
629                 debugError("Could not update state!\n");
630                 return RAW1394_ISO_ERROR;
631             }
632         }
633         // generate the silent packet header
634         enum eChildReturnValue result = generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
635         if (result == eCRV_Packet) {
636             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
637                                "XMIT SILENT: CY=%04u TS=%011llu\n",
638                                cycle, m_last_timestamp);
639             // update some accounting
640             m_last_good_cycle = cycle;
641             m_last_dropped = dropped_cycles;
642
643             // assumed not to xrun
644             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
645             return RAW1394_ISO_OK;
646         // FIXME: PP: I think this should also be a possibility
647         //} else if (result == eCRV_EmptyPacket) {
648         //    goto send_empty_packet;
649         } else {
650             debugError("Invalid return value: %d\n", result);
651             return RAW1394_ISO_ERROR;
652         }
653     }
654     // check whether we are waiting for a stream to be enabled
655     else if(m_state == ePS_WaitingForStreamEnable) {
656         // we then check whether we have to switch on this cycle
657         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
658             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
659             m_next_state = ePS_Running;
660             if (!updateState()) { // we are allowed to change the state directly
661                 debugError("Could not update state!\n");
662                 return RAW1394_ISO_ERROR;
663             }
664         } else {
665             // not time to enable yet
666         }
667         // we are dryRunning hence data should be processed in any case
668     }
669     // check whether we are waiting for a stream to startup
670     else if(m_state == ePS_WaitingForStream) {
671         // as long as the cycle parameter is not in sync with
672         // the current time, the stream is considered not
673         // to be 'running'
674         // we then check whether we have to switch on this cycle
675         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
676             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
677             // hence go to the dryRunning state
678             m_next_state = ePS_DryRunning;
679             if (!updateState()) { // we are allowed to change the state directly
680                 debugError("Could not update state!\n");
681                 return RAW1394_ISO_ERROR;
682             }
683         } else {
684             // not time (yet) to switch state
685         }
686     }
687     else if(m_state == ePS_Running) {
688         // check the packet header
689         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
690         if (result == eCRV_Packet || result == eCRV_Defer) {
691             int ahead = diffCycles(cycle, now_cycles);
692             if (ahead < m_min_ahead) m_min_ahead = ahead;
693             debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
694                                "XMIT: CY=%04u TS=%011llu NOW_CY=%04u AHEAD=%04d\n",
695                                cycle, m_last_timestamp, now_cycles, ahead);
696             // update some accounting
697             m_last_good_cycle = cycle;
698             m_last_dropped = dropped_cycles;
699
700             // valid packet timestamp
701             m_last_timestamp2 = prev_timestamp;
702
703             // check whether a state change has been requested
704             // note that only the wait state changes are synchronized with the cycles
705             if(m_state != m_next_state) {
706                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
707                                                 ePSToString(m_state), ePSToString(m_next_state));
708                 // execute the requested change
709                 if (!updateState()) { // we are allowed to change the state directly
710                     debugError("Could not update state!\n");
711                     return RAW1394_ISO_ERROR;
712                 }
713             }
714
715             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
716             // if an xrun occured, switch to the dryRunning state and
717             // allow for the xrun to be picked up
718             if (result2 == eCRV_XRun) {
719                 debugWarning("generatePacketData xrun\n");
720                 m_in_xrun = true;
721                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
722                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
723                 m_next_state = ePS_WaitingForStreamDisable;
724                 // execute the requested change
725                 if (!updateState()) { // we are allowed to change the state directly
726                     debugError("Could not update state!\n");
727                     return RAW1394_ISO_ERROR;
728                 }
729                 goto send_empty_packet;
730             }
731             #ifdef DEBUG
732             int ticks_per_packet = (int)(getTicksPerFrame() * getNominalFramesPerPacket());
733             int diff=diffTicks(m_last_timestamp, m_last_timestamp2);
734             // display message if the difference between two successive tick
735             // values is more than 50 ticks. 1 sample at 48k is 512 ticks
736             // so 50 ticks = 10%, which is a rather large jitter value.
737             if(diff-ticks_per_packet > 50 || diff-ticks_per_packet < -50) {
738                 debugOutput(DEBUG_LEVEL_VERBOSE,
739                             "cy %04d, rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
740                             cycle, m_last_timestamp2, m_last_timestamp, diff, ticks_per_packet);
741             }
742             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
743                                "%04d %011llu %011llu %d %d\n",
744                                cycle, m_last_timestamp2, m_last_timestamp, diff, ticks_per_packet);
745             #endif
746
747             // skip queueing packets if we detect that there are not enough frames
748             // available
749             if(result2 == eCRV_Defer || result == eCRV_Defer) {
750                 return RAW1394_ISO_DEFER;
751             } else {
752                 return RAW1394_ISO_OK;
753             }
754         } else if (result == eCRV_XRun) { // pick up the possible xruns
755             debugWarning("generatePacketHeader xrun\n");
756             m_in_xrun = true;
757             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
758             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
759             m_next_state = ePS_WaitingForStreamDisable;
760             // execute the requested change
761             if (!updateState()) { // we are allowed to change the state directly
762                 debugError("Could not update state!\n");
763                 return RAW1394_ISO_ERROR;
764             }
765         } else if (result == eCRV_EmptyPacket) {
766             if(m_state != m_next_state) {
767                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
768                                                 ePSToString(m_state), ePSToString(m_next_state));
769                 // execute the requested change
770                 if (!updateState()) { // we are allowed to change the state directly
771                     debugError("Could not update state!\n");
772                     return RAW1394_ISO_ERROR;
773                 }
774             }
775             goto send_empty_packet;
776         } else if (result == eCRV_Again) {
777             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
778             if(m_state != m_next_state) {
779                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
780                                                 ePSToString(m_state), ePSToString(m_next_state));
781                 // execute the requested change
782                 if (!updateState()) { // we are allowed to change the state directly
783                     debugError("Could not update state!\n");
784                     return RAW1394_ISO_ERROR;
785                 }
786             }
787 //             usleep(125); // only when using thread-per-handler
788 //             return RAW1394_ISO_AGAIN;
789             generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
790             generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
791             return RAW1394_ISO_DEFER;
792         } else {
793             debugError("Invalid return value: %d\n", result);
794             return RAW1394_ISO_ERROR;
795         }
796     }
797     // we are not running, so send an empty packet
798     // we should generate a valid packet any time
799 send_empty_packet:
800     // note that only the wait state changes are synchronized with the cycles
801     if(m_state != m_next_state) {
802         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
803                                         ePSToString(m_state), ePSToString(m_next_state));
804         // execute the requested change
805         if (!updateState()) { // we are allowed to change the state directly
806             debugError("Could not update state!\n");
807             return RAW1394_ISO_ERROR;
808         }
809     }
810    
811     { // context to avoid ahead var clash
812         int ahead = diffCycles(cycle, now_cycles);
813         if (ahead < m_min_ahead) m_min_ahead = ahead;
814         debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
815                            "XMIT EMPTY: CY=%04u, NOW_CY=%04u, AHEAD=%04d\n",
816                            cycle, now_cycles, ahead);
817     }
818
819     generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
820     generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
821     return RAW1394_ISO_OK;
822 }
823
824 // Frame Transfer API
825 /**
826  * Transfer a block of frames from the event buffer to the port buffers
827  * @param nbframes number of frames to transfer
828  * @param ts the timestamp that the LAST frame in the block should have
829  * @return
830  */
831 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
832     bool result;
833     debugOutputExtreme( DEBUG_LEVEL_VERBOSE,
834                         "(%p, %s) getFrames(%d, %11llu)\n",
835                         this, getTypeString(), nbframes, ts);
836     assert( getType() == ePT_Receive );
837     if(isDryRunning()) result = getFramesDry(nbframes, ts);
838     else result = getFramesWet(nbframes, ts);
839     SIGNAL_ACTIVITY_ISO_RECV;
840     return result;
841 }
842
843 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
844 // FIXME: this should be done somewhere else
845 #ifdef DEBUG
846     uint64_t ts_expected;
847     signed int fc;
848     int32_t lag_ticks;
849     float lag_frames;
850
851     // in order to sync up multiple received streams, we should
852     // use the ts parameter. It specifies the time of the block's
853     // last sample.
854     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
855     assert(srate != 0.0);
856     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
857
858     ffado_timestamp_t ts_head_tmp;
859     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
860     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
861
862     lag_ticks = diffTicks(ts, ts_expected);
863     lag_frames = (((float)lag_ticks) / srate);
864     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
865                        "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
866                        this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
867     if (lag_frames >= 1.0) {
868         // the stream lags
869         debugOutput(DEBUG_LEVEL_VERBOSE, "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
870                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
871     } else if (lag_frames <= -1.0) {
872         // the stream leads
873         debugOutput(DEBUG_LEVEL_VERBOSE, "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
874                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
875     }
876 #endif
877     // ask the buffer to process nbframes of frames
878     // using it's registered client's processReadBlock(),
879     // which should be ours
880     m_data_buffer->blockProcessReadFrames(nbframes);
881     return true;
882 }
883
884 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
885 {
886     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
887                        "stream (%p): dry run %d frames (@ ts=%lld)\n",
888                        this, nbframes, ts);
889     // dry run on this side means that we put silence in all enabled ports
890     // since there is do data put into the ringbuffer in the dry-running state
891     return provideSilenceBlock(nbframes, 0);
892 }
893
894 bool
895 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
896 {
897     bool result;
898     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
899     result = m_data_buffer->dropFrames(nbframes);
900     SIGNAL_ACTIVITY_ISO_RECV;
901     return result;
902 }
903
904 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
905 {
906     bool result;
907     debugOutputExtreme( DEBUG_LEVEL_VERBOSE,
908                         "(%p, %s) putFrames(%d, %11llu)\n",
909                         this, getTypeString(), nbframes, ts);
910     assert( getType() == ePT_Transmit );
911     if(isDryRunning()) result = putFramesDry(nbframes, ts);
912     else result = putFramesWet(nbframes, ts);
913     SIGNAL_ACTIVITY_ISO_XMIT;
914     return result;
915 }
916
917 bool
918 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
919 {
920     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE,
921                        "StreamProcessor::putFramesWet(%d, %llu)\n",
922                        nbframes, ts);
923     // transfer the data
924     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
925     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE,
926                        " New timestamp: %llu\n", ts);
927     return true; // FIXME: what about failure?
928 }
929
930 bool
931 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
932 {
933     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE,
934                        "StreamProcessor::putFramesDry(%d, %llu)\n",
935                        nbframes, ts);
936     // do nothing
937     return true;
938 }
939
940 bool
941 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
942 {
943     debugOutput(DEBUG_LEVEL_VERY_VERBOSE,
944                        "StreamProcessor::putSilenceFrames(%d, %llu)\n",
945                        nbframes, ts);
946
947     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
948     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
949
950     if (nbframes > scratch_buffer_size_frames) {
951         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
952                    nbframes, scratch_buffer_size_frames);
953     }
954
955     assert(m_scratch_buffer);
956     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
957         debugError("Could not prepare silent block\n");
958         return false;
959     }
960     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
961         debugError("Could not write silent block\n");
962         return false;
963     }
964
965     SIGNAL_ACTIVITY_ISO_XMIT;
966     return true;
967 }
968
969 bool
970 StreamProcessor::shiftStream(int nbframes)
971 {
972     bool result;
973     if(nbframes == 0) return true;
974     if(nbframes > 0) {
975         result = m_data_buffer->dropFrames(nbframes);
976         SIGNAL_ACTIVITY_ALL;
977         return result;
978     } else {
979         result = true;
980         while(nbframes++) {
981             result &= m_data_buffer->writeDummyFrame();
982         }
983         SIGNAL_ACTIVITY_ALL;
984         return result;
985     }
986 }
987
988 /**
989  * @brief write silence events to the stream ringbuffers.
990  */
991 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
992 {
993     bool no_problem=true;
994     for ( PortVectorIterator it = m_Ports.begin();
995           it != m_Ports.end();
996           ++it ) {
997         if((*it)->isDisabled()) {continue;};
998
999         if(provideSilenceToPort((*it), offset, nevents)) {
1000             debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
1001             no_problem=false;
1002         }
1003     }
1004     return no_problem;
1005 }
1006
1007 int
1008 StreamProcessor::provideSilenceToPort(Port *p, unsigned int offset, unsigned int nevents)
1009 {
1010     unsigned int j=0;
1011     switch(p->getPortType()) {
1012         default:
1013             debugError("Invalid port type: %d\n", p->getPortType());
1014             return -1;
1015         case Port::E_Midi:
1016         case Port::E_Control:
1017             {
1018                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
1019                 assert(nevents + offset <= p->getBufferSize());
1020                 buffer+=offset;
1021
1022                 for(j = 0; j < nevents; j += 1) {
1023                     *(buffer)=0;
1024                     buffer++;
1025                 }
1026             }
1027             break;
1028         case Port::E_Audio:
1029             switch(m_StreamProcessorManager.getAudioDataType()) {
1030             case StreamProcessorManager::eADT_Int24:
1031                 {
1032                     quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
1033                     assert(nevents + offset <= p->getBufferSize());
1034                     buffer+=offset;
1035    
1036                     for(j = 0; j < nevents; j += 1) {
1037                         *(buffer)=0;
1038                         buffer++;
1039                     }
1040                 }
1041                 break;
1042             case StreamProcessorManager::eADT_Float:
1043                 {
1044                     float *buffer=(float *)(p->getBufferAddress());
1045                     assert(nevents + offset <= p->getBufferSize());
1046                     buffer+=offset;
1047    
1048                     for(j = 0; j < nevents; j += 1) {
1049                         *buffer = 0.0;
1050                         buffer++;
1051                     }
1052                 }
1053                 break;
1054             }
1055             break;
1056     }
1057     return 0;
1058 }
1059
1060 /***********************************************
1061  * State related API                           *
1062  ***********************************************/
1063 bool StreamProcessor::init()
1064 {
1065     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
1066
1067     if(!m_IsoHandlerManager.registerStream(this)) {
1068         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
1069         return false;
1070     }
1071     if(!m_StreamProcessorManager.registerProcessor(this)) {
1072         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
1073         return false;
1074     }
1075
1076     // initialization can be done without requesting it
1077     // from the packet loop
1078     m_next_state = ePS_Created;
1079     return true;
1080 }
1081
1082 bool StreamProcessor::prepare()
1083 {
1084     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
1085
1086     // make the scratch buffer one period of frames long
1087     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
1088     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
1089     if(m_scratch_buffer) delete[] m_scratch_buffer;
1090     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
1091     if(m_scratch_buffer == NULL) {
1092         debugFatal("Could not allocate scratch buffer\n");
1093         return false;
1094     }
1095
1096     // set the parameters of ports we can:
1097     // we want the audio ports to be period buffered,
1098     // and the midi ports to be packet buffered
1099     for ( PortVectorIterator it = m_Ports.begin();
1100         it != m_Ports.end();
1101         ++it )
1102     {
1103         debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1104         if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1105             debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1106             return false;
1107         }
1108     }
1109     // the API specific settings of the ports should already be set,
1110     // as this is called from the processorManager->prepare()
1111     // so we can init the ports
1112     if(!PortManager::initPorts()) {
1113         debugFatal("Could not initialize ports\n");
1114         return false;
1115     }
1116
1117     if (!prepareChild()) {
1118         debugFatal("Could not prepare child\n");
1119         return false;
1120     }
1121
1122     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
1123     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
1124              m_StreamProcessorManager.getNominalRate());
1125     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
1126              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
1127     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
1128              m_1394service.getPort(), m_channel);
1129
1130     // initialization can be done without requesting it
1131     // from the packet loop
1132     m_next_state = ePS_Stopped;
1133     return updateState();
1134 }
1135
/**
 * @brief Request a state transition that is to take effect at a given time.
 *
 * Stores the cycle derived from time_instant in m_cycle_to_switch_state and
 * the requested state in m_next_state, then invokes SIGNAL_ACTIVITY_ALL.
 * The transition itself is not performed here.
 *
 * @param state        the state to switch to
 * @param time_instant time of the switch, in ticks (converted to cycles
 *                     with TICKS_TO_CYCLES)
 * @return always true
 */
bool
StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
{
    // first set the time, since in the packet loop we first check m_state == m_next_state before
    // using the time
    // (the order of the next two stores must therefore not be swapped)
    m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
    m_next_state = state;
    // wake up any threads that might be waiting on data in the buffers
    // since a state transition can cause data to become available
    SIGNAL_ACTIVITY_ALL;
    return true;
}
1148
1149 bool
1150 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
1151 {
1152     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
1153     int cnt = timeout_ms;
1154     while (m_state != state && cnt) {
1155         SleepRelativeUsec(1000);
1156         cnt--;
1157     }
1158     if(cnt==0) {
1159         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
1160         return false;
1161     }
1162     return true;
1163 }
1164
1165 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
1166     uint64_t tx;
1167     if (t < 0) {
1168         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1169     } else {
1170         tx = t;
1171     }
1172     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
1173
1174     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1175     #ifdef DEBUG
1176     uint64_t now = m_1394service.getCycleTimerTicks();
1177     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1178                           now,
1179                           (unsigned int)TICKS_TO_SECS(now),
1180                           (unsigned int)TICKS_TO_CYCLES(now),
1181                           (unsigned int)TICKS_TO_OFFSET(now));
1182     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1183                           tx,
1184                           (unsigned int)TICKS_TO_SECS(tx),
1185                           (unsigned int)TICKS_TO_CYCLES(tx),
1186                           (unsigned int)TICKS_TO_OFFSET(tx));
1187     #endif
1188     if (m_state == ePS_Stopped) {
1189         if(!m_IsoHandlerManager.startHandlerForStream(
1190                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
1191             debugError("Could not start handler for SP %p\n", this);
1192             return false;
1193         }
1194         return scheduleStateTransition(ePS_WaitingForStream, tx);
1195     } else if (m_state == ePS_Running) {
1196         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1197     } else if (m_state == ePS_DryRunning) {
1198         debugOutput(DEBUG_LEVEL_VERBOSE, " %p already in DryRunning state\n", this);
1199         return true;
1200     } else if (m_state == ePS_WaitingForStreamDisable) {
1201         debugOutput(DEBUG_LEVEL_VERBOSE, " %p already waiting to switch to DryRunning state\n", this);
1202         return true;
1203     } else {
1204         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
1205         return false;
1206     }
1207 }
1208
1209 bool StreamProcessor::scheduleStartRunning(int64_t t) {
1210     uint64_t tx;
1211     if (t < 0) {
1212         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1213     } else {
1214         tx = t;
1215     }
1216     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1217     #ifdef DEBUG
1218     uint64_t now = m_1394service.getCycleTimerTicks();
1219     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1220                           now,
1221                           (unsigned int)TICKS_TO_SECS(now),
1222                           (unsigned int)TICKS_TO_CYCLES(now),
1223                           (unsigned int)TICKS_TO_OFFSET(now));
1224     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1225                           tx,
1226                           (unsigned int)TICKS_TO_SECS(tx),
1227                           (unsigned int)TICKS_TO_CYCLES(tx),
1228                           (unsigned int)TICKS_TO_OFFSET(tx));
1229     #endif
1230     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
1231 }
1232
1233 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
1234     uint64_t tx;
1235     if (t < 0) {
1236         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1237     } else {
1238         tx = t;
1239     }
1240     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1241     #ifdef DEBUG
1242     uint64_t now = m_1394service.getCycleTimerTicks();
1243     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1244                           now,
1245                           (unsigned int)TICKS_TO_SECS(now),
1246                           (unsigned int)TICKS_TO_CYCLES(now),
1247                           (unsigned int)TICKS_TO_OFFSET(now));
1248     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at              : %011llu (%03us %04uc %04ut)\n",
1249                           tx,
1250                           (unsigned int)TICKS_TO_SECS(tx),
1251                           (unsigned int)TICKS_TO_CYCLES(tx),
1252                           (unsigned int)TICKS_TO_OFFSET(tx));
1253     #endif
1254
1255     return scheduleStateTransition(ePS_Stopped, tx);
1256 }
1257
1258 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1259     uint64_t tx;
1260     if (t < 0) {
1261         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1262     } else {
1263         tx = t;
1264     }
1265     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1266     #ifdef DEBUG
1267     uint64_t now = m_1394service.getCycleTimerTicks();
1268     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1269                           now,
1270                           (unsigned int)TICKS_TO_SECS(now),
1271                           (unsigned int)TICKS_TO_CYCLES(now),
1272                           (unsigned int)TICKS_TO_OFFSET(now));
1273     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at              : %011llu (%03us %04uc %04ut)\n",
1274                           tx,
1275                           (unsigned int)TICKS_TO_SECS(tx),
1276                           (unsigned int)TICKS_TO_CYCLES(tx),
1277                           (unsigned int)TICKS_TO_OFFSET(tx));
1278     #endif
1279     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1280 }
1281
1282 bool StreamProcessor::startDryRunning(int64_t t) {
1283     if(getState() == ePS_DryRunning) {
1284         // already in the correct state
1285         return true;
1286     }
1287     if(!scheduleStartDryRunning(t)) {
1288         debugError("Could not schedule transition\n");
1289         return false;
1290     }
1291     if(!waitForState(ePS_DryRunning, 2000)) {
1292         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1293         return false;
1294     }
1295     return true;
1296 }
1297
1298 bool StreamProcessor::startRunning(int64_t t) {
1299     if(getState() == ePS_Running) {
1300         // already in the correct state
1301         return true;
1302     }
1303     if(!scheduleStartRunning(t)) {
1304         debugError("Could not schedule transition\n");
1305         return false;
1306     }
1307     if(!waitForState(ePS_Running, 2000)) {
1308         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1309         return false;
1310     }
1311     return true;
1312 }
1313
1314 bool StreamProcessor::stopDryRunning(int64_t t) {
1315     if(getState() == ePS_Stopped) {
1316         // already in the correct state
1317         return true;
1318     }
1319     if(!scheduleStopDryRunning(t)) {
1320         debugError("Could not schedule transition\n");
1321         return false;
1322     }
1323     if(!waitForState(ePS_Stopped, 2000)) {
1324         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1325         return false;
1326     }
1327     return true;
1328 }
1329
1330 bool StreamProcessor::stopRunning(int64_t t) {
1331     if(getState() == ePS_DryRunning) {
1332         // already in the correct state
1333         return true;
1334     }
1335     if(!scheduleStopRunning(t)) {
1336         debugError("Could not schedule transition\n");
1337         return false;
1338     }
1339     if(!waitForState(ePS_DryRunning, 2000)) {
1340         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1341         return false;
1342     }
1343     return true;
1344 }
1345
1346
1347 // internal state API
1348
/**
 * @brief Enter the ePS_Stopped state
 * @return true if successful, false if not
 *
 * Valid entry states are ePS_Created (the data buffer is configured and
 * prepared) and ePS_DryRunning (the Iso handler for this stream is
 * stopped). Entry from any other state is an error.
 *
 * @pre none
 *
 * @post the buffer and the isostream are ready for use.
 * @post all dynamic structures have been allocated successfully
 * @post the buffer is transparent and empty, and all parameters are set
 *       to the correct initial/nominal values.
 *
 * @note when one of the buffer/port calls fails, m_state is still set to
 *       ePS_Stopped; the failure is only reflected in the return value.
 */
bool
StreamProcessor::doStop()
{
    float ticks_per_frame;
    // the ringbuffer holds one period more than the number of buffers
    unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();

    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
    // accumulates the outcome of all buffer/port operations below
    bool result = true;

    switch(m_state) {
        case ePS_Created:
            assert(m_data_buffer);

            // prepare the framerate estimate
            ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
            m_ticks_per_frame = ticks_per_frame;
            // only the low 6 bits of the local node id are kept
            m_local_node_id= m_1394service.getLocalNodeId() & 0x3f;
            m_correct_last_timestamp = false;

            debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);

            // initialize internal buffer
            result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);

            result &= m_data_buffer->setEventSize( getEventSize() );
            result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
            // receive streams use the nominal packet size as update period,
            // transmit streams use the period size
            if(getType() == ePT_Receive) {
                result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
            } else {
                result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
            }
            result &= m_data_buffer->setNominalRate(ticks_per_frame);
            result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
            result &= m_data_buffer->prepare(); // FIXME: the name

            break;
        case ePS_DryRunning:
            // coming from dry-running: the handler has to be stopped
            if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
                debugError("Could not stop handler for SP %p\n", this);
                return false;
            }
            break;
        default:
            debugError("Entry from invalid state: %s\n", ePSToString(m_state));
            return false;
    }

    result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
    // make the buffer transparent
    m_data_buffer->setTransparent(true);

    // reset all ports
    result &= PortManager::preparePorts();

    m_state = ePS_Stopped;
    #ifdef DEBUG
    if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
        debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
        dumpInfo();
    }
    #endif
    SIGNAL_ACTIVITY_ALL;
    return result;
}
1425
1426 /**
1427  * @brief Enter the ePS_WaitingForStream state
1428  * @return true if successful, false if not
1429  *
1430  * @pre all dynamic data structures are allocated successfully
1431  *
1432  * @post
1433  *
1434  */
1435 bool
1436 StreamProcessor::doWaitForRunningStream()
1437 {
1438     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1439     switch(m_state) {
1440         case ePS_Stopped:
1441             // we have to start waiting for an incoming stream
1442             // this basically means nothing, the state change will
1443             // be picked up by the packet iterator
1444             break;
1445         default:
1446             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1447             return false;
1448     }
1449     m_state = ePS_WaitingForStream;
1450     #ifdef DEBUG
1451     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1452         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1453         dumpInfo();
1454     }
1455     #endif
1456     SIGNAL_ACTIVITY_ALL;
1457     return true;
1458 }
1459
1460 /**
1461  * @brief Enter the ePS_DryRunning state
1462  * @return true if successful, false if not
1463  *
1464  * @pre
1465  *
1466  * @post
1467  *
1468  */
1469 bool
1470 StreamProcessor::doDryRunning()
1471 {
1472     bool result = true;
1473     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1474     switch(m_state) {
1475         case ePS_WaitingForStream:
1476             // a running stream has been detected
1477             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1478             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1479             if (getType() == ePT_Receive) {
1480                 // this to ensure that there is no discontinuity when starting to
1481                 // update the DLL based upon the received packets
1482                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1483             } else {
1484                 // FIXME: PC=master mode will have to do something here I guess...
1485             }
1486             break;
1487         case ePS_WaitingForStreamEnable: // when xrunning at startup
1488             result &= m_data_buffer->clearBuffer();
1489             m_data_buffer->setTransparent(true);
1490             break;
1491         case ePS_WaitingForStreamDisable:
1492             result &= m_data_buffer->clearBuffer();
1493             m_data_buffer->setTransparent(true);
1494             break;
1495         default:
1496             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1497             return false;
1498     }
1499     m_state = ePS_DryRunning;
1500     #ifdef DEBUG
1501     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1502         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1503         dumpInfo();
1504     }
1505     #endif
1506     SIGNAL_ACTIVITY_ALL;
1507     return result;
1508 }
1509
1510 /**
1511  * @brief Enter the ePS_WaitingForStreamEnable state
1512  * @return true if successful, false if not
1513  *
1514  * @pre
1515  *
1516  * @post
1517  *
1518  */
1519 bool
1520 StreamProcessor::doWaitForStreamEnable()
1521 {
1522     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1523     unsigned int ringbuffer_size_frames;
1524     switch(m_state) {
1525         case ePS_DryRunning:
1526             // we have to start waiting for an incoming stream
1527             // this basically means nothing, the state change will
1528             // be picked up by the packet iterator
1529
1530             if(!m_data_buffer->clearBuffer()) {
1531                 debugError("Could not reset data buffer\n");
1532                 return false;
1533             }
1534             if (getType() == ePT_Transmit) {
1535                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1536                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1537                 // prefill the buffer
1538                 if(!transferSilence(ringbuffer_size_frames)) {
1539                     debugFatal("Could not prefill transmit stream\n");
1540                     return false;
1541                 }
1542             }
1543             break;
1544         default:
1545             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1546             return false;
1547     }
1548     m_state = ePS_WaitingForStreamEnable;
1549     #ifdef DEBUG
1550     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1551         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1552         dumpInfo();
1553     }
1554     #endif
1555     SIGNAL_ACTIVITY_ALL;
1556     return true;
1557 }
1558
1559 /**
1560  * @brief Enter the ePS_Running state
1561  * @return true if successful, false if not
1562  *
1563  * @pre
1564  *
1565  * @post
1566  *
1567  */
1568 bool
1569 StreamProcessor::doRunning()
1570 {
1571     bool result = true;
1572     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1573     switch(m_state) {
1574         case ePS_WaitingForStreamEnable:
1575             // a running stream has been detected
1576             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1577                                              this, m_last_cycle);
1578             m_in_xrun = false;
1579             m_min_ahead = 7999;
1580             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1581             m_data_buffer->setTransparent(false);
1582             break;
1583         default:
1584             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1585             return false;
1586     }
1587     m_state = ePS_Running;
1588     #ifdef DEBUG
1589     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1590         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1591         dumpInfo();
1592     }
1593     #endif
1594     SIGNAL_ACTIVITY_ALL;
1595     return result;
1596 }
1597
1598 /**
1599  * @brief Enter the ePS_WaitingForStreamDisable state
1600  * @return true if successful, false if not
1601  *
1602  * @pre
1603  *
1604  * @post
1605  *
1606  */
1607 bool
1608 StreamProcessor::doWaitForStreamDisable()
1609 {
1610     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1611     switch(m_state) {
1612         case ePS_Running:
1613             // the thread will do the transition
1614             break;
1615         default:
1616             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1617             return false;
1618     }
1619     m_state = ePS_WaitingForStreamDisable;
1620     #ifdef DEBUG
1621     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1622         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1623         dumpInfo();
1624     }
1625     #endif
1626     SIGNAL_ACTIVITY_ALL;
1627     return true;
1628 }
1629
1630 /**
1631  * @brief Updates the state machine and calls the necessary transition functions
1632  * @return true if successful, false if not
1633  */
1634 bool StreamProcessor::updateState() {
1635     bool result = false;
1636     // copy the current state locally since it could change value,
1637     // and that's something we don't want to happen inbetween tests
1638     // if m_next_state changes during this routine, we know for sure
1639     // that the previous state change was at least attempted correctly.
1640     enum eProcessorState next_state = m_next_state;
1641
1642     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1643         ePSToString(m_state), ePSToString(next_state));
1644
1645     if (m_state == next_state) {
1646         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1647         return true;
1648     }
1649     // after creation, only initialization is allowed
1650     if (m_state == ePS_Created) {
1651         if(next_state != ePS_Stopped) {
1652             goto updateState_exit_with_error;
1653         }
1654         // do init here
1655         result = doStop();
1656         if (result) {return true;}
1657         else goto updateState_exit_change_failed;
1658     }
1659
1660     // after initialization, only WaitingForRunningStream is allowed
1661     if (m_state == ePS_Stopped) {
1662         if(next_state != ePS_WaitingForStream) {
1663             goto updateState_exit_with_error;
1664         }
1665         result = doWaitForRunningStream();
1666         if (result) {return true;}
1667         else goto updateState_exit_change_failed;
1668     }
1669
1670     // after WaitingForStream, only ePS_DryRunning is allowed
1671     // this means that the stream started running
1672     if (m_state == ePS_WaitingForStream) {
1673         if(next_state != ePS_DryRunning) {
1674             goto updateState_exit_with_error;
1675         }
1676         result = doDryRunning();
1677         if (result) {return true;}
1678         else goto updateState_exit_change_failed;
1679     }
1680
1681     // from ePS_DryRunning we can go to:
1682     //   - ePS_Stopped if something went wrong during DryRunning
1683     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1684     if (m_state == ePS_DryRunning) {
1685         if((next_state != ePS_Stopped) &&
1686            (next_state != ePS_WaitingForStreamEnable)) {
1687             goto updateState_exit_with_error;
1688         }
1689         if (next_state == ePS_Stopped) {
1690             result = doStop();
1691         } else {
1692             result = doWaitForStreamEnable();
1693         }
1694         if (result) {return true;}
1695         else goto updateState_exit_change_failed;
1696     }
1697
1698     // from ePS_WaitingForStreamEnable we can go to:
1699     //   - ePS_DryRunning if something went wrong while waiting
1700     //   - ePS_Running if the stream enabled correctly
1701     if (m_state == ePS_WaitingForStreamEnable) {
1702         if((next_state != ePS_DryRunning) &&
1703            (next_state != ePS_Running)) {
1704             goto updateState_exit_with_error;
1705         }
1706         if (next_state == ePS_Stopped) {
1707             result = doDryRunning();
1708         } else {
1709             result = doRunning();
1710         }
1711         if (result) {return true;}
1712         else goto updateState_exit_change_failed;
1713     }
1714
1715     // from ePS_Running we can only start waiting for a disabled stream
1716     if (m_state == ePS_Running) {
1717         if(next_state != ePS_WaitingForStreamDisable) {
1718             goto updateState_exit_with_error;
1719         }
1720         result = doWaitForStreamDisable();
1721         if (result) {return true;}
1722         else goto updateState_exit_change_failed;
1723     }
1724
1725     // from ePS_WaitingForStreamDisable we can go to DryRunning
1726     if (m_state == ePS_WaitingForStreamDisable) {
1727         if(next_state != ePS_DryRunning) {
1728             goto updateState_exit_with_error;
1729         }
1730         result = doDryRunning();
1731         if (result) {return true;}
1732         else goto updateState_exit_change_failed;
1733     }
1734
1735     // if we arrive here there is an error
1736 updateState_exit_with_error:
1737     debugError("Invalid state transition: %s => %s\n",
1738         ePSToString(m_state), ePSToString(next_state));
1739     SIGNAL_ACTIVITY_ALL;
1740     return false;
1741 updateState_exit_change_failed:
1742     debugError("State transition failed: %s => %s\n",
1743         ePSToString(m_state), ePSToString(next_state));
1744     SIGNAL_ACTIVITY_ALL;
1745     return false;
1746 }
1747
1748 bool StreamProcessor::canProducePacket()
1749 {
1750     return canProduce(getNominalFramesPerPacket());
1751 }
1752 bool StreamProcessor::canProducePeriod()
1753 {
1754     return canProduce(m_StreamProcessorManager.getPeriodSize());
1755 }
1756 bool StreamProcessor::canProduce(unsigned int nframes)
1757 {
1758     if(m_in_xrun) return true;
1759     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1760         // can we put a certain amount of frames into the buffer?
1761         unsigned int bufferspace = m_data_buffer->getBufferSpace();
1762         if(bufferspace >= nframes) {
1763             return true;
1764         } else return false;
1765     } else {
1766         if(getType() == ePT_Transmit) {
1767             // if we are an xmit SP, we cannot accept frames
1768             // when not running
1769             return false;
1770         } else {
1771             // if we are a receive SP, we can always accept frames
1772             // when not running
1773             return true;
1774         }
1775     }
1776 }
1777
1778 bool StreamProcessor::canConsumePacket()
1779 {
1780     return canConsume(getNominalFramesPerPacket());
1781 }
1782 bool StreamProcessor::canConsumePeriod()
1783 {
1784     return canConsume(m_StreamProcessorManager.getPeriodSize());
1785 }
1786 bool StreamProcessor::canConsume(unsigned int nframes)
1787 {
1788     if(m_in_xrun) return true;
1789     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1790         // check whether we already fullfil the criterion
1791         unsigned int bufferfill = m_data_buffer->getBufferFill();
1792         if(bufferfill >= nframes) {
1793             return true;
1794         } else return false;
1795     } else {
1796         if(getType() == ePT_Transmit) {
1797             // if we are an xmit SP, and we're not running,
1798             // we can always provide frames
1799             return true;
1800         } else {
1801             // if we are a receive SP, we can't provide frames
1802             // when not running
1803             return false;
1804         }
1805     }
1806 }
1807
1808 /***********************************************
1809  * Helper routines                             *
1810  ***********************************************/
1811 // FIXME: I think this can be removed and replaced by putSilenceFrames
1812 bool
1813 StreamProcessor::transferSilence(unsigned int nframes)
1814 {
1815     bool retval;
1816
1817     #ifdef DEBUG
1818     signed int fc;
1819     ffado_timestamp_t ts_tail_tmp;
1820     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1821     if (fc != 0) {
1822         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1823     }
1824     #endif
1825
1826     // prepare a buffer of silence
1827     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1828     transmitSilenceBlock(dummybuffer, nframes, 0);
1829
1830     // add the silence data to the ringbuffer
1831     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1832         retval = true;
1833     } else {
1834         debugWarning("Could not write to event buffer\n");
1835         retval = false;
1836     }
1837
1838     free(dummybuffer);
1839     return retval;
1840 }
1841
1842 /**
1843  * @brief convert a eProcessorState to a string
1844  * @param s the state
1845  * @return a char * describing the state
1846  */
1847 const char *
1848 StreamProcessor::ePSToString(enum eProcessorState s) {
1849     switch (s) {
1850         case ePS_Invalid: return "ePS_Invalid";
1851         case ePS_Created: return "ePS_Created";
1852         case ePS_Stopped: return "ePS_Stopped";
1853         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1854         case ePS_DryRunning: return "ePS_DryRunning";
1855         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1856         case ePS_Running: return "ePS_Running";
1857         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1858         default: return "error: unknown state";
1859     }
1860 }
1861
1862 /**
1863  * @brief convert a eProcessorType to a string
1864  * @param t the type
1865  * @return a char * describing the state
1866  */
1867 const char *
1868 StreamProcessor::ePTToString(enum eProcessorType t) {
1869     switch (t) {
1870         case ePT_Receive: return "Receive";
1871         case ePT_Transmit: return "Transmit";
1872         default: return "error: unknown type";
1873     }
1874 }
1875
1876 /***********************************************
1877  * Debug                                       *
1878  ***********************************************/
1879 void
1880 StreamProcessor::dumpInfo()
1881 {
1882     #ifdef DEBUG
1883     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p, %s:\n", this, ePTToString(m_processor_type));
1884     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
1885     uint64_t now = m_1394service.getCycleTimerTicks();
1886     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
1887                         now,
1888                         (unsigned int)TICKS_TO_SECS(now),
1889                         (unsigned int)TICKS_TO_CYCLES(now),
1890                         (unsigned int)TICKS_TO_OFFSET(now));
1891     if(getType() == ePT_Transmit) {
1892         debugOutputShort( DEBUG_LEVEL_NORMAL, "  Min ISOXMT bufferfill : %04d\n", m_min_ahead);
1893     }
1894     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False"));
1895     if (m_state == m_next_state) {
1896         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n",
1897                                             ePSToString(m_state));
1898     } else {
1899         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s (Next: %s)\n",
1900                                               ePSToString(m_state), ePSToString(m_next_state));
1901         debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
1902     }
1903     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
1904     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Framerate             : Nominal: %u, Sync: %f, Buffer %f\n",
1905                                           m_StreamProcessorManager.getNominalRate(),
1906                                           24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
1907                                           24576000.0/m_data_buffer->getRate());
1908     float d = getSyncDelay();
1909     debugOutputShort(DEBUG_LEVEL_NORMAL, "  Sync delay             : %f ticks (%f frames, %f cy)\n",
1910                                          d, d/getTicksPerFrame(),
1911                                          d/((float)TICKS_PER_CYCLE));
1912     #endif
1913     m_data_buffer->dumpInfo();
1914 }
1915
1916 void
1917 StreamProcessor::printBufferInfo()
1918 {
1919     debugOutput(DEBUG_LEVEL_NORMAL,
1920                 "(%p, %8s) fc: %d fill: %u\n",
1921                 this, getTypeString(), m_data_buffer->getFrameCounter(), m_data_buffer->getBufferFill() );
1922 }
1923
1924 void
1925 StreamProcessor::setVerboseLevel(int l) {
1926     setDebugLevel(l);
1927     PortManager::setVerboseLevel(l);
1928     m_data_buffer->setVerboseLevel(l);
1929 }
1930
1931 } // end of namespace
Note: See TracBrowser for help on using the browser.