root/branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp

Revision 391, 60.6 kB (checked in by pieterpalmers, 16 years ago)

* Partially finished:

  • Introduce TimestampedBuffer util class
  • replace internal ringbuffer of SP with timed ringbuffer

* Compiles & works

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "AmdtpStreamProcessor.h"
30 #include "Port.h"
31 #include "AmdtpPort.h"
32
33 #include "cycletimer.h"
34
35 #include <netinet/in.h>
36 #include <assert.h>
37
38 #define RECEIVE_DLL_INTEGRATION_COEFFICIENT 0.015
39
40 #define RECEIVE_PROCESSING_DELAY (10000U)
41
42 // in ticks
43 #define TRANSMIT_TRANSFER_DELAY 9000U
44 // the number of cycles to send a packet in advance of its timestamp
45 #define TRANSMIT_ADVANCE_CYCLES 1U
46
47 namespace FreebobStreaming {
48
49 IMPL_DEBUG_MODULE( AmdtpTransmitStreamProcessor, AmdtpTransmitStreamProcessor, DEBUG_LEVEL_NORMAL );
50 IMPL_DEBUG_MODULE( AmdtpReceiveStreamProcessor, AmdtpReceiveStreamProcessor, DEBUG_LEVEL_NORMAL );
51
52
53 /* transmit */
54 AmdtpTransmitStreamProcessor::AmdtpTransmitStreamProcessor(int port, int framerate, int dimension)
55         : TransmitStreamProcessor(port, framerate), m_dimension(dimension)
56         , m_last_timestamp(0), m_dbc(0), m_ringbuffer_size_frames(0)
57 {
58
59 }
60
61 AmdtpTransmitStreamProcessor::~AmdtpTransmitStreamProcessor() {
62
63 }
64
65 /**
66  * @return
67  */
68 bool AmdtpTransmitStreamProcessor::init() {
69
70         debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing (%p)...\n");
71         // call the parent init
72         // this has to be done before allocating the buffers,
73         // because this sets the buffersizes from the processormanager
74         if(!TransmitStreamProcessor::init()) {
75                 debugFatal("Could not do base class init (%p)\n",this);
76                 return false;
77         }
78        
79         return true;
80 }
81
82 void AmdtpTransmitStreamProcessor::setVerboseLevel(int l) {
83         setDebugLevel(l);
84         TransmitStreamProcessor::setVerboseLevel(l);
85 }
86
87 enum raw1394_iso_disposition
88 AmdtpTransmitStreamProcessor::getPacket(unsigned char *data, unsigned int *length,
89                       unsigned char *tag, unsigned char *sy,
90                       int cycle, unsigned int dropped, unsigned int max_length) {
91    
92     struct iec61883_packet *packet = (struct iec61883_packet *) data;
93     unsigned int nevents=0;
94    
95     m_last_cycle=cycle;
96    
97     debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d, (running=%d, enabled=%d,%d)\n",
98         cycle, m_running, m_disabled, m_is_disabled);
99    
100 #ifdef DEBUG
101     if(dropped>0) {
102         debugWarning("Dropped %d packets on cycle %d\n",dropped, cycle);
103     }
104 #endif
105    
106     /* Our node ID can change after a bus reset, so it is best to fetch
107      * our node ID for each packet. */
108     packet->sid = getNodeId() & 0x3f;
109
110     packet->dbs = m_dimension;
111     packet->fn = 0;
112     packet->qpc = 0;
113     packet->sph = 0;
114     packet->reserved = 0;
115     packet->dbc = m_dbc;
116     packet->eoh1 = 2;
117     packet->fmt = IEC61883_FMT_AMDTP;
118
119     // recalculate the buffer head timestamp
120     float ticks_per_frame=m_SyncSource->getTicksPerFrame();
121    
122     // the base timestamp is the one of the last sample in the buffer
123     uint64_t ts_tail;
124     uint64_t fc;
125    
126     m_data_buffer->getBufferTailTimestamp(&ts_tail, &fc); // thread safe
127    
128     int64_t timestamp = ts_tail;
129    
130     // meaning that the first sample in the buffer lies m_framecounter * rate
131     // earlier. This gives the next equation:
132     //   timestamp = m_last_timestamp - m_framecounter * rate
133     timestamp -= (int64_t)((float)fc * ticks_per_frame);
134    
135     // FIXME: test
136 //     timestamp -= (uint64_t)(((float)m_handler->getWakeupInterval())
137 //                                        * ((float)m_syt_interval) * ticks_per_frame);
138 //
139 //     // substract the receive transfer delay
140 //     timestamp -= RECEIVE_PROCESSING_DELAY;
141    
142     // this happens if m_buffer_tail_timestamp wraps around while there are
143     // not much frames in the buffer. We should add the wraparound value of the ticks
144     // counter
145     if (timestamp < 0) {
146         timestamp += TICKS_PER_SECOND * 128L;
147     }
148     // this happens when the last timestamp is near wrapping, and
149     // m_framecounter is low.
150     // this means: m_last_timestamp is near wrapping and have just had
151     // a getPackets() from the client side. the projected next_period
152     // boundary lies beyond the wrap value.
153     // the action is to wrap the value.
154     else if (timestamp >= TICKS_PER_SECOND * 128L) {
155         timestamp -= TICKS_PER_SECOND * 128L;
156     }
157    
158     // determine if we want to send a packet or not
159     // note that we can't use getCycleTimer directly here,
160     // because packets are queued in advance. This means that
161     // we the packet we are constructing will be sent out
162     // on 'cycle', not 'now'.
163     unsigned int ctr=m_handler->getCycleTimer();
164     int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr);
165    
166     // the difference between 'now' and the cycle this
167     // packet is intended for
168     int cycle_diff = cycle - now_cycles;
169    
170     // detect wraparounds
171     if(cycle_diff < -(int)(CYCLES_PER_SECOND/2)) {
172         cycle_diff += CYCLES_PER_SECOND;
173     } else if(cycle_diff > (int)(CYCLES_PER_SECOND/2)) {
174         cycle_diff -= CYCLES_PER_SECOND;
175     }
176
177     // as long as the cycle parameter is not in sync with
178     // the current time, the stream is considered not
179     // to be 'running'
180     if (!m_running && cycle_diff >= 0 && cycle != -1) {
181             debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle);
182             m_running=true;
183     }
184    
185 #ifdef DEBUG
186     if(cycle_diff < 0) {
187         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n",
188             cycle, now_cycles);
189     }
190 #endif
191     // if cycle lies cycle_diff cycles in the future, we should
192     // queue this packet cycle_diff * TICKS_PER_CYCLE earlier than
193     // we would if it were to be sent immediately.
194    
195     // determine the 'now' time in ticks
196     uint64_t cycle_timer=CYCLE_TIMER_TO_TICKS(ctr);
197    
198     // time until the packet is to be sent (if > 0: send packet)
199     int64_t until_next=timestamp-(int64_t)cycle_timer;
200    
201 #ifdef DEBUG
202     int64_t utn2=until_next; // debug!!
203 #endif
204
205     // we send a packet some cycles in advance, to avoid the
206     // following situation:
207     // suppose we are only a few ticks away from
208     // the moment to send this packet. therefore we decide
209     // not to send the packet, but send it in the next cycle.
210     // This means that the next time point will be 3072 ticks
211     // later, making that the timestamp will be expired when the
212     // packet is sent, unless TRANSFER_DELAY > 3072.
213     // this means that we need at least one cycle of extra buffering.
214     until_next -= TICKS_PER_CYCLE * TRANSMIT_ADVANCE_CYCLES;
215    
216     // we have to queue it cycle_diff * TICKS_PER_CYCLE earlier
217     until_next -= cycle_diff * TICKS_PER_CYCLE;
218
219     // the maximal difference we can allow (64secs)
220     const int64_t max=TICKS_PER_SECOND*64L;
221
222 #ifdef DEBUG
223     if(!m_is_disabled) {
224         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TS=%11llu, CTR=%11llu, FC=%5d\n",
225             timestamp, cycle_timer, fc
226             );
227         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    UTN=%11lld, UTN2=%11lld\n",
228             until_next, utn2
229             );
230         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04d\n",
231             now_cycles, cycle, cycle_diff
232             );
233     }
234 #endif
235
236     if(until_next > max) {
237         // this means that cycle_timer has wrapped, but
238         // timestamp has not. we should unwrap cycle_timer
239         // by adding TICKS_PER_SECOND*128L, meaning that we should substract
240         // this value from until_next           
241         until_next -= TICKS_PER_SECOND*128L;
242     } else if (until_next < -max) {
243         // this means that timestamp has wrapped, but
244         // cycle_timer has not. we should unwrap timestamp
245         // by adding TICKS_PER_SECOND*128L, meaning that we should add
246         // this value from until_next
247         until_next += TICKS_PER_SECOND*128L;
248     }
249
250 #ifdef DEBUG
251     if(!m_is_disabled) {
252         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " > TS=%11llu, CTR=%11llu, FC=%5d\n",
253             timestamp, cycle_timer, fc
254             );
255         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    UTN=%11lld, UTN2=%11lld\n",
256             until_next, utn2
257             );
258     }
259 #endif
260
261     if (!m_disabled && m_is_disabled) {
262         // this means that we are trying to enable
263         if ((unsigned int)cycle == m_cycle_to_enable_at) {
264             m_is_disabled=false;
265             debugOutput(DEBUG_LEVEL_VERBOSE,"Enabling StreamProcessor %p at %u\n", this, cycle);
266            
267             // initialize the buffer head & tail
268             uint64_t ts;
269             uint64_t fc;
270            
271             debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n");
272            
273             m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe
274            
275             // the number of cycles the sync source lags
276             // or leads (< 0)
277             int sync_lag_cycles=cycle-m_SyncSource->getLastCycle()-1;
278             if(sync_lag_cycles > (int)(CYCLES_PER_SECOND/2)) {
279                 sync_lag_cycles -= CYCLES_PER_SECOND/2;
280             }
281             if (sync_lag_cycles < -((int)CYCLES_PER_SECOND/2)) {
282                 sync_lag_cycles += CYCLES_PER_SECOND/2;
283             }
284
285             // recalculate the buffer head timestamp
286             float ticks_per_frame=m_SyncSource->getTicksPerFrame();
287
288             // set buffer head timestamp
289             // this makes that the next sample to be sent out
290             // has the same timestamp as the last one received
291             // plus one frame
292             ts += (uint64_t)ticks_per_frame;
293            
294             // account for the cycle lag between sync SP and this SP
295             ts += sync_lag_cycles * TICKS_PER_CYCLE;
296            
297             if (ts >= TICKS_PER_SECOND * 128L) {
298                 ts -= TICKS_PER_SECOND * 128L;
299             }
300
301 //             m_data_buffer->setBufferHeadTimestamp(ts);
302             int64_t timestamp = ts;
303        
304             // since we have frames_in_buffer frames in the buffer,
305             // we know that the buffer tail lies
306             // frames_in_buffer * rate
307             // later
308             int frames_in_buffer=m_data_buffer->getFrameCounter();
309             timestamp += (int64_t)((float)frames_in_buffer * ticks_per_frame);
310            
311             // this happens when the last timestamp is near wrapping, and
312             // m_framecounter is low.
313             // this means: m_last_timestamp is near wrapping and have just had
314             // a getPackets() from the client side. the projected next_period
315             // boundary lies beyond the wrap value.
316             // the action is to wrap the value.
317             if (timestamp >= TICKS_PER_SECOND * 128L) {
318                 timestamp -= TICKS_PER_SECOND * 128L;
319             }
320        
321             m_data_buffer->setBufferTailTimestamp(timestamp);
322            
323             debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, TSTMP=%10llu, FC=%4d, %f\n",
324                             ts, timestamp, frames_in_buffer, ticks_per_frame);
325
326         } else {
327             debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle);
328         }
329     } else if (m_disabled && !m_is_disabled) {
330         // trying to disable
331         debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle);
332         m_is_disabled=true;
333     }
334
335     // don't process the stream when it is not enabled, not running
336     // or when the next sample is not due yet.
337    
338     // we do have to generate (semi) valid packets
339     // that means that we'll send NODATA packets.
340     // we don't add payload because DICE devices don't like that.
341     if((until_next>0) || m_is_disabled || !m_running) {
342         // no-data packets have syt=0xFFFF
343         // and have the usual amount of events as dummy data (?)
344         packet->fdf = IEC61883_FDF_NODATA;
345         packet->syt = 0xffff;
346        
347         // the dbc is incremented even with no data packets
348         m_dbc += m_syt_interval;
349
350         // this means no-data packets with payload (DICE doesn't like that)
351         *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t);
352        
353         // this means no-data packets without payload
354         //*length = 2*sizeof(quadlet_t);
355        
356         *tag = IEC61883_TAG_WITH_CIP;
357         *sy = 0;
358        
359         return RAW1394_ISO_DEFER;
360     }
361    
362     // construct the packet
363     nevents = m_syt_interval;
364     m_dbc += m_syt_interval;
365    
366     *tag = IEC61883_TAG_WITH_CIP;
367     *sy = 0;
368    
369     if (m_data_buffer->readFrames(nevents, (char *)(data + 8)))
370     {
371         *length = nevents*sizeof(quadlet_t)*m_dimension + 8;
372
373         // process all ports that should be handled on a per-packet base
374         // this is MIDI for AMDTP (due to the need of DBC)
375         if (!encodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) {
376             debugWarning("Problem encoding Packet Ports\n");
377         }
378
379         packet->fdf = m_fdf;
380
381         // convert the timestamp to SYT format
382         uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY;
383        
384         // check if it wrapped
385         if (ts >= TICKS_PER_SECOND * 128L) {
386             ts -= TICKS_PER_SECOND * 128L;
387         }
388        
389         unsigned int timestamp_SYT = TICKS_TO_SYT(ts);
390         packet->syt = ntohs(timestamp_SYT);
391        
392         // update the frame counter such that it reflects the new value
393         // done in the SP base class
394         if (!StreamProcessor::getFrames(nevents)) {
395             debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents);
396              return RAW1394_ISO_ERROR;
397         }
398        
399         return RAW1394_ISO_OK;   
400     } else {
401         /* there is no more data in the ringbuffer */
402         // convert the timestamp to SYT format
403         uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY;
404        
405         // check if it wrapped
406         if (ts >= TICKS_PER_SECOND * 128L) {
407             ts -= TICKS_PER_SECOND * 128L;
408         }
409
410         debugWarning("Transmit buffer underrun (now %d, queue %d, target %d)\n",
411                  now_cycles, cycle, TICKS_TO_CYCLES(ts));
412
413         nevents=0;
414
415         // TODO: we have to be a little smarter here
416         //       because we have some slack on the device side (TRANSFER_DELAY)
417         //       we can allow some skipped packets
418         // signal underrun
419         m_xruns++;
420
421         // disable the processing, will be re-enabled when
422         // the xrun is handled
423         m_disabled=true;
424         m_is_disabled=true;
425
426         // compose a no-data packet, we should always
427         // send a valid packet
428         packet->fdf = IEC61883_FDF_NODATA;
429         packet->syt = 0xffff;
430
431         // this means no-data packets with payload (DICE doesn't like that)
432         *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t);
433
434         // this means no-data packets without payload
435         //*length = 2*sizeof(quadlet_t);
436
437         return RAW1394_ISO_DEFER;
438     }
439
440     // we shouldn't get here
441     return RAW1394_ISO_ERROR;
442
443 }
444
445 int64_t AmdtpTransmitStreamProcessor::getTimeUntilNextPeriodUsecs() {
446     debugFatal("IMPLEMENT ME!");
447     return 0;
448 }
449
450 uint64_t AmdtpTransmitStreamProcessor::getTimeAtPeriodUsecs() {
451     // then we should convert this into usecs
452     // FIXME: we assume that the TimeSource of the IsoHandler is
453     //        in usecs.
454     return m_handler->mapToTimeSource(getTimeAtPeriod());
455 }
456
457 uint64_t AmdtpTransmitStreamProcessor::getTimeAtPeriod() {
458     debugFatal("IMPLEMENT ME!");
459    
460     return 0;
461 }
462
463 bool AmdtpTransmitStreamProcessor::prefill() {
464     if(!transferSilence(m_ringbuffer_size_frames)) {
465         debugFatal("Could not prefill transmit stream\n");
466         return false;
467     }
468
469     // when the buffer is prefilled, we should
470     // also initialize the base timestamp
471     // this base timestamp is the timestamp of the
472     // last buffer transfer.
473     uint64_t ts;
474     uint64_t fc;
475     m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe
476
477     // update the frame counter such that it reflects the buffer content,
478     // the buffer tail timestamp is initialized when the SP is enabled
479     // done in the SP base class
480     if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) {
481         debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n",
482             m_ringbuffer_size_frames,ts);
483         return false;
484     }
485
486     return true;
487 }
488
489 bool AmdtpTransmitStreamProcessor::reset() {
490
491     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n");
492
493     // reset the statistics
494     m_PeriodStat.reset();
495     m_PacketStat.reset();
496     m_WakeupStat.reset();
497    
498     // reset all non-device specific stuff
499     // i.e. the iso stream and the associated ports
500     if(!TransmitStreamProcessor::reset()) {
501         debugFatal("Could not do base class reset\n");
502         return false;
503     }
504    
505     // we should prefill the event buffer
506     if (!prefill()) {
507         debugFatal("Could not prefill buffers\n");
508         return false;   
509     }
510    
511     return true;
512 }
513
514 bool AmdtpTransmitStreamProcessor::prepare() {
515     m_PeriodStat.setName("XMT PERIOD");
516     m_PacketStat.setName("XMT PACKET");
517     m_WakeupStat.setName("XMT WAKEUP");
518
519     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
520    
521     // prepare all non-device specific stuff
522     // i.e. the iso stream and the associated ports
523     if(!TransmitStreamProcessor::prepare()) {
524         debugFatal("Could not prepare base class\n");
525         return false;
526     }
527    
528     switch (m_framerate) {
529     case 32000:
530         m_syt_interval = 8;
531         m_fdf = IEC61883_FDF_SFC_32KHZ;
532         break;
533     case 44100:
534         m_syt_interval = 8;
535         m_fdf = IEC61883_FDF_SFC_44K1HZ;
536         break;
537     default:
538     case 48000:
539         m_syt_interval = 8;
540         m_fdf = IEC61883_FDF_SFC_48KHZ;
541         break;
542     case 88200:
543         m_syt_interval = 16;
544         m_fdf = IEC61883_FDF_SFC_88K2HZ;
545         break;
546     case 96000:
547         m_syt_interval = 16;
548         m_fdf = IEC61883_FDF_SFC_96KHZ;
549         break;
550     case 176400:
551         m_syt_interval = 32;
552         m_fdf = IEC61883_FDF_SFC_176K4HZ;
553         break;
554     case 192000:
555         m_syt_interval = 32;
556         m_fdf = IEC61883_FDF_SFC_192KHZ;
557         break;
558     }
559    
560     iec61883_cip_init (
561         &m_cip_status,
562         IEC61883_FMT_AMDTP,
563         m_fdf,
564         m_framerate,
565         m_dimension,
566         m_syt_interval);
567
568     // prepare the framerate estimate
569     m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate);
570    
571     // allocate the event buffer
572     m_ringbuffer_size_frames=m_nb_buffers * m_period;
573
574     // add the receive processing delay
575 //     m_ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame);
576
577     assert(m_data_buffer);   
578     m_data_buffer->setBufferSize(m_ringbuffer_size_frames);
579     m_data_buffer->setEventSize(sizeof(quadlet_t));
580     m_data_buffer->setEventsPerFrame(m_dimension);
581    
582     m_data_buffer->setUpdatePeriod(m_period);
583     m_data_buffer->setNominalRate(m_ticks_per_frame);
584    
585     m_data_buffer->prepare();
586
587     // set the parameters of ports we can:
588     // we want the audio ports to be period buffered,
589     // and the midi ports to be packet buffered
590     for ( PortVectorIterator it = m_Ports.begin();
591           it != m_Ports.end();
592           ++it )
593     {
594         debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
595         if(!(*it)->setBufferSize(m_period)) {
596             debugFatal("Could not set buffer size to %d\n",m_period);
597             return false;
598         }
599        
600        
601         switch ((*it)->getPortType()) {
602             case Port::E_Audio:
603                 if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
604                     debugFatal("Could not set signal type to PeriodSignalling");
605                     return false;
606                 }
607                 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
608                 // buffertype and datatype are dependant on the API
609                 if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
610                     debugFatal("Could not set buffer type");
611                     return false;
612                 }
613                 if(!(*it)->useExternalBuffer(true)) {
614                     debugFatal("Could not set external buffer usage");
615                     return false;
616                 }
617                
618                 if(!(*it)->setDataType(Port::E_Float)) {
619                     debugFatal("Could not set data type");
620                     return false;
621                 }
622                
623                
624                 break;
625             case Port::E_Midi:
626                 if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
627                     debugFatal("Could not set signal type to PeriodSignalling");
628                     return false;
629                 }
630                
631                 // we use a timing unit of 10ns
632                 // this makes sure that for the max syt interval
633                 // we don't have rounding, and keeps the numbers low
634                 // we have 1 slot every 8 events
635                 // we have syt_interval events per packet
636                 // => syt_interval/8 slots per packet
637                 // packet rate is 8000pkt/sec => interval=125us
638                 // so the slot interval is (1/8000)/(syt_interval/8)
639                 // or: 1/(1000 * syt_interval) sec
640                 // which is 1e9/(1000*syt_interval) nsec
641                 // or 100000/syt_interval 'units'
642                 // the event interval is fixed to 320us = 32000 'units'
643                 if(!(*it)->useRateControl(true,(100000/m_syt_interval),32000, false)) {
644                     debugFatal("Could not set signal type to PeriodSignalling");
645                     return false;
646                 }
647                
648                 // buffertype and datatype are dependant on the API
649                 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
650                 // buffertype and datatype are dependant on the API
651                 if(!(*it)->setBufferType(Port::E_RingBuffer)) {
652                     debugFatal("Could not set buffer type");
653                     return false;
654                 }
655                 if(!(*it)->setDataType(Port::E_MidiEvent)) {
656                     debugFatal("Could not set data type");
657                     return false;
658                 }
659                 break;
660             default:
661                 debugWarning("Unsupported port type specified\n");
662                 break;
663         }
664     }
665
666     // the API specific settings of the ports should already be set,
667     // as this is called from the processorManager->prepare()
668     // so we can init the ports
669     if(!initPorts()) {
670         debugFatal("Could not initialize ports!\n");
671         return false;
672     }
673
674     if(!preparePorts()) {
675         debugFatal("Could not initialize ports!\n");
676         return false;
677     }
678    
679     // prefilling is done in ...()
680     // because at that point the streams are running,
681     // while here they are not.
682    
683     // prefill the event buffer
684     // NOTE: do we need to prefill? reset() is called, so everything is prefilled then
685 //     if (!prefill()) {
686 //         debugFatal("Could not prefill buffers\n");
687 //         return false;   
688 //     }
689    
690     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
691     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, FDF: %d, DBS: %d, SYT: %d\n",
692              m_framerate,m_fdf,m_dimension,m_syt_interval);
693     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
694              m_period,m_nb_buffers);
695     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
696              m_port,m_channel);
697
698     return true;
699
700 }
701
702 bool AmdtpTransmitStreamProcessor::prepareForStart() {
703
704     return true;
705 }
706
707 bool AmdtpTransmitStreamProcessor::prepareForStop() {
708     disable();
709     return true;
710 }
711
712 bool AmdtpTransmitStreamProcessor::prepareForEnable() {
713
714     debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n");
715
716     if (!StreamProcessor::prepareForEnable()) {
717         debugError("StreamProcessor::prepareForEnable failed\n");
718         return false;
719     }
720
721     return true;
722 }
723
724 bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int nframes) {
725     bool retval;
726    
727     char *dummybuffer=(char *)calloc(sizeof(quadlet_t),nframes*m_dimension);
728    
729     transmitSilenceBlock(dummybuffer, nframes, 0);
730
731     // add the silence data to the ringbuffer
732     if(m_data_buffer->writeFrames(nframes, dummybuffer)) {
733         retval=true;
734     } else {
735         debugWarning("Could not write to event buffer\n");
736         retval=false;
737     }
738
739     free(dummybuffer);
740    
741     return retval;
742 }
743
744 bool AmdtpTransmitStreamProcessor::canClientTransferFrames(unsigned int nbframes) {
745     // there has to be enough space to put the frames in
746     return m_ringbuffer_size_frames - m_data_buffer->getFrameCounter() > nbframes;
747 }
748
749 bool AmdtpTransmitStreamProcessor::putFrames(unsigned int nbframes, int64_t ts) {
750     m_PeriodStat.mark(m_data_buffer->getBufferFill());
751    
752     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n",nbframes, ts);
753    
754     m_data_buffer->blockProcessWriteFrames(nbframes, ts);
755    
756     // recalculate the buffer tail timestamp
757     float ticks_per_frame=m_SyncSource->getTicksPerFrame();
758    
759     // this makes that the last sample to be sent out on ISO
760     // has the same timestamp as the last one transfered
761     // to the client
762     // plus one frame
763     ts += (uint64_t)ticks_per_frame;
764     int64_t timestamp = ts;
765    
766     // however we have to preserve causality, meaning that we have to make
767     // sure that the worst-case buffer head timestamp still lies in the future.
768     // this worst case timestamp occurs when the xmit buffer is completely full.
769     // therefore we add m_ringbuffer_size_frames * ticks_per_frame to the timestamp.
770     // this will make sure that the buffer head timestamp lies in the future.
771     // the netto effect of this is that the system works as if the buffer processing
772     // by the client doesn't take time.
773    
774     timestamp += (int64_t)((float)m_ringbuffer_size_frames * ticks_per_frame);
775    
776     // wrap the timestamp if nescessary
777     if (timestamp >= TICKS_PER_SECOND * 128L) {
778         timestamp -= TICKS_PER_SECOND * 128L;
779     }
780
781     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp);
782
783     // update the frame counter such that it reflects the new value,
784     // and also update the buffer tail timestamp
785     // done in the SP base class
786     if (!StreamProcessor::putFrames(nbframes, timestamp)) {
787         debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp);
788         return false;
789     }
790
791     return true;
792 }
793 /*
794  * write received events to the stream ringbuffers.
795  */
796
797 bool AmdtpTransmitStreamProcessor::processWriteBlock(char *data,
798                        unsigned int nevents, unsigned int offset)
799 {
800     bool no_problem=true;
801
802     for ( PortVectorIterator it = m_PeriodPorts.begin();
803           it != m_PeriodPorts.end();
804           ++it )
805     {
806
807         if((*it)->isDisabled()) {continue;};
808        
809         //FIXME: make this into a static_cast when not DEBUG?
810
811         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
812         assert(pinfo); // this should not fail!!
813
814         switch(pinfo->getFormat()) {
815         case AmdtpPortInfo::E_MBLA:
816             if(encodePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents)) {
817                 debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str());
818                 no_problem=false;
819             }
820             break;
821         case AmdtpPortInfo::E_SPDIF: // still unimplemented
822             break;
823         default: // ignore
824             break;
825         }
826     }
827     return no_problem;
828
829 }
830
831 int AmdtpTransmitStreamProcessor::transmitSilenceBlock(char *data,
832                        unsigned int nevents, unsigned int offset)
833 {
834     int problem=0;
835
836     for ( PortVectorIterator it = m_PeriodPorts.begin();
837           it != m_PeriodPorts.end();
838           ++it )
839     {
840
841         //FIXME: make this into a static_cast when not DEBUG?
842
843         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
844         assert(pinfo); // this should not fail!!
845
846         switch(pinfo->getFormat()) {
847         case AmdtpPortInfo::E_MBLA:
848             if(encodeSilencePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents)) {
849                 debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str());
850                 problem=1;
851             }
852             break;
853         case AmdtpPortInfo::E_SPDIF: // still unimplemented
854             break;
855         default: // ignore
856             break;
857         }
858     }
859     return problem;
860
861 }
862
863 /**
864  * @brief decode a packet for the packet-based ports
865  *
866  * @param data Packet data
867  * @param nevents number of events in data (including events of other ports & port types)
868  * @param dbc DataBlockCount value for this packet
869  * @return true if all successfull
870  */
871 bool AmdtpTransmitStreamProcessor::encodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc)
872 {
873     bool ok=true;
874     char byte;
875    
876     quadlet_t *target_event=NULL;
877     unsigned int j;
878    
879     for ( PortVectorIterator it = m_PacketPorts.begin();
880           it != m_PacketPorts.end();
881           ++it )
882     {
883
884 #ifdef DEBUG
885         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
886         assert(pinfo); // this should not fail!!
887
888         // the only packet type of events for AMDTP is MIDI in mbla
889         assert(pinfo->getFormat()==AmdtpPortInfo::E_Midi);
890 #endif
891        
892         AmdtpMidiPort *mp=static_cast<AmdtpMidiPort *>(*it);
893        
894         // we encode this directly (no function call) due to the high frequency
895         /* idea:
896         spec says: current_midi_port=(dbc+j)%8;
897         => if we start at (dbc+stream->location-1)%8 [due to location_min=1],
898         we'll start at the right event for the midi port.
899         => if we increment j with 8, we stay at the right event.
900         */
901         // FIXME: as we know in advance how big a packet is (syt_interval) we can
902         //        predict how much loops will be present here
903         // first prefill the buffer with NO_DATA's on all time muxed channels
904        
905         for(j = (dbc & 0x07)+mp->getLocation()-1; j < nevents; j += 8) {
906        
907             target_event=(quadlet_t *)(data + ((j * m_dimension) + mp->getPosition()));
908            
909             if(mp->canRead()) { // we can send a byte
910                 mp->readEvent(&byte);
911                 *target_event=htonl(
912                     IEC61883_AM824_SET_LABEL((byte)<<16,
913                                              IEC61883_AM824_LABEL_MIDI_1X));
914             } else {
915                 // can't send a byte, either because there is no byte,
916                 // or because this would exceed the maximum rate
917                 *target_event=htonl(
918                     IEC61883_AM824_SET_LABEL(0,IEC61883_AM824_LABEL_MIDI_NO_DATA));
919             }
920         }
921
922     }
923        
924     return ok;
925 }
926
927
928 int AmdtpTransmitStreamProcessor::encodePortToMBLAEvents(AmdtpAudioPort *p, quadlet_t *data,
929                        unsigned int offset, unsigned int nevents)
930 {
931     unsigned int j=0;
932
933     quadlet_t *target_event;
934
935     target_event=(quadlet_t *)(data + p->getPosition());
936
937     switch(p->getDataType()) {
938         default:
939         case Port::E_Int24:
940             {
941                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
942
943                 assert(nevents + offset <= p->getBufferSize());
944
945                 buffer+=offset;
946
947                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
948                     *target_event = htonl((*(buffer) & 0x00FFFFFF) | 0x40000000);
949                     buffer++;
950                     target_event += m_dimension;
951                 }
952             }
953             break;
954         case Port::E_Float:
955             {
956                 const float multiplier = (float)(0x7FFFFF00);
957                 float *buffer=(float *)(p->getBufferAddress());
958
959                 assert(nevents + offset <= p->getBufferSize());
960
961                 buffer+=offset;
962
963                 for(j = 0; j < nevents; j += 1) { // decode max nsamples               
964    
965                     // don't care for overflow
966                     float v = *buffer * multiplier;  // v: -231 .. 231
967                     unsigned int tmp = ((int)v);
968                     *target_event = htonl((tmp >> 8) | 0x40000000);
969                    
970                     buffer++;
971                     target_event += m_dimension;
972                 }
973             }
974             break;
975     }
976
977     return 0;
978 }
979 int AmdtpTransmitStreamProcessor::encodeSilencePortToMBLAEvents(AmdtpAudioPort *p, quadlet_t *data,
980                        unsigned int offset, unsigned int nevents)
981 {
982     unsigned int j=0;
983
984     quadlet_t *target_event;
985
986     target_event=(quadlet_t *)(data + p->getPosition());
987
988     switch(p->getDataType()) {
989         default:
990         case Port::E_Int24:
991         case Port::E_Float:
992             {
993                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
994                     *target_event = htonl(0x40000000);
995                     target_event += m_dimension;
996                 }
997             }
998             break;
999     }
1000
1001     return 0;
1002 }
1003
1004 /* --------------------- RECEIVE ----------------------- */
1005
1006 AmdtpReceiveStreamProcessor::AmdtpReceiveStreamProcessor(int port, int framerate, int dimension)
1007     : ReceiveStreamProcessor(port, framerate), m_dimension(dimension), m_last_timestamp(0), m_last_timestamp2(0) {
1008
1009 }
1010
1011 AmdtpReceiveStreamProcessor::~AmdtpReceiveStreamProcessor() {
1012
1013 }
1014
1015 bool AmdtpReceiveStreamProcessor::init() {
1016
1017     // call the parent init
1018     // this has to be done before allocating the buffers,
1019     // because this sets the buffersizes from the processormanager
1020     if(!ReceiveStreamProcessor::init()) {
1021         debugFatal("Could not do base class init (%d)\n",this);
1022         return false;
1023     }
1024
1025     return true;
1026 }
1027
1028 enum raw1394_iso_disposition
1029 AmdtpReceiveStreamProcessor::putPacket(unsigned char *data, unsigned int length,
1030                   unsigned char channel, unsigned char tag, unsigned char sy,
1031                   unsigned int cycle, unsigned int dropped) {
1032    
1033     enum raw1394_iso_disposition retval=RAW1394_ISO_OK;
1034     m_last_cycle=cycle;
1035    
1036     struct iec61883_packet *packet = (struct iec61883_packet *) data;
1037     assert(packet);
1038
1039 #ifdef DEBUG
1040     if(dropped>0) {
1041         debugWarning("Dropped %d packets on cycle %d\n",dropped, cycle);
1042     }
1043 #endif
1044
1045     debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4ucy + %04uticks) (running=%d, disabled=%d,%d)\n",
1046         channel, cycle,ntohs(packet->syt), 
1047         CYCLE_TIMER_GET_CYCLES(ntohs(packet->syt)), CYCLE_TIMER_GET_OFFSET(ntohs(packet->syt)),
1048         m_running,m_disabled,m_is_disabled);
1049
1050     if((packet->fmt == 0x10) && (packet->fdf != 0xFF) && (packet->syt != 0xFFFF) && (packet->dbs>0) && (length>=2*sizeof(quadlet_t))) {
1051         unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs;
1052
1053         //=> store the previous timestamp
1054         m_last_timestamp2=m_last_timestamp;
1055
1056         //=> convert the SYT to ticks
1057         unsigned int syt_timestamp=ntohs(packet->syt);
1058
1059         debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4u cycles + %04u ticks), FC=%04d, %d\n",
1060             channel, cycle,syt_timestamp, 
1061             CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp),
1062             m_data_buffer->getFrameCounter(), m_is_disabled);
1063        
1064         // reconstruct the full cycle
1065         unsigned int cc=m_handler->getCycleTimer();
1066         unsigned int cc_cycles=CYCLE_TIMER_GET_CYCLES(cc);
1067         unsigned int cc_seconds=CYCLE_TIMER_GET_SECS(cc);
1068        
1069         // the cycletimer has wrapped since this packet was received
1070         // we want cc_seconds to reflect the 'seconds' at the point this
1071         // was received
1072         if (cycle>cc_cycles) {
1073             if (cc_seconds) {
1074                 cc_seconds--;
1075             } else {
1076                 // seconds has wrapped around, so we'd better not substract 1
1077                 // the good value is 127
1078                 cc_seconds=127;
1079             }
1080         }
1081        
1082         // reconstruct the top part of the timestamp using the current cycle number
1083         unsigned int now_cycle_masked=cycle & 0xF;
1084         unsigned int syt_cycle=CYCLE_TIMER_GET_CYCLES(syt_timestamp);
1085        
1086         // if this is true, wraparound has occurred, undo this wraparound
1087         if(syt_cycle<now_cycle_masked) syt_cycle += 0x10;
1088        
1089         // this is the difference in cycles wrt the cycle the
1090         // timestamp was received
1091         unsigned int delta_cycles=syt_cycle-now_cycle_masked;
1092        
1093         // reconstruct the cycle part of the timestamp
1094         unsigned int new_cycles=cycle + delta_cycles;
1095        
1096         // if the cycles cause a wraparound of the cycle timer,
1097         // perform this wraparound
1098         // and convert the timestamp into ticks
1099         if(new_cycles<8000) {
1100             m_last_timestamp  = new_cycles * TICKS_PER_CYCLE;
1101         } else {
1102             debugOutput(DEBUG_LEVEL_VERY_VERBOSE,
1103                 "Detected wraparound: %d + %d = %d\n",
1104                 cycle,delta_cycles,new_cycles);
1105            
1106             new_cycles-=8000; // wrap around
1107             m_last_timestamp  = new_cycles * TICKS_PER_CYCLE;
1108             // add one second due to wraparound
1109             m_last_timestamp += TICKS_PER_SECOND;
1110         }
1111        
1112         m_last_timestamp += CYCLE_TIMER_GET_OFFSET(syt_timestamp);
1113         m_last_timestamp += cc_seconds * TICKS_PER_SECOND;
1114        
1115         // we have to keep in mind that there are also
1116         // some packets buffered by the ISO layer
1117         // at most x=m_handler->getWakeupInterval()
1118         // these contain at most x*syt_interval
1119         // frames, meaning that we might receive
1120         // this packet x*syt_interval*ticks_per_frame
1121         // later than expected (the real receive time)
1122         debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n",
1123             m_last_timestamp, m_handler->getWakeupInterval(),m_syt_interval,m_ticks_per_frame);
1124        
1125         m_last_timestamp += (uint64_t)(((float)m_handler->getWakeupInterval())
1126                                        * ((float)m_syt_interval) * m_ticks_per_frame);
1127         debugOutput(DEBUG_LEVEL_VERY_VERBOSE," ==> %lluticks\n", m_last_timestamp);
1128        
1129         // the receive processing delay indicates how much
1130         // extra time we need as slack
1131         m_last_timestamp += RECEIVE_PROCESSING_DELAY;
1132        
1133         // wrap if nescessary
1134         if (m_last_timestamp >= TICKS_PER_SECOND * 128L) {
1135             m_last_timestamp -= TICKS_PER_SECOND * 128L;
1136         }
1137        
1138         //=> now estimate the device frame rate
1139         if (m_last_timestamp2 && m_last_timestamp) {
1140             // try and estimate the frame rate from the device
1141            
1142             // first get the measured difference between both
1143             // timestamps
1144             int64_t measured_difference;
1145             measured_difference=((int64_t)(m_last_timestamp))
1146                                -((int64_t)(m_last_timestamp2));
1147             // correct for seconds wraparound
1148             if (m_last_timestamp<m_last_timestamp2) {
1149                 measured_difference+=128L*TICKS_PER_SECOND;
1150             }
1151
1152             // implement a 1st order DLL to estimate the framerate
1153             // this is the number of ticks between two samples
1154             float f=measured_difference;
1155             float err = f / (1.0*m_syt_interval) - m_ticks_per_frame;
1156            
1157             debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"SYT: %08X | STMP: %lluticks | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, f,m_ticks_per_frame,err);
1158
1159 #ifdef DEBUG
1160             // this helps to detect wraparound issues
1161             if(f > 1.5*((TICKS_PER_SECOND*1.0) / m_framerate)*m_syt_interval) {
1162                 debugWarning("Timestamp diff more than 50%% of the nominal diff too large!\n");
1163                 debugWarning(" SYT: %08X | STMP: %llu,%llu | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err);
1164                 debugWarning(" CC: %08X | CC_CY: %u | CC_SEC: %u | SYT_CY: %u | NEW_CY: %u\n",
1165                     cc, cc_cycles, cc_seconds, syt_cycle,new_cycles);
1166                
1167             }
1168             if(f < 0.5*((TICKS_PER_SECOND*1.0) / m_framerate)*m_syt_interval) {
1169                 debugWarning("Timestamp diff more than 50%% of the nominal diff too small!\n");
1170                 debugWarning(" SYT: %08X | STMP: %llu,%llu | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err);
1171             }
1172 #endif
1173             // integrate the error
1174             m_ticks_per_frame += RECEIVE_DLL_INTEGRATION_COEFFICIENT*err;
1175            
1176         }
1177        
1178          debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"R-SYT for cycle (%2d %2d)=>%2d: %5uT (%04uC + %04uT) %04X %04X %d\n",
1179             cycle,now_cycle_masked,delta_cycles,
1180             (m_last_timestamp),
1181             TICKS_TO_CYCLES(m_last_timestamp),
1182             TICKS_TO_OFFSET(m_last_timestamp),
1183             ntohs(packet->syt),TICKS_TO_CYCLE_TIMER(m_last_timestamp)&0xFFFF, dropped
1184          );
1185
1186         //=> signal that we're running (if we are)
1187         if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) {
1188             debugOutput(DEBUG_LEVEL_VERBOSE,"Receive StreamProcessor %p started running at %d\n", this, cycle);
1189             m_running=true;
1190         }
1191
1192         //=> don't process the stream samples when it is not enabled.
1193         if (!m_disabled && m_is_disabled) {
1194             // this means that we are trying to enable
1195             if (cycle == m_cycle_to_enable_at) {
1196                 m_is_disabled=false;
1197                 debugOutput(DEBUG_LEVEL_VERBOSE,"enabling StreamProcessor %p at %d\n", this, cycle);
1198                 // the previous timestamp is the one we need to start with
1199                 // because we're going to update the buffer again this loop
1200                 m_data_buffer->setBufferTailTimestamp(m_last_timestamp2);
1201                
1202             } else {
1203                 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle);
1204             }
1205         } else if (m_disabled && !m_is_disabled) {
1206             // trying to disable
1207             debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle);
1208             m_is_disabled=true;
1209         }
1210        
1211         if(m_is_disabled) {
1212
1213             // we keep track of the timestamp here
1214             // this makes sure that we will have a somewhat accurate
1215             // estimate as to when a period might be ready. i.e. it will not
1216             // be ready earlier than this timestamp + period time
1217            
1218             // the next (possible) sample is not this one, but lies
1219             // SYT_INTERVAL * rate later
1220             uint64_t ts=m_last_timestamp+(uint64_t)((float)m_syt_interval * m_ticks_per_frame);
1221            
1222             // wrap if nescessary
1223             if (ts >= TICKS_PER_SECOND * 128L) {
1224                 ts -= TICKS_PER_SECOND * 128L;
1225             }
1226             // set the timestamps
1227             m_data_buffer->setBufferTailTimestamp(ts);
1228            
1229             return RAW1394_ISO_DEFER;
1230         }
1231        
1232         //=> process the packet
1233         // add the data payload to the ringbuffer
1234         if(m_data_buffer->writeFrames(nevents, (char *)(data+8))) {
1235             retval=RAW1394_ISO_OK;
1236            
1237             // process all ports that should be handled on a per-packet base
1238             // this is MIDI for AMDTP (due to the need of DBC)
1239             if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) {
1240                 debugWarning("Problem decoding Packet Ports\n");
1241                 retval=RAW1394_ISO_DEFER;
1242             }
1243            
1244         } else {
1245        
1246             debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n",
1247                  cycle, m_data_buffer->getFrameCounter(), m_handler->getPacketCount());
1248            
1249             m_xruns++;
1250            
1251             // disable the processing, will be re-enabled when
1252             // the xrun is handled
1253             m_disabled=true;
1254             m_is_disabled=true;
1255
1256             retval=RAW1394_ISO_DEFER;
1257            
1258         }
1259
1260 #ifdef DEBUG
1261         if(packet->dbs) {
1262             debugOutput(DEBUG_LEVEL_VERY_VERBOSE,
1263                 "RCV %04d: CH = %d, FDF = %X. SYT = %6d, DBS = %3d, DBC = %3d, FMT = %3d, LEN = %4d (%2d)\n",
1264                 cycle, channel, packet->fdf,
1265                 packet->syt,
1266                 packet->dbs,
1267                 packet->dbc,
1268                 packet->fmt,
1269                 length,
1270                 ((length / sizeof (quadlet_t)) - 2)/packet->dbs);
1271         }
1272 #endif
1273
1274         // update the frame counter such that it reflects the new value,
1275         // and also update the buffer tail timestamp, as we add new frames
1276         // done in the SP base class
1277         if (!StreamProcessor::putFrames(nevents, m_last_timestamp)) {
1278             debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nevents, m_last_timestamp);
1279             return RAW1394_ISO_ERROR;
1280         }
1281
1282     }
1283    
1284     return retval;
1285 }
1286
1287 int64_t AmdtpReceiveStreamProcessor::getTimeUntilNextPeriodUsecs() {
1288     uint64_t time_at_period=getTimeAtPeriod();
1289    
1290     uint64_t cycle_timer=m_handler->getCycleTimerTicks();
1291    
1292     int64_t until_next=time_at_period-cycle_timer;
1293    
1294     // the maximal difference we can allow (64secs)
1295     const int64_t max=TICKS_PER_SECOND*64L;
1296    
1297     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11lld, TPUS=%f\n",
1298         time_at_period, cycle_timer, until_next, m_handler->getTicksPerUsec()
1299         );
1300        
1301     if(until_next > max) {
1302         // this means that cycle_timer has wrapped, but
1303         // time_at_period has not. we should unwrap cycle_timer
1304         // by adding TICKS_PER_SECOND*128L, meaning that we should substract
1305         // this value from until_next           
1306         until_next -= TICKS_PER_SECOND*128L;
1307     } else if (until_next < -max) {
1308         // this means that time_at_period has wrapped, but
1309         // cycle_timer has not. we should unwrap time_at_period
1310         // by adding TICKS_PER_SECOND*128L, meaning that we should add
1311         // this value from until_next
1312         until_next += TICKS_PER_SECOND*128L;
1313     }
1314    
1315     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "   TAP=%11llu, CTR=%11llu, UTN=%11lld, TPUS=%f\n",
1316         time_at_period, cycle_timer, until_next, m_handler->getTicksPerUsec()
1317         );
1318    
1319     // now convert to usecs
1320     // don't use the mapping function because it only works
1321     // for absolute times, not the relative time we are
1322     // using here (which can also be negative).
1323     return (int64_t)(((float)until_next) / m_handler->getTicksPerUsec());
1324 }
1325
1326 uint64_t AmdtpReceiveStreamProcessor::getTimeAtPeriodUsecs() {
1327     // then we should convert this into usecs
1328     // FIXME: we assume that the TimeSource of the IsoHandler is
1329     //        in usecs.
1330     return m_handler->mapToTimeSource(getTimeAtPeriod());
1331 }
1332
1333 uint64_t AmdtpReceiveStreamProcessor::getTimeAtPeriod() {
1334
1335     // every time a packet is received both the framecounter and the base
1336     // timestamp are updated. This means that at any instance of time, the
1337     // front of the buffer (latest sample) timestamp is known.
1338     // As we know the number of frames in the buffer, and we now the rate
1339     // in ticks/frame, we can calculate the back of buffer timestamp as:
1340     //    back_of_buffer_time = front_time - nbframes * rate
1341     // the next period boundary time lies m_period frames later:
1342     //    next_period_boundary = back_of_buffer_time + m_period * rate
1343    
1344     // NOTE: we should account for the fact that the timestamp is not for
1345     //       the latest sample, but for the latest sample minus syt_interval-1
1346     //       because it is the timestamp for the first sample in the packet,
1347     //       while the complete packet contains SYT_INTERVAL samples.
1348     //       this makes the equation:
1349     //          back_of_buffer_time = front_time - (nbframes - (syt_interval - 1)) * rate
1350     //          next_period_boundary = back_of_buffer_time + m_period * rate
1351
1352     // NOTE: where do we add the processing delay?
1353     //       if we add it here:
1354     //          next_period_boundary += RECEIVE_PROCESSING_DELAY
1355    
1356     // the complete equation now is:
1357     // next_period_boundary = front_time - (nbframes - (syt_interval - 1)) * rate
1358     //                        + m_period * rate + RECEIVE_PROCESSING_DELAY
1359     // since syt_interval is a constant value, we can equally well ignore it, as
1360     // if it were already included in RECEIVE_PROCESSING_DELAY
1361     // making the equation (simplified:
1362     // next_period_boundary = front_time + (-nbframes + m_period) * rate
1363     //                        + RECEIVE_PROCESSING_DELAY
1364     // currently this is in ticks
1365    
1366     int64_t fc=m_data_buffer->getFrameCounter();
1367    
1368     int64_t next_period_boundary =  m_last_timestamp;
1369     next_period_boundary     += (int64_t)(((int64_t)m_period
1370                                           - fc) * m_ticks_per_frame);
1371    
1372     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n",
1373         next_period_boundary, m_last_timestamp, fc, m_ticks_per_frame
1374         );
1375    
1376     // this happens if the timestamp wraps around while there are a lot of
1377     // frames in the buffer. We should add the wraparound value of the ticks
1378     // counter
1379     if (next_period_boundary < 0) {
1380         next_period_boundary += TICKS_PER_SECOND * 128L;
1381     }
1382     // this happens when the last timestamp is near wrapping, and
1383     // m_framecounter is low.
1384     // this means: m_last_timestamp is near wrapping and have just had
1385     // a getPackets() from the client side. the projected next_period
1386     // boundary lies beyond the wrap value.
1387     // the action is to wrap the value.
1388     else if (next_period_boundary >= TICKS_PER_SECOND * 128L) {
1389         next_period_boundary -= TICKS_PER_SECOND * 128L;
1390     }
1391    
1392     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "   NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n",
1393         next_period_boundary, m_last_timestamp, fc, m_ticks_per_frame
1394         );
1395
1396     return next_period_boundary;
1397 }
1398
1399 void AmdtpReceiveStreamProcessor::dumpInfo()
1400 {
1401
1402     StreamProcessor::dumpInfo();
1403    
1404         debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate  : %f\n", 24576000.0/m_ticks_per_frame);
1405
1406 }
1407
1408
1409 void AmdtpReceiveStreamProcessor::setVerboseLevel(int l) {
1410         setDebugLevel(l);
1411         ReceiveStreamProcessor::setVerboseLevel(l);
1412
1413 }
1414
1415 bool AmdtpReceiveStreamProcessor::reset() {
1416
1417     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n");
1418
1419     m_PeriodStat.reset();
1420     m_PacketStat.reset();
1421     m_WakeupStat.reset();
1422    
1423     // this needs to be reset to the nominal value
1424     // because xruns can cause the DLL value to shift a lot
1425     // making that we run into problems when trying to re-enable
1426     // streaming
1427     m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate);
1428
1429     // reset all non-device specific stuff
1430     // i.e. the iso stream and the associated ports
1431     if(!ReceiveStreamProcessor::reset()) {
1432             debugFatal("Could not do base class reset\n");
1433             return false;
1434     }
1435     return true;
1436 }
1437
1438 bool AmdtpReceiveStreamProcessor::prepare() {
1439
1440     m_PeriodStat.setName("RCV PERIOD");
1441     m_PacketStat.setName("RCV PACKET");
1442     m_WakeupStat.setName("RCV WAKEUP");
1443
1444         // prepare all non-device specific stuff
1445         // i.e. the iso stream and the associated ports
1446         if(!ReceiveStreamProcessor::prepare()) {
1447                 debugFatal("Could not prepare base class\n");
1448                 return false;
1449         }
1450        
1451         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
1452         switch (m_framerate) {
1453         case 32000:
1454                 m_syt_interval = 8;
1455                 break;
1456         case 44100:
1457                 m_syt_interval = 8;
1458                 break;
1459         default:
1460         case 48000:
1461                 m_syt_interval = 8;
1462                 break;
1463         case 88200:
1464                 m_syt_interval = 16;
1465                 break;
1466         case 96000:
1467                 m_syt_interval = 16;
1468                 break;
1469         case 176400:
1470                 m_syt_interval = 32;
1471                 break;
1472         case 192000:
1473                 m_syt_interval = 32;
1474                 break;
1475         }
1476
1477     // prepare the framerate estimate
1478     m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate);
1479
1480     debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n",m_ticks_per_frame);
1481
1482     // allocate the event buffer
1483     unsigned int ringbuffer_size_frames=m_nb_buffers * m_period;
1484    
1485     // add the processing delay
1486     debugOutput(DEBUG_LEVEL_VERBOSE,"Adding %u frames of SYT slack buffering...\n",
1487         (uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame));
1488     ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame);
1489    
1490     assert(m_data_buffer);   
1491     m_data_buffer->setBufferSize(ringbuffer_size_frames);
1492     m_data_buffer->setEventSize(sizeof(quadlet_t));
1493     m_data_buffer->setEventsPerFrame(m_dimension);
1494        
1495     // the buffer is written every syt_interval
1496     m_data_buffer->setUpdatePeriod(m_syt_interval);
1497     m_data_buffer->setNominalRate(m_ticks_per_frame);
1498    
1499     m_data_buffer->prepare();
1500
1501         // set the parameters of ports we can:
1502         // we want the audio ports to be period buffered,
1503         // and the midi ports to be packet buffered
1504         for ( PortVectorIterator it = m_Ports.begin();
1505                   it != m_Ports.end();
1506                   ++it )
1507         {
1508                 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str());
1509                 if(!(*it)->setBufferSize(m_period)) {
1510                         debugFatal("Could not set buffer size to %d\n",m_period);
1511                         return false;
1512                 }
1513
1514                 switch ((*it)->getPortType()) {
1515                         case Port::E_Audio:
1516                                 if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {
1517                                         debugFatal("Could not set signal type to PeriodSignalling");
1518                                         return false;
1519                                 }
1520                                 // buffertype and datatype are dependant on the API
1521                                 debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");
1522                                 // buffertype and datatype are dependant on the API
1523                                 if(!(*it)->setBufferType(Port::E_PointerBuffer)) {
1524                                         debugFatal("Could not set buffer type");
1525                                         return false;
1526                                 }
1527                                 if(!(*it)->useExternalBuffer(true)) {
1528                                         debugFatal("Could not set external buffer usage");
1529                                         return false;
1530                                 }
1531                                 if(!(*it)->setDataType(Port::E_Float)) {
1532                                         debugFatal("Could not set data type");
1533                                         return false;
1534                                 }
1535                                 break;
1536                         case Port::E_Midi:
1537                                 if(!(*it)->setSignalType(Port::E_PacketSignalled)) {
1538                                         debugFatal("Could not set signal type to PacketSignalling");
1539                                         return false;
1540                                 }
1541                                 // buffertype and datatype are dependant on the API
1542                                 // buffertype and datatype are dependant on the API
1543                                 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");
1544                                 // buffertype and datatype are dependant on the API
1545                                 if(!(*it)->setBufferType(Port::E_RingBuffer)) {
1546                                         debugFatal("Could not set buffer type");
1547                                         return false;
1548                                 }
1549                                 if(!(*it)->setDataType(Port::E_MidiEvent)) {
1550                                         debugFatal("Could not set data type");
1551                                         return false;
1552                                 }
1553                                 break;
1554                         default:
1555                                 debugWarning("Unsupported port type specified\n");
1556                                 break;
1557                 }
1558
1559         }
1560
1561         // the API specific settings of the ports should already be set,
1562         // as this is called from the processorManager->prepare()
1563         // so we can init the ports
1564         if(!initPorts()) {
1565                 debugFatal("Could not initialize ports!\n");
1566                 return false;
1567         }
1568
1569         if(!preparePorts()) {
1570                 debugFatal("Could not initialize ports!\n");
1571                 return false;
1572         }
1573
1574
1575         debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n");
1576         debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, DBS: %d, SYT: %d\n",
1577                      m_framerate,m_dimension,m_syt_interval);
1578         debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n",
1579                      m_period,m_nb_buffers);
1580         debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n",
1581                      m_port,m_channel);
1582         return true;
1583
1584 }
1585
1586 bool AmdtpReceiveStreamProcessor::prepareForStart() {
1587     disable();
1588     return true;
1589 }
1590
1591 bool AmdtpReceiveStreamProcessor::prepareForStop() {
1592     disable();
1593     return true;
1594 }
1595
1596 bool AmdtpReceiveStreamProcessor::canClientTransferFrames(unsigned int nbframes) {
1597     return m_data_buffer->getFrameCounter() >= (int) nbframes;
1598 }
1599
1600 bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes) {
1601
1602     m_PeriodStat.mark(m_data_buffer->getBufferFill());
1603
1604     // ask the buffer to process nbframes of frames
1605     // using it's registered client's processReadBlock(),
1606     // which should be ours
1607     m_data_buffer->blockProcessReadFrames(nbframes);
1608        
1609     // update the frame counter such that it reflects the new value,
1610     // done in the SP base class
1611    
1612     if (!StreamProcessor::getFrames(nbframes)) {
1613         debugError("Could not do StreamProcessor::getFrames(%d)\n", nbframes);
1614         return false;
1615     }
1616    
1617     return true;
1618 }
1619
1620 /**
1621  * \brief write received events to the stream ringbuffers.
1622  */
1623 bool AmdtpReceiveStreamProcessor::processReadBlock(char *data,
1624                                            unsigned int nevents, unsigned int offset)
1625 {
1626         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "(%p)->processReadBlock(%u, %u)\n",this,nevents,offset);
1627        
1628         bool no_problem=true;
1629
1630         for ( PortVectorIterator it = m_PeriodPorts.begin();
1631           it != m_PeriodPorts.end();
1632           ++it )
1633     {
1634
1635         if((*it)->isDisabled()) {continue;};
1636
1637                 //FIXME: make this into a static_cast when not DEBUG?
1638
1639                 AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
1640                 assert(pinfo); // this should not fail!!
1641
1642                 switch(pinfo->getFormat()) {
1643                 case AmdtpPortInfo::E_MBLA:
1644                         if(decodeMBLAEventsToPort(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents)) {
1645                                 debugWarning("Could not decode packet MBLA to port %s",(*it)->getName().c_str());
1646                                 no_problem=false;
1647                         }
1648                         break;
1649                 case AmdtpPortInfo::E_SPDIF: // still unimplemented
1650                         break;
1651         /* for this processor, midi is a packet based port
1652                 case AmdtpPortInfo::E_Midi:
1653                         break;*/
1654                 default: // ignore
1655                         break;
1656                 }
1657     }
1658     return no_problem;
1659
1660 }
1661
1662 /**
1663  * @brief decode a packet for the packet-based ports
1664  *
1665  * @param data Packet data
1666  * @param nevents number of events in data (including events of other ports & port types)
1667  * @param dbc DataBlockCount value for this packet
1668  * @return true if all successfull
1669  */
1670 bool AmdtpReceiveStreamProcessor::decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc)
1671 {
1672         bool ok=true;
1673        
1674         quadlet_t *target_event=NULL;
1675         unsigned int j;
1676        
1677         for ( PortVectorIterator it = m_PacketPorts.begin();
1678           it != m_PacketPorts.end();
1679           ++it )
1680         {
1681
1682 #ifdef DEBUG
1683                 AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
1684                 assert(pinfo); // this should not fail!!
1685
1686                 // the only packet type of events for AMDTP is MIDI in mbla
1687                 assert(pinfo->getFormat()==AmdtpPortInfo::E_Midi);
1688 #endif
1689                 AmdtpMidiPort *mp=static_cast<AmdtpMidiPort *>(*it);
1690                
1691                 // we decode this directly (no function call) due to the high frequency
1692                 /* idea:
1693                 spec says: current_midi_port=(dbc+j)%8;
1694                 => if we start at (dbc+stream->location-1)%8 [due to location_min=1],
1695                 we'll start at the right event for the midi port.
1696                 => if we increment j with 8, we stay at the right event.
1697                 */
1698                 // FIXME: as we know in advance how big a packet is (syt_interval) we can
1699                 //        predict how much loops will be present here
1700                 for(j = (dbc & 0x07)+mp->getLocation()-1; j < nevents; j += 8) {
1701                         target_event=(quadlet_t *)(data + ((j * m_dimension) + mp->getPosition()));
1702                         quadlet_t sample_int=ntohl(*target_event);
1703                         // FIXME: this assumes that 2X and 3X speed isn't used,
1704                         // because only the 1X slot is put into the ringbuffer
1705                         if(IEC61883_AM824_GET_LABEL(sample_int) != IEC61883_AM824_LABEL_MIDI_NO_DATA) {
1706                                 sample_int=(sample_int >> 16) & 0x000000FF;
1707                                 if(!mp->writeEvent(&sample_int)) {
1708                                         debugWarning("Packet port events lost\n");
1709                                         ok=false;
1710                                 }
1711                         }
1712                 }
1713
1714         }
1715        
1716         return ok;
1717 }
1718
1719 int AmdtpReceiveStreamProcessor::decodeMBLAEventsToPort(AmdtpAudioPort *p, quadlet_t *data,
1720                                            unsigned int offset, unsigned int nevents)
1721 {
1722         unsigned int j=0;
1723
1724 //      printf("****************\n");
1725 //      hexDumpQuadlets(data,m_dimension*4);
1726 //      printf("****************\n");
1727
1728         quadlet_t *target_event;
1729
1730         target_event=(quadlet_t *)(data + p->getPosition());
1731
1732         switch(p->getDataType()) {
1733                 default:
1734                 case Port::E_Int24:
1735                         {
1736                                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress());
1737
1738                                 assert(nevents + offset <= p->getBufferSize());
1739
1740                                 buffer+=offset;
1741
1742                                 for(j = 0; j < nevents; j += 1) { // decode max nsamples
1743                                         *(buffer)=(ntohl((*target_event) ) & 0x00FFFFFF);
1744                                         buffer++;
1745                                         target_event+=m_dimension;
1746                                 }
1747                         }
1748                         break;
1749                 case Port::E_Float:
1750                         {
1751                                 const float multiplier = 1.0f / (float)(0x7FFFFF);
1752                                 float *buffer=(float *)(p->getBufferAddress());
1753
1754                                 assert(nevents + offset <= p->getBufferSize());
1755
1756                                 buffer+=offset;
1757
1758                                 for(j = 0; j < nevents; j += 1) { // decode max nsamples               
1759        
1760                                         unsigned int v = ntohl(*target_event) & 0x00FFFFFF;
1761                                         // sign-extend highest bit of 24-bit int
1762                                         int tmp = (int)(v << 8) / 256;
1763                
1764                                         *buffer = tmp * multiplier;
1765                                
1766                                         buffer++;
1767                                         target_event+=m_dimension;
1768                                 }
1769                         }
1770                         break;
1771         }
1772
1773         return 0;
1774 }
1775
1776 } // end of namespace FreebobStreaming
Note: See TracBrowser for help on using the browser.