root/trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

Revision 1001, 75.4 kB (checked in by ppalmers, 13 years ago)

Improve streaming startup for better initial timestamps and locking.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "StreamProcessor.h"
27 #include "../StreamProcessorManager.h"
28
29 #include "devicemanager.h"
30
31 #include "libieee1394/ieee1394service.h"
32 #include "libieee1394/IsoHandlerManager.h"
33 #include "libieee1394/cycletimer.h"
34
35 #include "libutil/Time.h"
36
37 #include "libutil/Atomic.h"
38
39 #include <assert.h>
40 #include <math.h>
41
42 #define SIGNAL_ACTIVITY { \
43     pthread_mutex_lock(&m_activity_cond_lock); \
44     pthread_cond_broadcast(&m_activity_cond); \
45     pthread_mutex_unlock(&m_activity_cond_lock); \
46 }
47
48 namespace Streaming {
49
50 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
51
52 StreamProcessor::StreamProcessor(FFADODevice &parent, enum eProcessorType type)
53     : m_processor_type ( type )
54     , m_state( ePS_Created )
55     , m_next_state( ePS_Invalid )
56     , m_cycle_to_switch_state( 0 )
57     , m_Parent( parent )
58     , m_1394service( parent.get1394Service() ) // local cache
59     , m_IsoHandlerManager( parent.get1394Service().getIsoHandlerManager() ) // local cache
60     , m_StreamProcessorManager( m_Parent.getDeviceManager().getStreamProcessorManager() ) // local cache
61     , m_local_node_id ( 0 ) // local cache
62     , m_channel( -1 )
63     , m_dropped( 0 )
64     , m_last_timestamp( 0 )
65     , m_last_timestamp2( 0 )
66     , m_correct_last_timestamp( false )
67     , m_scratch_buffer( NULL )
68     , m_scratch_buffer_size_bytes( 0 )
69     , m_ticks_per_frame( 0 )
70     , m_last_cycle( -1 )
71     , m_sync_delay( 0 )
72     , m_in_xrun( false )
73 {
74     // create the timestamped buffer and register ourselves as its client
75     m_data_buffer = new Util::TimestampedBuffer(this);
76     pthread_mutex_init(&m_activity_cond_lock, NULL);
77     pthread_cond_init(&m_activity_cond, NULL);
78 }
79
80 StreamProcessor::~StreamProcessor() {
81     m_StreamProcessorManager.unregisterProcessor(this);
82     if(!m_IsoHandlerManager.unregisterStream(this)) {
83         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n");
84     }
85
86     // lock the condition mutex to keep threads from blocking on
87     // the condition var while we destroy it
88     pthread_mutex_lock(&m_activity_cond_lock);
89     // now signal activity, releasing threads that
90     // are already blocking on the condition variable
91     pthread_cond_broadcast(&m_activity_cond);
92     // then destroy it
93     pthread_cond_destroy(&m_activity_cond);
94     pthread_mutex_unlock(&m_activity_cond_lock);
95
96     // destroy the mutexes
97     pthread_mutex_destroy(&m_activity_cond_lock);
98     if (m_data_buffer) delete m_data_buffer;
99     if (m_scratch_buffer) delete[] m_scratch_buffer;
100 }
101
102 void
103 StreamProcessor::handleBusReset()
104 {
105     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) handling busreset\n", this);
106     // for now, we try and make sure everything is cleanly shutdown
107     if(!stopRunning(-1)) {
108         debugError("Failed to stop SP\n");
109     }
110     SIGNAL_ACTIVITY;
111 }
112
113 void StreamProcessor::handlerDied()
114 {
115     debugWarning("Handler died for %p\n", this);
116     m_state = ePS_Stopped;
117     m_in_xrun = true;
118 }
119
120 uint64_t StreamProcessor::getTimeNow() {
121     return m_1394service.getCycleTimerTicks();
122 }
123
124 int StreamProcessor::getMaxFrameLatency() {
125     if (getType() == ePT_Receive) {
126         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
127     } else {
128         return (int)(m_IsoHandlerManager.getPacketLatencyForStream( this ) * TICKS_PER_CYCLE);
129     }
130 }
131
132 unsigned int
133 StreamProcessor::getNominalPacketsNeeded(unsigned int nframes)
134 {
135     unsigned int nominal_frames_per_second
136                     = m_StreamProcessorManager.getNominalRate();
137     uint64_t nominal_ticks_per_frame = TICKS_PER_SECOND / nominal_frames_per_second;
138     uint64_t nominal_ticks = nominal_ticks_per_frame * nframes;
139     uint64_t nominal_packets = nominal_ticks / TICKS_PER_CYCLE;
140     return nominal_packets;
141 }
142
143 unsigned int
144 StreamProcessor::getPacketsPerPeriod()
145 {
146     return getNominalPacketsNeeded(m_StreamProcessorManager.getPeriodSize());
147 }
148
149 unsigned int
150 StreamProcessor::getNbPacketsIsoXmitBuffer()
151 {
152     // if we use one thread per packet, we can put every frame directly into the ISO buffer
153     // the waitForClient in IsoHandler will take care of the fact that the frames are
154     // not present in time
155     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers())) + 10;
156     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer);
157     return packets_to_prebuffer;
158 }
159
160 /***********************************************
161  * Buffer management and manipulation          *
162  ***********************************************/
163 void StreamProcessor::flush() {
164     m_IsoHandlerManager.flushHandlerForStream(this);
165 }
166
167 int StreamProcessor::getBufferFill() {
168     return m_data_buffer->getBufferFill();
169 }
170
171 int64_t
172 StreamProcessor::getTimeUntilNextPeriodSignalUsecs()
173 {
174     uint64_t time_at_period=getTimeAtPeriod();
175
176     // we delay the period signal with the sync delay
177     // this makes that the period signals lag a little compared to reality
178     // ISO buffering causes the packets to be received at max
179     // m_handler->getWakeupInterval() later than the time they were received.
180     // hence their payload is available this amount of time later. However, the
181     // period boundary is predicted based upon earlier samples, and therefore can
182     // pass before these packets are processed. Adding this extra term makes that
183     // the period boundary is signalled later
184     time_at_period = addTicks(time_at_period, m_StreamProcessorManager.getSyncSource().getSyncDelay());
185
186     uint64_t cycle_timer=m_1394service.getCycleTimerTicks();
187
188     // calculate the time until the next period
189     int32_t until_next=diffTicks(time_at_period,cycle_timer);
190
191     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11ld\n",
192         time_at_period, cycle_timer, until_next
193         );
194
195     // now convert to usecs
196     // don't use the mapping function because it only works
197     // for absolute times, not the relative time we are
198     // using here (which can also be negative).
199     return (int64_t)(((float)until_next) / TICKS_PER_USEC);
200 }
201
202 void
203 StreamProcessor::setSyncDelay(unsigned int d) {
204     #ifdef DEBUG
205     unsigned int frames = (unsigned int)((float)d / getTicksPerFrame());
206     debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %u ticks, %u frames\n", this, d, frames);
207     #endif
208     m_sync_delay = d; // FIXME: sync delay not necessary anymore
209 }
210
211 uint64_t
212 StreamProcessor::getTimeAtPeriodUsecs()
213 {
214     return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC);
215 }
216
217 uint64_t
218 StreamProcessor::getTimeAtPeriod()
219 {
220     if (getType() == ePT_Receive) {
221         ffado_timestamp_t next_period_boundary = m_data_buffer->getTimestampFromHead(m_StreamProcessorManager.getPeriodSize());
222    
223         #ifdef DEBUG
224         ffado_timestamp_t ts;
225         signed int fc;
226         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
227    
228         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
229             next_period_boundary, ts, fc, getTicksPerFrame()
230             );
231         #endif
232         return (uint64_t)next_period_boundary;
233     } else {
234         ffado_timestamp_t next_period_boundary = m_data_buffer->getTimestampFromTail((m_StreamProcessorManager.getNbBuffers()-1) * m_StreamProcessorManager.getPeriodSize());
235    
236         #ifdef DEBUG
237         ffado_timestamp_t ts;
238         signed int fc;
239         m_data_buffer->getBufferTailTimestamp(&ts,&fc);
240    
241         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD="TIMESTAMP_FORMAT_SPEC", LTS="TIMESTAMP_FORMAT_SPEC", FC=%5u, TPF=%f\n",
242             next_period_boundary, ts, fc, getTicksPerFrame()
243             );
244         #endif
245         return (uint64_t)next_period_boundary;
246     }
247 }
248
249 float
250 StreamProcessor::getTicksPerFrame()
251 {
252     assert(m_data_buffer != NULL);
253     return m_data_buffer->getRate();
254 }
255
256 void
257 StreamProcessor::setTicksPerFrame(float tpf)
258 {
259     assert(m_data_buffer != NULL);
260     m_data_buffer->setRate(tpf);
261 }
262
263 bool
264 StreamProcessor::canClientTransferFrames(unsigned int nbframes)
265 {
266     bool can_transfer;
267     unsigned int fc = m_data_buffer->getFrameCounter();
268     if (getType() == ePT_Receive) {
269         can_transfer = (fc >= nbframes);
270     } else {
271         // there has to be enough space to put the frames in
272         can_transfer = m_data_buffer->getBufferSize() - fc > nbframes;
273         // or the buffer is transparent
274         can_transfer |= m_data_buffer->isTransparent();
275     }
276    
277     #ifdef DEBUG
278     if (!can_transfer) {
279         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",
280             this, ePTToString(getType()), fc, nbframes);
281     }
282     #endif
283    
284     return can_transfer;
285 }
286
287 /***********************************************
288  * I/O API                                     *
289  ***********************************************/
290
291 // Packet transfer API
292 enum raw1394_iso_disposition
293 StreamProcessor::putPacket(unsigned char *data, unsigned int length,
294                            unsigned char channel, unsigned char tag, unsigned char sy,
295                            unsigned int cycle, unsigned int dropped,
296                            unsigned int skipped) {
297 #ifdef DEBUG
298     if(m_last_cycle == -1) {
299         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %u)\n", getTypeString(), this, cycle);
300     }
301 #endif
302
303     int dropped_cycles = 0;
304     if (m_last_cycle != (int)cycle && m_last_cycle != -1) {
305         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
306         if (dropped_cycles < 0) {
307             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d, 'skipped'=%u\n",
308                          this, dropped_cycles, cycle, m_last_cycle, dropped, skipped);
309         }
310         if (dropped_cycles > 0) {
311             debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, 'skipped'=%u, cycle=%d, m_last_cycle=%d\n",
312                 this, dropped_cycles, cycle, dropped, skipped, cycle, m_last_cycle);
313             m_dropped += dropped_cycles;
314             m_last_cycle = cycle;
315             m_Parent.showDevice();
316 //             flushDebugOutput();
317 //             assert(0);
318         }
319     }
320     m_last_cycle = cycle;
321
322     // bypass based upon state
323     if (m_state == ePS_Invalid) {
324         debugError("Should not have state %s\n", ePSToString(m_state) );
325         return RAW1394_ISO_ERROR;
326     }
327     if (m_state == ePS_Created) {
328         return RAW1394_ISO_DEFER;
329     }
330
331     // store the previous timestamp
332     m_last_timestamp2 = m_last_timestamp;
333
334     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
335     //       it happens on the first 'good' cycle for the wait condition
336     //       or on the first received cycle that is received afterwards (might be a problem)
337
338     // check whether we are waiting for a stream to be disabled
339     if(m_state == ePS_WaitingForStreamDisable) {
340         // we then check whether we have to switch on this cycle
341         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
342             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
343             m_next_state = ePS_DryRunning;
344             if (!updateState()) { // we are allowed to change the state directly
345                 debugError("Could not update state!\n");
346                 return RAW1394_ISO_ERROR;
347             }
348         } else {
349             // not time to disable yet
350         }
351         // the received data can be discarded while waiting for the stream
352         // to be disabled
353         // similarly for dropped packets
354         return RAW1394_ISO_OK;
355     }
356
357     // check whether we are waiting for a stream to be enabled
358     else if(m_state == ePS_WaitingForStreamEnable) {
359         // we then check whether we have to switch on this cycle
360         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
361             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
362             m_next_state = ePS_Running;
363             if (!updateState()) { // we are allowed to change the state directly
364                 debugError("Could not update state!\n");
365                 return RAW1394_ISO_ERROR;
366             }
367         } else {
368             // not time to enable yet
369         }
370         // we are dryRunning hence data should be processed in any case
371     }
372
373     // check the packet header
374     enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles);
375
376     // handle dropped cycles
377     if(dropped_cycles) {
378         // make sure the last_timestamp is corrected
379         m_correct_last_timestamp = true;
380         if (m_state == ePS_Running) {
381             // this is an xrun situation
382             m_in_xrun = true;
383             debugWarning("Should update state to WaitingForStreamDisable due to dropped packet xrun\n");
384             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
385             m_next_state = ePS_WaitingForStreamDisable;
386             // execute the requested change
387             if (!updateState()) { // we are allowed to change the state directly
388                 debugError("Could not update state!\n");
389                 return RAW1394_ISO_ERROR;
390             }
391         }
392     }
393
394     if (result == eCRV_OK) {
395         #ifdef DEBUG
396         int ticks_per_packet = (int)(getTicksPerFrame() * getNominalFramesPerPacket());
397         int diff=diffTicks(m_last_timestamp, m_last_timestamp2);
398         // display message if the difference between two successive tick
399         // values is more than 50 ticks. 1 sample at 48k is 512 ticks
400         // so 50 ticks = 10%, which is a rather large jitter value.
401         if(diff-ticks_per_packet > 50 || diff-ticks_per_packet < -50) {
402             debugOutput(DEBUG_LEVEL_VERBOSE,
403                         "cy %04u rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
404                         cycle, m_last_timestamp2, m_last_timestamp, diff, ticks_per_packet);
405         }
406         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
407                            "%04u %011llu %011llu %d %d\n",
408                            cycle, m_last_timestamp2, m_last_timestamp,
409                            diff, ticks_per_packet);
410         #endif
411
412         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
413                           "RECV: CY=%04u TS=%011llu\n",
414                           cycle, m_last_timestamp);
415         // update some accounting
416         m_last_good_cycle = cycle;
417         m_last_dropped = dropped_cycles;
418
419         if(m_correct_last_timestamp) {
420             // they represent a discontinuity in the timestamps, and hence are
421             // to be dealt with
422             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this);
423             m_data_buffer->setBufferTailTimestamp(substractTicks(m_last_timestamp,
424                                                                  (uint64_t)(getNominalFramesPerPacket()
425                                                                             * getTicksPerFrame())));
426             m_correct_last_timestamp = false;
427         }
428
429         // check whether we are waiting for a stream to startup
430         // this requires that the packet is good
431         if(m_state == ePS_WaitingForStream) {
432             // since we have a packet with an OK header,
433             // we can indicate that the stream started up
434
435             // we then check whether we have to switch on this cycle
436             if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
437                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n");
438                 // hence go to the dryRunning state
439                 m_next_state = ePS_DryRunning;
440                 if (!updateState()) { // we are allowed to change the state directly
441                     debugError("Could not update state!\n");
442                     return RAW1394_ISO_ERROR;
443                 }
444             } else {
445                 // not time (yet) to switch state
446             }
447             // in both cases we don't want to process the data
448             return RAW1394_ISO_OK;
449         }
450
451         // check whether a state change has been requested
452         // note that only the wait state changes are synchronized with the cycles
453         else if(m_state != m_next_state) {
454             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
455                                              ePSToString(m_state), ePSToString(m_next_state));
456             // execute the requested change
457             if (!updateState()) { // we are allowed to change the state directly
458                 debugError("Could not update state!\n");
459                 return RAW1394_ISO_ERROR;
460             }
461         }
462
463         // for all states that reach this we are allowed to
464         // do protocol specific data reception
465         enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles);
466
467         // if an xrun occured, switch to the dryRunning state and
468         // allow for the xrun to be picked up
469         if (result2 == eCRV_XRun) {
470             debugWarning("processPacketData xrun\n");
471             m_in_xrun = true;
472             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
473             m_cycle_to_switch_state = cycle+1; // switch in the next cycle
474             m_next_state = ePS_WaitingForStreamDisable;
475             // execute the requested change
476             if (!updateState()) { // we are allowed to change the state directly
477                 debugError("Could not update state!\n");
478                 return RAW1394_ISO_ERROR;
479             }
480             return RAW1394_ISO_DEFER;
481         } else if(result2 == eCRV_OK) {
482             // no problem here
483             SIGNAL_ACTIVITY;
484             return RAW1394_ISO_OK;
485         } else {
486             debugError("Invalid response\n");
487             return RAW1394_ISO_ERROR;
488         }
489     } else if(result == eCRV_Invalid) {
490         // apparently we don't have to do anything when the packets are not valid
491         return RAW1394_ISO_OK;
492     } else {
493         debugError("Invalid response\n");
494         return RAW1394_ISO_ERROR;
495     }
496     debugError("reached the unreachable\n");
497     return RAW1394_ISO_ERROR;
498 }
499
500 enum raw1394_iso_disposition
501 StreamProcessor::getPacket(unsigned char *data, unsigned int *length,
502                            unsigned char *tag, unsigned char *sy,
503                            int cycle, unsigned int dropped,
504                            unsigned int skipped, unsigned int max_length) {
505     if (cycle<0) {
506         *tag = 0;
507         *sy = 0;
508         *length = 0;
509         return RAW1394_ISO_OK;
510     }
511
512     unsigned int ctr;
513     uint64_t prev_timestamp;
514     int now_cycles;
515     int cycle_diff;
516
517 #ifdef DEBUG
518     if(m_last_cycle == -1) {
519         debugOutput(DEBUG_LEVEL_VERBOSE, "Handler for %s SP %p is alive (cycle = %d)\n", getTypeString(), this, cycle);
520     }
521 #endif
522
523     int dropped_cycles = 0;
524     if (m_last_cycle != cycle && m_last_cycle != -1) {
525         dropped_cycles = diffCycles(cycle, m_last_cycle) - 1;
526         // correct for skipped packets
527         // since those are not dropped, but only delayed
528         dropped_cycles =- skipped;
529         if(skipped) {
530             debugWarning("(%p) skipped %d cycles, cycle: %d, last_cycle: %d, dropped: %d\n",
531                          this, skipped, cycle, m_last_cycle, dropped);
532         }
533         if (dropped_cycles < 0) {
534             debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d, skipped: %d\n",
535                          this, dropped_cycles, cycle, m_last_cycle, dropped, skipped);
536         }
537         if (dropped_cycles > 0) {
538             debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d, skipped: %d)\n",
539                          this, dropped_cycles, cycle, m_last_cycle, dropped, skipped);
540             m_dropped += dropped_cycles;
541             // HACK: this should not be necessary, since the header generation functions should trigger the xrun.
542             //       but apparently there are some issues with the 1394 stack
543             m_in_xrun = true;
544             if(m_state == ePS_Running) {
545                 debugShowBackLogLines(200);
546                 debugWarning("dropped packets xrun\n");
547                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packets xrun\n");
548                 m_cycle_to_switch_state = cycle + 1;
549                 m_next_state = ePS_WaitingForStreamDisable;
550                 // execute the requested change
551                 if (!updateState()) { // we are allowed to change the state directly
552                     debugError("Could not update state!\n");
553                     return RAW1394_ISO_ERROR;
554                 }
555                 goto send_empty_packet;
556             }
557         }
558     }
559     if (cycle >= 0) {
560         m_last_cycle = cycle;
561     }
562
563 #ifdef DEBUG
564     // bypass based upon state
565     if (m_state == ePS_Invalid) {
566         debugError("Should not have state %s\n", ePSToString(m_state) );
567         return RAW1394_ISO_ERROR;
568     }
569 #endif
570
571     if (m_state == ePS_Created) {
572         *tag = 0;
573         *sy = 0;
574         *length = 0;
575         return RAW1394_ISO_DEFER;
576     }
577
578     // normal processing
579     // note that we can't use getCycleTimer directly here,
580     // because packets are queued in advance. This means that
581     // we the packet we are constructing will be sent out
582     // on 'cycle', not 'now'.
583     ctr = m_1394service.getCycleTimer();
584     now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
585
586     // the difference between the cycle this
587     // packet is intended for and 'now'
588     cycle_diff = diffCycles(cycle, now_cycles);
589
590     if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) {
591         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
592             cycle, now_cycles);
593         if(m_state == ePS_Running) {
594             debugShowBackLogLines(200);
595 //             flushDebugOutput();
596 //             assert(0);
597             debugWarning("generatePacketData xrun\n");
598             m_in_xrun = true;
599             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
600             m_cycle_to_switch_state = cycle + 1;
601             m_next_state = ePS_WaitingForStreamDisable;
602             // execute the requested change
603             if (!updateState()) { // we are allowed to change the state directly
604                 debugError("Could not update state!\n");
605                 return RAW1394_ISO_ERROR;
606             }
607             goto send_empty_packet;
608         }
609     }
610
611     // store the previous timestamp
612     // keep the old value here, update m_last_timestamp2 only when
613     // a valid packet will be sent
614     prev_timestamp = m_last_timestamp;
615    
616     // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles)
617     //       it happens on the first 'good' cycle for the wait condition
618     //       or on the first received cycle that is received afterwards (might be a problem)
619
620     // check whether we are waiting for a stream to be disabled
621     if(m_state == ePS_WaitingForStreamDisable) {
622         // we then check whether we have to switch on this cycle
623         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
624             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n");
625             m_next_state = ePS_DryRunning;
626             if (!updateState()) { // we are allowed to change the state directly
627                 debugError("Could not update state!\n");
628                 return RAW1394_ISO_ERROR;
629             }
630         }
631         // generate the silent packet header
632         enum eChildReturnValue result = generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
633         if (result == eCRV_Packet) {
634             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
635                                "XMIT SILENT: CY=%04u TS=%011llu\n",
636                                cycle, m_last_timestamp);
637             // update some accounting
638             m_last_good_cycle = cycle;
639             m_last_dropped = dropped_cycles;
640
641             // assumed not to xrun
642             generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
643             return RAW1394_ISO_OK;
644         // FIXME: PP: I think this should be possible too
645         //} else if (result == eCRV_EmptyPacket) {
646         //    goto send_empty_packet;
647         } else {
648             debugError("Invalid return value: %d\n", result);
649             return RAW1394_ISO_ERROR;
650         }
651     }
652     // check whether we are waiting for a stream to be enabled
653     else if(m_state == ePS_WaitingForStreamEnable) {
654         // we then check whether we have to switch on this cycle
655         if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) {
656             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n");
657             m_next_state = ePS_Running;
658             if (!updateState()) { // we are allowed to change the state directly
659                 debugError("Could not update state!\n");
660                 return RAW1394_ISO_ERROR;
661             }
662         } else {
663             // not time to enable yet
664         }
665         // we are dryRunning hence data should be processed in any case
666     }
667     // check whether we are waiting for a stream to startup
668     else if(m_state == ePS_WaitingForStream) {
669         // as long as the cycle parameter is not in sync with
670         // the current time, the stream is considered not
671         // to be 'running'
672         // we then check whether we have to switch on this cycle
673         if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) {
674             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStream to DryRunning\n");
675             // hence go to the dryRunning state
676             m_next_state = ePS_DryRunning;
677             if (!updateState()) { // we are allowed to change the state directly
678                 debugError("Could not update state!\n");
679                 return RAW1394_ISO_ERROR;
680             }
681         } else {
682             // not time (yet) to switch state
683         }
684     }
685     else if(m_state == ePS_Running) {
686         // check the packet header
687         enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
688         if (result == eCRV_Packet || result == eCRV_Defer) {
689             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
690                                "XMIT: CY=%04u TS=%011llu\n",
691                                cycle, m_last_timestamp);
692             // update some accounting
693             m_last_good_cycle = cycle;
694             m_last_dropped = dropped_cycles;
695
696             // valid packet timestamp
697             m_last_timestamp2 = prev_timestamp;
698
699             // check whether a state change has been requested
700             // note that only the wait state changes are synchronized with the cycles
701             if(m_state != m_next_state) {
702                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
703                                                 ePSToString(m_state), ePSToString(m_next_state));
704                 // execute the requested change
705                 if (!updateState()) { // we are allowed to change the state directly
706                     debugError("Could not update state!\n");
707                     return RAW1394_ISO_ERROR;
708                 }
709             }
710
711             enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
712             // if an xrun occured, switch to the dryRunning state and
713             // allow for the xrun to be picked up
714             if (result2 == eCRV_XRun) {
715                 debugWarning("generatePacketData xrun\n");
716                 m_in_xrun = true;
717                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n");
718                 m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
719                 m_next_state = ePS_WaitingForStreamDisable;
720                 // execute the requested change
721                 if (!updateState()) { // we are allowed to change the state directly
722                     debugError("Could not update state!\n");
723                     return RAW1394_ISO_ERROR;
724                 }
725                 goto send_empty_packet;
726             }
727             #ifdef DEBUG
728             int ticks_per_packet = (int)(getTicksPerFrame() * getNominalFramesPerPacket());
729             int diff=diffTicks(m_last_timestamp, m_last_timestamp2);
730             // display message if the difference between two successive tick
731             // values is more than 50 ticks. 1 sample at 48k is 512 ticks
732             // so 50 ticks = 10%, which is a rather large jitter value.
733             if(diff-ticks_per_packet > 50 || diff-ticks_per_packet < -50) {
734                 debugOutput(DEBUG_LEVEL_VERBOSE,
735                             "cy %04d, rather large TSP difference TS=%011llu => TS=%011llu (%d, nom %d)\n",
736                             cycle, m_last_timestamp2, m_last_timestamp, diff, ticks_per_packet);
737             }
738             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
739                                "%04d %011llu %011llu %d %d\n",
740                                cycle, m_last_timestamp2, m_last_timestamp, diff, ticks_per_packet);
741             #endif
742
743             // skip queueing packets if we detect that there are not enough frames
744             // available
745             if(result2 == eCRV_Defer || result == eCRV_Defer) {
746                 return RAW1394_ISO_DEFER;
747             } else {
748                 return RAW1394_ISO_OK;
749             }
750         } else if (result == eCRV_XRun) { // pick up the possible xruns
751             debugWarning("generatePacketHeader xrun\n");
752             m_in_xrun = true;
753             debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n");
754             m_cycle_to_switch_state = cycle + 1; // switch in the next cycle
755             m_next_state = ePS_WaitingForStreamDisable;
756             // execute the requested change
757             if (!updateState()) { // we are allowed to change the state directly
758                 debugError("Could not update state!\n");
759                 return RAW1394_ISO_ERROR;
760             }
761         } else if (result == eCRV_EmptyPacket) {
762             if(m_state != m_next_state) {
763                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
764                                                 ePSToString(m_state), ePSToString(m_next_state));
765                 // execute the requested change
766                 if (!updateState()) { // we are allowed to change the state directly
767                     debugError("Could not update state!\n");
768                     return RAW1394_ISO_ERROR;
769                 }
770             }
771             goto send_empty_packet;
772         } else if (result == eCRV_Again) {
773             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle);
774             if(m_state != m_next_state) {
775                 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
776                                                 ePSToString(m_state), ePSToString(m_next_state));
777                 // execute the requested change
778                 if (!updateState()) { // we are allowed to change the state directly
779                     debugError("Could not update state!\n");
780                     return RAW1394_ISO_ERROR;
781                 }
782             }
783 //             usleep(125); // only when using thread-per-handler
784 //             return RAW1394_ISO_AGAIN;
785             generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
786             generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
787             return RAW1394_ISO_DEFER;
788         } else {
789             debugError("Invalid return value: %d\n", result);
790             return RAW1394_ISO_ERROR;
791         }
792     }
793     // we are not running, so send an empty packet
794     // we should generate a valid packet any time
795 send_empty_packet:
796     // note that only the wait state changes are synchronized with the cycles
797     if(m_state != m_next_state) {
798         debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n",
799                                         ePSToString(m_state), ePSToString(m_next_state));
800         // execute the requested change
801         if (!updateState()) { // we are allowed to change the state directly
802             debugError("Could not update state!\n");
803             return RAW1394_ISO_ERROR;
804         }
805     }
806
807     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
808                        "XMIT EMPTY: CY=%04u\n",
809                        cycle);
810     generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length);
811     generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);
812     return RAW1394_ISO_OK;
813 }
814
815
816 // Frame Transfer API
817 /**
818  * Transfer a block of frames from the event buffer to the port buffers
819  * @param nbframes number of frames to transfer
820  * @param ts the timestamp that the LAST frame in the block should have
821  * @return
822  */
823 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
824     bool result;
825     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
826                         "%p.getFrames(%d, %11llu)",
827                         nbframes, ts);
828     assert( getType() == ePT_Receive );
829     if(isDryRunning()) result = getFramesDry(nbframes, ts);
830     else result = getFramesWet(nbframes, ts);
831     SIGNAL_ACTIVITY;
832     return result;
833 }
834
835 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) {
836 // FIXME: this should be done somewhere else
837 #ifdef DEBUG
838     uint64_t ts_expected;
839     signed int fc;
840     int32_t lag_ticks;
841     float lag_frames;
842
843     // in order to sync up multiple received streams, we should
844     // use the ts parameter. It specifies the time of the block's
845     // last sample.
846     float srate = m_StreamProcessorManager.getSyncSource().getTicksPerFrame();
847     assert(srate != 0.0);
848     int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate);
849
850     ffado_timestamp_t ts_head_tmp;
851     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc);
852     ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks);
853
854     lag_ticks = diffTicks(ts, ts_expected);
855     lag_frames = (((float)lag_ticks) / srate);
856     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
857                        "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
858                        this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
859     if (lag_frames >= 1.0) {
860         // the stream lags
861         debugOutput(DEBUG_LEVEL_VERBOSE, "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
862                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
863     } else if (lag_frames <= -1.0) {
864         // the stream leads
865         debugOutput(DEBUG_LEVEL_VERBOSE, "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n",
866                       this, lag_ticks, lag_frames, srate, ts, ts_expected, fc);
867     }
868 #endif
869     // ask the buffer to process nbframes of frames
870     // using it's registered client's processReadBlock(),
871     // which should be ours
872     m_data_buffer->blockProcessReadFrames(nbframes);
873     return true;
874 }
875
876 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts)
877 {
878     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
879                        "stream (%p): dry run %d frames (@ ts=%lld)\n",
880                        this, nbframes, ts);
881     // dry run on this side means that we put silence in all enabled ports
882     // since there is do data put into the ringbuffer in the dry-running state
883     return provideSilenceBlock(nbframes, 0);
884 }
885
886 bool
887 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts)
888 {
889     bool result;
890     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts);
891     result = m_data_buffer->dropFrames(nbframes);
892     SIGNAL_ACTIVITY;
893     return result;
894 }
895
896 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts)
897 {
898     bool result;
899     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
900                        "%p.putFrames(%d, %11llu)",
901                        nbframes, ts);
902     assert( getType() == ePT_Transmit );
903     if(isDryRunning()) result = putFramesDry(nbframes, ts);
904     else result = putFramesWet(nbframes, ts);
905     SIGNAL_ACTIVITY;
906     return result;
907 }
908
909 bool
910 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts)
911 {
912     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE,
913                        "StreamProcessor::putFramesWet(%d, %llu)\n",
914                        nbframes, ts);
915     // transfer the data
916     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
917     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE,
918                        " New timestamp: %llu\n", ts);
919     return true; // FIXME: what about failure?
920 }
921
922 bool
923 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts)
924 {
925     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE,
926                        "StreamProcessor::putFramesDry(%d, %llu)\n",
927                        nbframes, ts);
928     // do nothing
929     return true;
930 }
931
932 bool
933 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts)
934 {
935     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE,
936                        "StreamProcessor::putSilenceFrames(%d, %llu)\n",
937                        nbframes, ts);
938
939     size_t bytes_per_frame = getEventSize() * getEventsPerFrame();
940     unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame;
941
942     if (nbframes > scratch_buffer_size_frames) {
943         debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n",
944                    nbframes, scratch_buffer_size_frames);
945     }
946
947     assert(m_scratch_buffer);
948     if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) {
949         debugError("Could not prepare silent block\n");
950         return false;
951     }
952     if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) {
953         debugError("Could not write silent block\n");
954         return false;
955     }
956
957     SIGNAL_ACTIVITY;
958     return true;
959 }
960
961 bool
962 StreamProcessor::shiftStream(int nbframes)
963 {
964     bool result;
965     if(nbframes == 0) return true;
966     if(nbframes > 0) {
967         result = m_data_buffer->dropFrames(nbframes);
968         SIGNAL_ACTIVITY;
969         return result;
970     } else {
971         result = true;
972         while(nbframes++) {
973             result &= m_data_buffer->writeDummyFrame();
974         }
975         SIGNAL_ACTIVITY;
976         return result;
977     }
978 }
979
980 /**
981  * @brief write silence events to the stream ringbuffers.
982  */
983 bool StreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset)
984 {
985     bool no_problem=true;
986     for ( PortVectorIterator it = m_Ports.begin();
987           it != m_Ports.end();
988           ++it ) {
989         if((*it)->isDisabled()) {continue;};
990
991         if(provideSilenceToPort((*it), offset, nevents)) {
992             debugWarning("Could not put silence into to port %s",(*it)->getName().c_str());
993             no_problem=false;
994         }
995     }
996     return no_problem;
997 }
998
999 int
1000 StreamProcessor::provideSilenceToPort(Port *p, unsigned int offset, unsigned int nevents)
1001 {
1002     unsigned int j=0;
1003     switch(p->getPortType()) {
1004         default:
1005             debugError("Invalid port type: %d\n", p->getPortType());
1006             return -1;
1007         case Port::E_Midi:
1008         case Port::E_Control:
1009             {
1010                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
1011                 assert(nevents + offset <= p->getBufferSize());
1012                 buffer+=offset;
1013
1014                 for(j = 0; j < nevents; j += 1) {
1015                     *(buffer)=0;
1016                     buffer++;
1017                 }
1018             }
1019             break;
1020         case Port::E_Audio:
1021             switch(m_StreamProcessorManager.getAudioDataType()) {
1022             case StreamProcessorManager::eADT_Int24:
1023                 {
1024                     quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
1025                     assert(nevents + offset <= p->getBufferSize());
1026                     buffer+=offset;
1027    
1028                     for(j = 0; j < nevents; j += 1) {
1029                         *(buffer)=0;
1030                         buffer++;
1031                     }
1032                 }
1033                 break;
1034             case StreamProcessorManager::eADT_Float:
1035                 {
1036                     float *buffer=(float *)(p->getBufferAddress());
1037                     assert(nevents + offset <= p->getBufferSize());
1038                     buffer+=offset;
1039    
1040                     for(j = 0; j < nevents; j += 1) {
1041                         *buffer = 0.0;
1042                         buffer++;
1043                     }
1044                 }
1045                 break;
1046             }
1047             break;
1048     }
1049     return 0;
1050 }
1051
1052 /***********************************************
1053  * State related API                           *
1054  ***********************************************/
1055 bool StreamProcessor::init()
1056 {
1057     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n");
1058
1059     if(!m_IsoHandlerManager.registerStream(this)) {
1060         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n");
1061         return false;
1062     }
1063     if(!m_StreamProcessorManager.registerProcessor(this)) {
1064         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the SP manager\n");
1065         return false;
1066     }
1067
1068     // initialization can be done without requesting it
1069     // from the packet loop
1070     m_next_state = ePS_Created;
1071     return true;
1072 }
1073
1074 bool StreamProcessor::prepare()
1075 {
1076     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this);
1077
1078     // make the scratch buffer one period of frames long
1079     m_scratch_buffer_size_bytes = m_StreamProcessorManager.getPeriodSize() * getEventsPerFrame() * getEventSize();
1080     debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n");
1081     if(m_scratch_buffer) delete[] m_scratch_buffer;
1082     m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes];
1083     if(m_scratch_buffer == NULL) {
1084         debugFatal("Could not allocate scratch buffer\n");
1085         return false;
1086     }
1087
1088     // set the parameters of ports we can:
1089     // we want the audio ports to be period buffered,
1090     // and the midi ports to be packet buffered
1091     for ( PortVectorIterator it = m_Ports.begin();
1092         it != m_Ports.end();
1093         ++it )
1094     {
1095         debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1096         if(!(*it)->setBufferSize(m_StreamProcessorManager.getPeriodSize())) {
1097             debugFatal("Could not set buffer size to %d\n",m_StreamProcessorManager.getPeriodSize());
1098             return false;
1099         }
1100     }
1101     // the API specific settings of the ports should already be set,
1102     // as this is called from the processorManager->prepare()
1103     // so we can init the ports
1104     if(!PortManager::initPorts()) {
1105         debugFatal("Could not initialize ports\n");
1106         return false;
1107     }
1108
1109     if (!prepareChild()) {
1110         debugFatal("Could not prepare child\n");
1111         return false;
1112     }
1113
1114     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
1115     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d\n",
1116              m_StreamProcessorManager.getNominalRate());
1117     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
1118              m_StreamProcessorManager.getPeriodSize(), m_StreamProcessorManager.getNbBuffers());
1119     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
1120              m_1394service.getPort(), m_channel);
1121
1122     // initialization can be done without requesting it
1123     // from the packet loop
1124     m_next_state = ePS_Stopped;
1125     return updateState();
1126 }
1127
1128 bool
1129 StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant)
1130 {
1131     // first set the time, since in the packet loop we first check m_state == m_next_state before
1132     // using the time
1133     m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant);
1134     m_next_state = state;
1135     // wake up any threads that might be waiting on data in the buffers
1136     // since a state transition can cause data to become available
1137     SIGNAL_ACTIVITY;
1138     return true;
1139 }
1140
1141 bool
1142 StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms)
1143 {
1144     debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state));
1145     int cnt = timeout_ms;
1146     while (m_state != state && cnt) {
1147         SleepRelativeUsec(1000);
1148         cnt--;
1149     }
1150     if(cnt==0) {
1151         debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n");
1152         return false;
1153     }
1154     return true;
1155 }
1156
1157 bool StreamProcessor::scheduleStartDryRunning(int64_t t) {
1158     uint64_t tx;
1159     if (t < 0) {
1160         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1161     } else {
1162         tx = t;
1163     }
1164     uint64_t start_handler_ticks = substractTicks(tx, 100 * TICKS_PER_CYCLE);
1165
1166     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1167     #ifdef DEBUG
1168     uint64_t now = m_1394service.getCycleTimerTicks();
1169     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1170                           now,
1171                           (unsigned int)TICKS_TO_SECS(now),
1172                           (unsigned int)TICKS_TO_CYCLES(now),
1173                           (unsigned int)TICKS_TO_OFFSET(now));
1174     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1175                           tx,
1176                           (unsigned int)TICKS_TO_SECS(tx),
1177                           (unsigned int)TICKS_TO_CYCLES(tx),
1178                           (unsigned int)TICKS_TO_OFFSET(tx));
1179     #endif
1180     if (m_state == ePS_Stopped) {
1181         if(!m_IsoHandlerManager.startHandlerForStream(
1182                                         this, TICKS_TO_CYCLES(start_handler_ticks))) {
1183             debugError("Could not start handler for SP %p\n", this);
1184             return false;
1185         }
1186         return scheduleStateTransition(ePS_WaitingForStream, tx);
1187     } else if (m_state == ePS_Running) {
1188         return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1189     } else if (m_state == ePS_DryRunning) {
1190         debugOutput(DEBUG_LEVEL_VERBOSE, " %p already in DryRunning state\n", this);
1191         return true;
1192     } else if (m_state == ePS_WaitingForStreamDisable) {
1193         debugOutput(DEBUG_LEVEL_VERBOSE, " %p already waiting to switch to DryRunning state\n", this);
1194         return true;
1195     } else {
1196         debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state));
1197         return false;
1198     }
1199 }
1200
1201 bool StreamProcessor::scheduleStartRunning(int64_t t) {
1202     uint64_t tx;
1203     if (t < 0) {
1204         tx = addTicks(m_1394service.getCycleTimerTicks(), 200 * TICKS_PER_CYCLE);
1205     } else {
1206         tx = t;
1207     }
1208     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1209     #ifdef DEBUG
1210     uint64_t now = m_1394service.getCycleTimerTicks();
1211     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1212                           now,
1213                           (unsigned int)TICKS_TO_SECS(now),
1214                           (unsigned int)TICKS_TO_CYCLES(now),
1215                           (unsigned int)TICKS_TO_OFFSET(now));
1216     debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%03us %04uc %04ut)\n",
1217                           tx,
1218                           (unsigned int)TICKS_TO_SECS(tx),
1219                           (unsigned int)TICKS_TO_CYCLES(tx),
1220                           (unsigned int)TICKS_TO_OFFSET(tx));
1221     #endif
1222     return scheduleStateTransition(ePS_WaitingForStreamEnable, tx);
1223 }
1224
1225 bool StreamProcessor::scheduleStopDryRunning(int64_t t) {
1226     uint64_t tx;
1227     if (t < 0) {
1228         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1229     } else {
1230         tx = t;
1231     }
1232     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1233     #ifdef DEBUG
1234     uint64_t now = m_1394service.getCycleTimerTicks();
1235     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1236                           now,
1237                           (unsigned int)TICKS_TO_SECS(now),
1238                           (unsigned int)TICKS_TO_CYCLES(now),
1239                           (unsigned int)TICKS_TO_OFFSET(now));
1240     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at              : %011llu (%03us %04uc %04ut)\n",
1241                           tx,
1242                           (unsigned int)TICKS_TO_SECS(tx),
1243                           (unsigned int)TICKS_TO_CYCLES(tx),
1244                           (unsigned int)TICKS_TO_OFFSET(tx));
1245     #endif
1246
1247     return scheduleStateTransition(ePS_Stopped, tx);
1248 }
1249
1250 bool StreamProcessor::scheduleStopRunning(int64_t t) {
1251     uint64_t tx;
1252     if (t < 0) {
1253         tx = addTicks(m_1394service.getCycleTimerTicks(), 2000 * TICKS_PER_CYCLE);
1254     } else {
1255         tx = t;
1256     }
1257     debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this);
1258     #ifdef DEBUG
1259     uint64_t now = m_1394service.getCycleTimerTicks();
1260     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011llu (%03us %04uc %04ut)\n",
1261                           now,
1262                           (unsigned int)TICKS_TO_SECS(now),
1263                           (unsigned int)TICKS_TO_CYCLES(now),
1264                           (unsigned int)TICKS_TO_OFFSET(now));
1265     debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at              : %011llu (%03us %04uc %04ut)\n",
1266                           tx,
1267                           (unsigned int)TICKS_TO_SECS(tx),
1268                           (unsigned int)TICKS_TO_CYCLES(tx),
1269                           (unsigned int)TICKS_TO_OFFSET(tx));
1270     #endif
1271     return scheduleStateTransition(ePS_WaitingForStreamDisable, tx);
1272 }
1273
1274 bool StreamProcessor::startDryRunning(int64_t t) {
1275     if(getState() == ePS_DryRunning) {
1276         // already in the correct state
1277         return true;
1278     }
1279     if(!scheduleStartDryRunning(t)) {
1280         debugError("Could not schedule transition\n");
1281         return false;
1282     }
1283     if(!waitForState(ePS_DryRunning, 2000)) {
1284         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1285         return false;
1286     }
1287     return true;
1288 }
1289
1290 bool StreamProcessor::startRunning(int64_t t) {
1291     if(getState() == ePS_Running) {
1292         // already in the correct state
1293         return true;
1294     }
1295     if(!scheduleStartRunning(t)) {
1296         debugError("Could not schedule transition\n");
1297         return false;
1298     }
1299     if(!waitForState(ePS_Running, 2000)) {
1300         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running));
1301         return false;
1302     }
1303     return true;
1304 }
1305
1306 bool StreamProcessor::stopDryRunning(int64_t t) {
1307     if(getState() == ePS_Stopped) {
1308         // already in the correct state
1309         return true;
1310     }
1311     if(!scheduleStopDryRunning(t)) {
1312         debugError("Could not schedule transition\n");
1313         return false;
1314     }
1315     if(!waitForState(ePS_Stopped, 2000)) {
1316         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped));
1317         return false;
1318     }
1319     return true;
1320 }
1321
1322 bool StreamProcessor::stopRunning(int64_t t) {
1323     if(getState() == ePS_DryRunning) {
1324         // already in the correct state
1325         return true;
1326     }
1327     if(!scheduleStopRunning(t)) {
1328         debugError("Could not schedule transition\n");
1329         return false;
1330     }
1331     if(!waitForState(ePS_DryRunning, 2000)) {
1332         debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning));
1333         return false;
1334     }
1335     return true;
1336 }
1337
1338
1339 // internal state API
1340
1341 /**
1342  * @brief Enter the ePS_Stopped state
1343  * @return true if successful, false if not
1344  *
1345  * @pre none
1346  *
1347  * @post the buffer and the isostream are ready for use.
1348  * @post all dynamic structures have been allocated successfully
1349  * @post the buffer is transparent and empty, and all parameters are set
1350  *       to the correct initial/nominal values.
1351  *
1352  */
1353 bool
1354 StreamProcessor::doStop()
1355 {
1356     float ticks_per_frame;
1357     unsigned int ringbuffer_size_frames = (m_StreamProcessorManager.getNbBuffers() + 1) * m_StreamProcessorManager.getPeriodSize();
1358
1359     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1360     bool result = true;
1361
1362     switch(m_state) {
1363         case ePS_Created:
1364             assert(m_data_buffer);
1365
1366             // prepare the framerate estimate
1367             ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_StreamProcessorManager.getNominalRate());
1368             m_ticks_per_frame = ticks_per_frame;
1369             m_local_node_id= m_1394service.getLocalNodeId() & 0x3f;
1370             m_correct_last_timestamp = false;
1371
1372             debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame);
1373
1374             // initialize internal buffer
1375             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames);
1376
1377             result &= m_data_buffer->setEventSize( getEventSize() );
1378             result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() );
1379             if(getType() == ePT_Receive) {
1380                 result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() );
1381             } else {
1382                 result &= m_data_buffer->setUpdatePeriod( m_StreamProcessorManager.getPeriodSize() );
1383             }
1384             result &= m_data_buffer->setNominalRate(ticks_per_frame);
1385             result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND);
1386             result &= m_data_buffer->prepare(); // FIXME: the name
1387
1388             break;
1389         case ePS_DryRunning:
1390             if(!m_IsoHandlerManager.stopHandlerForStream(this)) {
1391                 debugError("Could not stop handler for SP %p\n", this);
1392                 return false;
1393             }
1394             break;
1395         default:
1396             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1397             return false;
1398     }
1399
1400     result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name
1401     // make the buffer transparent
1402     m_data_buffer->setTransparent(true);
1403
1404     // reset all ports
1405     result &= PortManager::preparePorts();
1406
1407     m_state = ePS_Stopped;
1408     #ifdef DEBUG
1409     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1410         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1411         dumpInfo();
1412     }
1413     #endif
1414     SIGNAL_ACTIVITY;
1415     return result;
1416 }
1417
1418 /**
1419  * @brief Enter the ePS_WaitingForStream state
1420  * @return true if successful, false if not
1421  *
1422  * @pre all dynamic data structures are allocated successfully
1423  *
1424  * @post
1425  *
1426  */
1427 bool
1428 StreamProcessor::doWaitForRunningStream()
1429 {
1430     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1431     switch(m_state) {
1432         case ePS_Stopped:
1433             // we have to start waiting for an incoming stream
1434             // this basically means nothing, the state change will
1435             // be picked up by the packet iterator
1436             break;
1437         default:
1438             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1439             return false;
1440     }
1441     m_state = ePS_WaitingForStream;
1442     #ifdef DEBUG
1443     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1444         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1445         dumpInfo();
1446     }
1447     #endif
1448     SIGNAL_ACTIVITY;
1449     return true;
1450 }
1451
1452 /**
1453  * @brief Enter the ePS_DryRunning state
1454  * @return true if successful, false if not
1455  *
1456  * @pre
1457  *
1458  * @post
1459  *
1460  */
1461 bool
1462 StreamProcessor::doDryRunning()
1463 {
1464     bool result = true;
1465     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1466     switch(m_state) {
1467         case ePS_WaitingForStream:
1468             // a running stream has been detected
1469             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle);
1470             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1471             if (getType() == ePT_Receive) {
1472                 // this to ensure that there is no discontinuity when starting to
1473                 // update the DLL based upon the received packets
1474                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp);
1475             } else {
1476                 // FIXME: PC=master mode will have to do something here I guess...
1477             }
1478             break;
1479         case ePS_WaitingForStreamEnable: // when xrunning at startup
1480             result &= m_data_buffer->clearBuffer();
1481             m_data_buffer->setTransparent(true);
1482             break;
1483         case ePS_WaitingForStreamDisable:
1484             result &= m_data_buffer->clearBuffer();
1485             m_data_buffer->setTransparent(true);
1486             break;
1487         default:
1488             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1489             return false;
1490     }
1491     m_state = ePS_DryRunning;
1492     #ifdef DEBUG
1493     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1494         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1495         dumpInfo();
1496     }
1497     #endif
1498     SIGNAL_ACTIVITY;
1499     return result;
1500 }
1501
1502 /**
1503  * @brief Enter the ePS_WaitingForStreamEnable state
1504  * @return true if successful, false if not
1505  *
1506  * @pre
1507  *
1508  * @post
1509  *
1510  */
1511 bool
1512 StreamProcessor::doWaitForStreamEnable()
1513 {
1514     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1515     unsigned int ringbuffer_size_frames;
1516     switch(m_state) {
1517         case ePS_DryRunning:
1518             // we have to start waiting for an incoming stream
1519             // this basically means nothing, the state change will
1520             // be picked up by the packet iterator
1521
1522             if(!m_data_buffer->clearBuffer()) {
1523                 debugError("Could not reset data buffer\n");
1524                 return false;
1525             }
1526             if (getType() == ePT_Transmit) {
1527                 ringbuffer_size_frames = m_StreamProcessorManager.getNbBuffers() * m_StreamProcessorManager.getPeriodSize();
1528                 debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames);
1529                 // prefill the buffer
1530                 if(!transferSilence(ringbuffer_size_frames)) {
1531                     debugFatal("Could not prefill transmit stream\n");
1532                     return false;
1533                 }
1534             }
1535             break;
1536         default:
1537             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1538             return false;
1539     }
1540     m_state = ePS_WaitingForStreamEnable;
1541     #ifdef DEBUG
1542     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1543         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1544         dumpInfo();
1545     }
1546     #endif
1547     SIGNAL_ACTIVITY;
1548     return true;
1549 }
1550
1551 /**
1552  * @brief Enter the ePS_Running state
1553  * @return true if successful, false if not
1554  *
1555  * @pre
1556  *
1557  * @post
1558  *
1559  */
1560 bool
1561 StreamProcessor::doRunning()
1562 {
1563     bool result = true;
1564     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1565     switch(m_state) {
1566         case ePS_WaitingForStreamEnable:
1567             // a running stream has been detected
1568             debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",
1569                                              this, m_last_cycle);
1570             m_in_xrun = false;
1571             m_local_node_id = m_1394service.getLocalNodeId() & 0x3f;
1572             m_data_buffer->setTransparent(false);
1573             break;
1574         default:
1575             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1576             return false;
1577     }
1578     m_state = ePS_Running;
1579     #ifdef DEBUG
1580     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1581         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1582         dumpInfo();
1583     }
1584     #endif
1585     SIGNAL_ACTIVITY;
1586     return result;
1587 }
1588
1589 /**
1590  * @brief Enter the ePS_WaitingForStreamDisable state
1591  * @return true if successful, false if not
1592  *
1593  * @pre
1594  *
1595  * @post
1596  *
1597  */
1598 bool
1599 StreamProcessor::doWaitForStreamDisable()
1600 {
1601     debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state));
1602     switch(m_state) {
1603         case ePS_Running:
1604             // the thread will do the transition
1605             break;
1606         default:
1607             debugError("Entry from invalid state: %s\n", ePSToString(m_state));
1608             return false;
1609     }
1610     m_state = ePS_WaitingForStreamDisable;
1611     #ifdef DEBUG
1612     if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) {
1613         debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n");
1614         dumpInfo();
1615     }
1616     #endif
1617     SIGNAL_ACTIVITY;
1618     return true;
1619 }
1620
1621 /**
1622  * @brief Updates the state machine and calls the necessary transition functions
1623  * @return true if successful, false if not
1624  */
1625 bool StreamProcessor::updateState() {
1626     bool result = false;
1627     // copy the current state locally since it could change value,
1628     // and that's something we don't want to happen inbetween tests
1629     // if m_next_state changes during this routine, we know for sure
1630     // that the previous state change was at least attempted correctly.
1631     enum eProcessorState next_state = m_next_state;
1632
1633     debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n",
1634         ePSToString(m_state), ePSToString(next_state));
1635
1636     if (m_state == next_state) {
1637         debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) );
1638         return true;
1639     }
1640     // after creation, only initialization is allowed
1641     if (m_state == ePS_Created) {
1642         if(next_state != ePS_Stopped) {
1643             goto updateState_exit_with_error;
1644         }
1645         // do init here
1646         result = doStop();
1647         if (result) {return true;}
1648         else goto updateState_exit_change_failed;
1649     }
1650
1651     // after initialization, only WaitingForRunningStream is allowed
1652     if (m_state == ePS_Stopped) {
1653         if(next_state != ePS_WaitingForStream) {
1654             goto updateState_exit_with_error;
1655         }
1656         result = doWaitForRunningStream();
1657         if (result) {return true;}
1658         else goto updateState_exit_change_failed;
1659     }
1660
1661     // after WaitingForStream, only ePS_DryRunning is allowed
1662     // this means that the stream started running
1663     if (m_state == ePS_WaitingForStream) {
1664         if(next_state != ePS_DryRunning) {
1665             goto updateState_exit_with_error;
1666         }
1667         result = doDryRunning();
1668         if (result) {return true;}
1669         else goto updateState_exit_change_failed;
1670     }
1671
1672     // from ePS_DryRunning we can go to:
1673     //   - ePS_Stopped if something went wrong during DryRunning
1674     //   - ePS_WaitingForStreamEnable if there is a requested to enable
1675     if (m_state == ePS_DryRunning) {
1676         if((next_state != ePS_Stopped) &&
1677            (next_state != ePS_WaitingForStreamEnable)) {
1678             goto updateState_exit_with_error;
1679         }
1680         if (next_state == ePS_Stopped) {
1681             result = doStop();
1682         } else {
1683             result = doWaitForStreamEnable();
1684         }
1685         if (result) {return true;}
1686         else goto updateState_exit_change_failed;
1687     }
1688
1689     // from ePS_WaitingForStreamEnable we can go to:
1690     //   - ePS_DryRunning if something went wrong while waiting
1691     //   - ePS_Running if the stream enabled correctly
1692     if (m_state == ePS_WaitingForStreamEnable) {
1693         if((next_state != ePS_DryRunning) &&
1694            (next_state != ePS_Running)) {
1695             goto updateState_exit_with_error;
1696         }
1697         if (next_state == ePS_Stopped) {
1698             result = doDryRunning();
1699         } else {
1700             result = doRunning();
1701         }
1702         if (result) {return true;}
1703         else goto updateState_exit_change_failed;
1704     }
1705
1706     // from ePS_Running we can only start waiting for a disabled stream
1707     if (m_state == ePS_Running) {
1708         if(next_state != ePS_WaitingForStreamDisable) {
1709             goto updateState_exit_with_error;
1710         }
1711         result = doWaitForStreamDisable();
1712         if (result) {return true;}
1713         else goto updateState_exit_change_failed;
1714     }
1715
1716     // from ePS_WaitingForStreamDisable we can go to DryRunning
1717     if (m_state == ePS_WaitingForStreamDisable) {
1718         if(next_state != ePS_DryRunning) {
1719             goto updateState_exit_with_error;
1720         }
1721         result = doDryRunning();
1722         if (result) {return true;}
1723         else goto updateState_exit_change_failed;
1724     }
1725
1726     // if we arrive here there is an error
1727 updateState_exit_with_error:
1728     debugError("Invalid state transition: %s => %s\n",
1729         ePSToString(m_state), ePSToString(next_state));
1730     SIGNAL_ACTIVITY;
1731     return false;
1732 updateState_exit_change_failed:
1733     debugError("State transition failed: %s => %s\n",
1734         ePSToString(m_state), ePSToString(next_state));
1735     SIGNAL_ACTIVITY;
1736     return false;
1737 }
1738
1739 bool StreamProcessor::waitForProducePacket()
1740 {
1741     return waitForProduce(getNominalFramesPerPacket());
1742 }
1743 bool StreamProcessor::waitForProducePeriod()
1744 {
1745     return waitForProduce(m_StreamProcessorManager.getPeriodSize());
1746 }
1747 bool StreamProcessor::waitForProduce(unsigned int nframes)
1748 {
1749     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
1750                        "(%p, %s) wait ...\n",
1751                        this, getTypeString());
1752     struct timespec ts;
1753     int result;
1754     int max_runs = 1000;
1755
1756     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
1757         debugError("clock_gettime failed\n");
1758         return false;
1759     }
1760
1761     // FIXME: hardcoded timeout of 10 sec
1762 //     ts.tv_nsec += 1000 * 1000000LL;
1763 //     while (ts.tv_nsec > 1000000000LL) {
1764 //         ts.tv_sec += 1;
1765 //         ts.tv_nsec -= 1000000000LL;
1766 //     }
1767     ts.tv_sec += 2;
1768    
1769     pthread_mutex_lock(&m_activity_cond_lock);
1770     while(!canProduce(nframes) && max_runs) {
1771         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);
1772    
1773         if(result != 0) {
1774             if (result == ETIMEDOUT) {
1775                 debugOutput(DEBUG_LEVEL_VERBOSE,
1776                             "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n",
1777                             this, getTypeString(), result);
1778                 pthread_mutex_unlock(&m_activity_cond_lock);
1779                 dumpInfo();
1780                 return false;
1781             } else if (result == EINTR) {
1782                 debugOutput(DEBUG_LEVEL_VERBOSE,
1783                             "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n",
1784                             this, getTypeString(), result);
1785                 pthread_mutex_unlock(&m_activity_cond_lock);
1786                 dumpInfo();
1787                 return false;
1788             } else {
1789                 debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n",
1790                             this, getTypeString(), result);
1791                 pthread_mutex_unlock(&m_activity_cond_lock);
1792                 dumpInfo();
1793                 return false;
1794             }
1795         }
1796     }
1797     pthread_mutex_unlock(&m_activity_cond_lock);
1798     if(max_runs == 0) {
1799         debugWarning("(%p) runaway loop\n");
1800     }
1801     return true;
1802 }
1803
1804 bool StreamProcessor::waitForConsumePacket()
1805 {
1806     return waitForConsume(getNominalFramesPerPacket());
1807 }
1808 bool StreamProcessor::waitForConsumePeriod()
1809 {
1810     return waitForConsume(m_StreamProcessorManager.getPeriodSize());
1811 }
1812 bool StreamProcessor::waitForConsume(unsigned int nframes)
1813 {
1814     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
1815                        "(%p, %s) wait ...\n",
1816                        this, getTypeString());
1817     struct timespec ts;
1818     int result;
1819
1820     int max_runs = 1000;
1821
1822     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
1823         debugError("clock_gettime failed\n");
1824         return false;
1825     }
1826
1827     // FIXME: hardcoded timeout of 10 sec
1828 //     ts.tv_nsec += 1000 * 1000000LL;
1829 //     while (ts.tv_nsec > 1000000000LL) {
1830 //         ts.tv_sec += 1;
1831 //         ts.tv_nsec -= 1000000000LL;
1832 //     }
1833     ts.tv_sec += 2;
1834
1835     pthread_mutex_lock(&m_activity_cond_lock);
1836     while(!canConsume(nframes) && max_runs) {
1837         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);
1838         if(result != 0) {
1839             if (result == ETIMEDOUT) {
1840                 debugOutput(DEBUG_LEVEL_VERBOSE,
1841                             "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n",
1842                             this, getTypeString(), result);
1843                 pthread_mutex_unlock(&m_activity_cond_lock);
1844                 dumpInfo();
1845                 return false;
1846             } else if (result == EINTR) {
1847                 debugOutput(DEBUG_LEVEL_VERBOSE,
1848                             "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n",
1849                             this, getTypeString(), result);
1850                 pthread_mutex_unlock(&m_activity_cond_lock);
1851                 dumpInfo();
1852                 return false;
1853             } else {
1854                 debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n",
1855                             this, getTypeString(), result);
1856                 pthread_mutex_unlock(&m_activity_cond_lock);
1857                 dumpInfo();
1858                 return false;
1859             }
1860         }
1861         max_runs--;
1862     }
1863     pthread_mutex_unlock(&m_activity_cond_lock);
1864    
1865     if(max_runs == 0) {
1866         debugWarning("(%p) runaway loop\n");
1867     }
1868    
1869     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
1870                        "(%p, %s) leave ...\n",
1871                        this, getTypeString());
1872     return true;
1873 }
1874
1875 bool StreamProcessor::canProducePacket()
1876 {
1877     return canProduce(getNominalFramesPerPacket());
1878 }
1879 bool StreamProcessor::canProducePeriod()
1880 {
1881     return canProduce(m_StreamProcessorManager.getPeriodSize());
1882 }
1883 bool StreamProcessor::canProduce(unsigned int nframes)
1884 {
1885     if(m_in_xrun) return true;
1886     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1887        
1888         if(getType() == ePT_Transmit) {
1889             // can we put a certain amount of frames into the buffer?
1890             unsigned int bufferspace = m_data_buffer->getBufferSpace();
1891             if(bufferspace >= nframes) {
1892                 return true;
1893             } else return false;
1894         } else {
1895             // do we still have to put frames in the buffer?
1896             unsigned int bufferfill = m_data_buffer->getBufferFill();
1897             unsigned int periodsize = m_StreamProcessorManager.getPeriodSize();
1898             if (bufferfill > periodsize) return false;
1899             else return true;
1900         }
1901
1902
1903     } else {
1904         if(getType() == ePT_Transmit) {
1905             // if we are an xmit SP, we cannot accept frames
1906             // when not running
1907             return false;
1908         } else {
1909             // if we are a receive SP, we can always accept frames
1910             // when not running
1911             return true;
1912         }
1913     }
1914 }
1915
1916 bool StreamProcessor::canConsumePacket()
1917 {
1918     return canConsume(getNominalFramesPerPacket());
1919 }
1920 bool StreamProcessor::canConsumePeriod()
1921 {
1922     return canConsume(m_StreamProcessorManager.getPeriodSize());
1923 }
1924 bool StreamProcessor::canConsume(unsigned int nframes)
1925 {
1926     if(m_in_xrun) return true;
1927     if(m_state == ePS_Running && m_next_state == ePS_Running) {
1928         // check whether we already fullfil the criterion
1929         unsigned int bufferfill = m_data_buffer->getBufferFill();
1930         if(bufferfill >= nframes) {
1931             return true;
1932         } else return false;
1933     } else {
1934         if(getType() == ePT_Transmit) {
1935             // if we are an xmit SP, and we're not running,
1936             // we can always provide frames
1937             return true;
1938         } else {
1939             // if we are a receive SP, we can't provide frames
1940             // when not running
1941             return false;
1942         }
1943     }
1944 }
1945
1946 /***********************************************
1947  * Helper routines                             *
1948  ***********************************************/
1949 bool
1950 StreamProcessor::transferSilence(unsigned int nframes)
1951 {
1952     bool retval;
1953     signed int fc;
1954     ffado_timestamp_t ts_tail_tmp;
1955
1956     // prepare a buffer of silence
1957     char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame());
1958     transmitSilenceBlock(dummybuffer, nframes, 0);
1959
1960     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);
1961     if (fc != 0) {
1962         debugWarning("Prefilling a buffer that already contains %d frames\n", fc);
1963     }
1964
1965     // add the silence data to the ringbuffer
1966     if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) {
1967         retval = true;
1968     } else {
1969         debugWarning("Could not write to event buffer\n");
1970         retval = false;
1971     }
1972     free(dummybuffer);
1973     return retval;
1974 }
1975
1976 /**
1977  * @brief convert a eProcessorState to a string
1978  * @param s the state
1979  * @return a char * describing the state
1980  */
1981 const char *
1982 StreamProcessor::ePSToString(enum eProcessorState s) {
1983     switch (s) {
1984         case ePS_Invalid: return "ePS_Invalid";
1985         case ePS_Created: return "ePS_Created";
1986         case ePS_Stopped: return "ePS_Stopped";
1987         case ePS_WaitingForStream: return "ePS_WaitingForStream";
1988         case ePS_DryRunning: return "ePS_DryRunning";
1989         case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable";
1990         case ePS_Running: return "ePS_Running";
1991         case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable";
1992         default: return "error: unknown state";
1993     }
1994 }
1995
1996 /**
1997  * @brief convert a eProcessorType to a string
1998  * @param t the type
1999  * @return a char * describing the state
2000  */
2001 const char *
2002 StreamProcessor::ePTToString(enum eProcessorType t) {
2003     switch (t) {
2004         case ePT_Receive: return "Receive";
2005         case ePT_Transmit: return "Transmit";
2006         default: return "error: unknown type";
2007     }
2008 }
2009
2010 /***********************************************
2011  * Debug                                       *
2012  ***********************************************/
2013 void
2014 StreamProcessor::dumpInfo()
2015 {
2016     #ifdef DEBUG
2017     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p, %s:\n", this, ePTToString(m_processor_type));
2018     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel);
2019     uint64_t now = m_1394service.getCycleTimerTicks();
2020     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n",
2021                         now,
2022                         (unsigned int)TICKS_TO_SECS(now),
2023                         (unsigned int)TICKS_TO_CYCLES(now),
2024                         (unsigned int)TICKS_TO_OFFSET(now));
2025     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False"));
2026     if (m_state == m_next_state) {
2027         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n",
2028                                             ePSToString(m_state));
2029     } else {
2030         debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s (Next: %s)\n",
2031                                               ePSToString(m_state), ePSToString(m_next_state));
2032         debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state);
2033     }
2034     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer);
2035     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Framerate             : Nominal: %u, Sync: %f, Buffer %f\n",
2036                                           m_StreamProcessorManager.getNominalRate(),
2037                                           24576000.0/m_StreamProcessorManager.getSyncSource().m_data_buffer->getRate(),
2038                                           24576000.0/m_data_buffer->getRate());
2039     float d = getSyncDelay();
2040     debugOutputShort(DEBUG_LEVEL_NORMAL, "  Sync delay             : %f ticks (%f frames, %f cy)\n",
2041                                          d, d/getTicksPerFrame(),
2042                                          d/((float)TICKS_PER_CYCLE));
2043     #endif
2044     m_data_buffer->dumpInfo();
2045 }
2046
2047 void
2048 StreamProcessor::printBufferInfo()
2049 {
2050     debugOutput(DEBUG_LEVEL_NORMAL,
2051                 "(%p, %8s) fc: %d fill: %u\n",
2052                 this, getTypeString(), m_data_buffer->getFrameCounter(), m_data_buffer->getBufferFill() );
2053 }
2054
2055 void
2056 StreamProcessor::setVerboseLevel(int l) {
2057     setDebugLevel(l);
2058     PortManager::setVerboseLevel(l);
2059     m_data_buffer->setVerboseLevel(l);
2060 }
2061
2062 } // end of namespace
Note: See TracBrowser for help on using the browser.