root/trunk/libffado/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp

Revision 893, 16.7 kB (checked in by ppalmers, 16 years ago)

improve AMDTP receive performance

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23 #include "config.h"
24
25 #include "AmdtpReceiveStreamProcessor.h"
26 #include "AmdtpPort.h"
27 #include "../StreamProcessorManager.h"
28 #include "devicemanager.h"
29
30 #include "libieee1394/ieee1394service.h"
31 #include "libieee1394/IsoHandlerManager.h"
32 #include "libieee1394/cycletimer.h"
33
34 #include <netinet/in.h>
35 #include <assert.h>
36
37 namespace Streaming {
38
39 /* --------------------- RECEIVE ----------------------- */
40
41 AmdtpReceiveStreamProcessor::AmdtpReceiveStreamProcessor(FFADODevice &parent, int dimension)
42     : StreamProcessor(parent, ePT_Receive)
43     , m_dimension( dimension )
44     , m_nb_audio_ports( 0 )
45     , m_nb_midi_ports( 0 )
46
47 {}
48
49 unsigned int
50 AmdtpReceiveStreamProcessor::getSytInterval() {
51     switch (m_StreamProcessorManager.getNominalRate()) {
52         case 32000:
53         case 44100:
54         case 48000:
55             return 8;
56         case 88200:
57         case 96000:
58             return 16;
59         case 176400:
60         case 192000:
61             return 32;
62         default:
63             debugError("Unsupported rate: %d\n", m_StreamProcessorManager.getNominalRate());
64             return 0;
65     }
66 }
67
68 bool AmdtpReceiveStreamProcessor::prepareChild() {
69     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this);
70     m_syt_interval = getSytInterval();
71
72     if (!initPortCache()) {
73         debugError("Could not init port cache\n");
74         return false;
75     }
76
77     return true;
78 }
79
80
81 /**
82  * Processes packet header to extract timestamps and so on
83  * @param data
84  * @param length
85  * @param channel
86  * @param tag
87  * @param sy
88  * @param cycle
89  * @param dropped
90  * @return
91  */
92 enum StreamProcessor::eChildReturnValue
93 AmdtpReceiveStreamProcessor::processPacketHeader(unsigned char *data, unsigned int length,
94                   unsigned char channel, unsigned char tag, unsigned char sy,
95                   unsigned int cycle, unsigned int dropped)
96 {
97     #ifdef DEBUG
98     static uint32_t now_prev=0;
99     static uint64_t now_prev_ticks=0;
100     #endif
101
102     struct iec61883_packet *packet = (struct iec61883_packet *) data;
103     assert(packet);
104     bool ok = (packet->syt != 0xFFFF) &&
105                   (packet->fdf != 0xFF) &&
106                   (packet->fmt == 0x10) &&
107                   (packet->dbs > 0) &&
108                   (length >= 2*sizeof(quadlet_t));
109     if(ok) {
110         uint32_t now = m_1394service.getCycleTimer();
111
112         #ifdef DEBUG
113         uint64_t now_ticks = CYCLE_TIMER_TO_TICKS(now);
114
115         if (diffTicks(now_ticks, now_prev_ticks) < 0) {
116             debugWarning("non-monotonic CTR on cycle %04u: %llu -> %llu\n", cycle, now_prev_ticks, now_ticks);
117             debugWarning("                               : %08X -> %08X\n", now_prev, now);
118             debugOutput ( DEBUG_LEVEL_VERBOSE,
119                         " current: %011llu (%03us %04ucy %04uticks)\n",
120                         now,
121                         (unsigned int)TICKS_TO_SECS( now ),
122                         (unsigned int)TICKS_TO_CYCLES( now ),
123                         (unsigned int)TICKS_TO_OFFSET( now ) );
124             debugOutput ( DEBUG_LEVEL_VERBOSE,
125                         " prev   : %011llu (%03us %04ucy %04uticks)\n",
126                         now_prev,
127                         (unsigned int)TICKS_TO_SECS( now_prev ),
128                         (unsigned int)TICKS_TO_CYCLES( now_prev ),
129                         (unsigned int)TICKS_TO_OFFSET( now_prev ) );
130         }
131         now_prev = now;
132         now_prev_ticks=now_ticks;
133         #endif
134
135         //=> convert the SYT to a full timestamp in ticks
136         m_last_timestamp = sytRecvToFullTicks((uint32_t)ntohs(packet->syt),
137                                               cycle, now);
138     }
139     return (ok ? eCRV_OK : eCRV_Invalid );
140 }
141
142 /**
143  * extract the data from the packet
144  * @pre the IEC61883 packet is valid according to isValidPacket
145  * @param data
146  * @param length
147  * @param channel
148  * @param tag
149  * @param sy
150  * @param cycle
151  * @param dropped
152  * @return
153  */
154 enum StreamProcessor::eChildReturnValue
155 AmdtpReceiveStreamProcessor::processPacketData(unsigned char *data, unsigned int length,
156                   unsigned char channel, unsigned char tag, unsigned char sy,
157                   unsigned int cycle, unsigned int dropped_cycles) {
158     struct iec61883_packet *packet = (struct iec61883_packet *) data;
159     assert(packet);
160
161     unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs;
162
163     // we have to keep in mind that there are also
164     // some packets buffered by the ISO layer,
165     // at most x=m_handler->getWakeupInterval()
166     // these contain at most x*syt_interval
167     // frames, meaning that we might receive
168     // this packet x*syt_interval*ticks_per_frame
169     // later than expected (the real receive time)
170     #ifdef DEBUG
171     if(isRunning()) {
172         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
173                            "STMP: %lluticks | syt_interval=%d, tpf=%f\n",
174                            m_last_timestamp, m_syt_interval, getTicksPerFrame());
175     }
176     #endif
177
178     if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) {
179         return eCRV_OK;
180     } else {
181         return eCRV_XRun;
182     }
183 }
184
185 /***********************************************
186  * Encoding/Decoding API                       *
187  ***********************************************/
188 /**
189  * @brief write received events to the stream ringbuffers.
190  */
191 bool AmdtpReceiveStreamProcessor::processReadBlock(char *data,
192                        unsigned int nevents, unsigned int offset)
193 {
194     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,
195                         "(%p)->processReadBlock(%u, %u)\n",
196                         this,nevents,offset);
197
198     // update the variable parts of the cache
199     updatePortCache();
200
201     // decode audio data
202     switch(m_StreamProcessorManager.getAudioDataType()) {
203         case StreamProcessorManager::eADT_Int24:
204             decodeAudioPortsInt24((quadlet_t *)data, offset, nevents);
205             break;
206         case StreamProcessorManager::eADT_Float:
207             decodeAudioPortsFloat((quadlet_t *)data, offset, nevents);
208             break;
209     }
210
211     // do midi ports
212     decodeMidiPorts((quadlet_t *)data, offset, nevents);
213     return true;
214 }
215
216 //#ifdef __SSE2__
217 #if 0 // SSE code is not ready yet
218 #include <emmintrin.h>
219 #warning SSE2 build
220
221 /**
222  * @brief demux events to all audio ports (int24)
223  * @param data
224  * @param offset
225  * @param nevents
226  */
227 void
228 AmdtpReceiveStreamProcessor::decodeAudioPortsInt24(quadlet_t *data,
229                                                     unsigned int offset,
230                                                     unsigned int nevents)
231 {
232     unsigned int j;
233     quadlet_t *target_event;
234     unsigned int i;
235
236     for (i = 0; i < m_nb_audio_ports; i++) {
237         struct _MBLA_port_cache &p = m_audio_ports.at(i);
238         target_event = (quadlet_t *)(data + i);
239         assert(nevents + offset <= p.buffer_size );
240
241         if(p.buffer && p.enabled) {
242             quadlet_t *buffer = (quadlet_t *)(p.buffer);
243             buffer += offset;
244
245             for(j = 0; j < nevents; j += 1) {
246                 *(buffer)=(ntohl((*target_event) ) & 0x00FFFFFF);
247                 buffer++;
248                 target_event+=m_dimension;
249             }
250         }
251     }
252 }
253
254 /**
255  * @brief demux events to all audio ports (float)
256  * @param data
257  * @param offset
258  * @param nevents
259  */
260 void
261 AmdtpReceiveStreamProcessor::decodeAudioPortsFloat(quadlet_t *data,
262                                                     unsigned int offset,
263                                                     unsigned int nevents)
264 {
265     unsigned int j;
266     quadlet_t *target_event;
267     unsigned int i;
268     const float multiplier = 1.0f / (float)(0x7FFFFF);
269
270     for (i = 0; i < m_nb_audio_ports; i++) {
271         struct _MBLA_port_cache &p = m_audio_ports.at(i);
272         target_event = (quadlet_t *)(data + i);
273         assert(nevents + offset <= p.buffer_size );
274
275         if(p.buffer && p.enabled) {
276             float *buffer = (float *)(p.buffer);
277             buffer += offset;
278
279             for(j = 0; j < nevents; j += 1) {
280                 unsigned int v = ntohl(*target_event) & 0x00FFFFFF;
281                 // sign-extend highest bit of 24-bit int
282                 int tmp = (int)(v << 8) / 256;
283                 *buffer = tmp * multiplier;
284                 buffer++;
285                 target_event+=m_dimension;
286             }
287         }
288     }
289 }
290
291 #else
292 /**
293  * @brief demux events to all audio ports (int24)
294  * @param data
295  * @param offset
296  * @param nevents
297  */
298 void
299 AmdtpReceiveStreamProcessor::decodeAudioPortsInt24(quadlet_t *data,
300                                                     unsigned int offset,
301                                                     unsigned int nevents)
302 {
303     unsigned int j;
304     quadlet_t *target_event;
305     unsigned int i;
306
307     for (i = 0; i < m_nb_audio_ports; i++) {
308         struct _MBLA_port_cache &p = m_audio_ports.at(i);
309         target_event = (quadlet_t *)(data + i);
310         assert(nevents + offset <= p.buffer_size );
311
312         if(p.buffer && p.enabled) {
313             quadlet_t *buffer = (quadlet_t *)(p.buffer);
314             buffer += offset;
315
316             for(j = 0; j < nevents; j += 1) {
317                 *(buffer)=(ntohl((*target_event) ) & 0x00FFFFFF);
318                 buffer++;
319                 target_event+=m_dimension;
320             }
321         }
322     }
323 }
324
325 /**
326  * @brief demux events to all audio ports (float)
327  * @param data
328  * @param offset
329  * @param nevents
330  */
331 void
332 AmdtpReceiveStreamProcessor::decodeAudioPortsFloat(quadlet_t *data,
333                                                     unsigned int offset,
334                                                     unsigned int nevents)
335 {
336     unsigned int j;
337     quadlet_t *target_event;
338     unsigned int i;
339     const float multiplier = 1.0f / (float)(0x7FFFFF);
340
341     for (i = 0; i < m_nb_audio_ports; i++) {
342         struct _MBLA_port_cache &p = m_audio_ports.at(i);
343         target_event = (quadlet_t *)(data + i);
344         assert(nevents + offset <= p.buffer_size );
345
346         if(p.buffer && p.enabled) {
347             float *buffer = (float *)(p.buffer);
348             buffer += offset;
349
350             for(j = 0; j < nevents; j += 1) {
351                 unsigned int v = ntohl(*target_event) & 0x00FFFFFF;
352                 // sign-extend highest bit of 24-bit int
353                 int tmp = (int)(v << 8) / 256;
354                 *buffer = tmp * multiplier;
355                 buffer++;
356                 target_event+=m_dimension;
357             }
358         }
359     }
360 }
361
362 #endif
363
364 /**
365  * @brief decode all midi ports in the cache from events
366  * @param data
367  * @param offset
368  * @param nevents
369  */
370 void
371 AmdtpReceiveStreamProcessor::decodeMidiPorts(quadlet_t *data,
372                                               unsigned int offset,
373                                               unsigned int nevents)
374 {
375     quadlet_t *target_event;
376     quadlet_t sample_int;
377     unsigned int i,j;
378
379     for (i = 0; i < m_nb_midi_ports; i++) {
380         struct _MIDI_port_cache &p = m_midi_ports.at(i);
381         if (p.buffer && p.enabled) {
382             uint32_t *buffer = (quadlet_t *)(p.buffer);
383             buffer += offset;
384
385             for (j = p.location;j < nevents; j += 8) {
386                 target_event = (quadlet_t *) (data + ((j * m_dimension) + p.position));
387                 sample_int=ntohl(*target_event);
388                 // FIXME: this assumes that 2X and 3X speed isn't used,
389                 // because only the 1X slot is put into the ringbuffer
390                 if(IEC61883_AM824_GET_LABEL(sample_int) != IEC61883_AM824_LABEL_MIDI_NO_DATA) {
391                     sample_int=(sample_int >> 16) & 0x000000FF;
392                     sample_int |= 0x01000000; // flag that there is a midi event present
393                     *buffer = sample_int;
394                     debugOutput(DEBUG_LEVEL_VERBOSE, "Received midi byte %08X on port %p index %d\n", sample_int, p, j-p.location);
395                 } else {
396                     // make sure no event is received
397                     *buffer = 0;
398                 }
399                 buffer+=8;
400             }
401         }
402     }
403 }
404
405 bool
406 AmdtpReceiveStreamProcessor::initPortCache() {
407     // make use of the fact that audio ports are the first ports in
408     // the cluster as per AMDTP. so we can sort the ports by position
409     // and have very efficient lookups:
410     // m_float_ports.at(i).buffer -> audio stream i buffer
411     // for midi ports we simply cache all port info since they are (usually) not
412     // that numerous
413     m_nb_audio_ports = 0;
414     m_audio_ports.clear();
415    
416     m_nb_midi_ports = 0;
417     m_midi_ports.clear();
418    
419     for(PortVectorIterator it = m_Ports.begin();
420         it != m_Ports.end();
421         ++it )
422     {
423         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
424         assert(pinfo); // this should not fail!!
425
426         switch( pinfo->getFormat() )
427         {
428             case AmdtpPortInfo::E_MBLA:
429                 m_nb_audio_ports++;
430                 break;
431             case AmdtpPortInfo::E_SPDIF: // still unimplemented
432                 break;
433             case AmdtpPortInfo::E_Midi:
434                 m_nb_midi_ports++;
435                 break;
436             default: // ignore
437                 break;
438         }
439     }
440
441     unsigned int idx;
442     for (idx = 0; idx < m_nb_audio_ports; idx++) {
443         for(PortVectorIterator it = m_Ports.begin();
444             it != m_Ports.end();
445             ++it )
446         {
447             AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
448             debugOutput(DEBUG_LEVEL_VERY_VERBOSE,
449                         "idx %u: looking at port %s at position %u\n",
450                         idx, (*it)->getName().c_str(), pinfo->getPosition());
451             if(pinfo->getPosition() == idx) {
452                 struct _MBLA_port_cache p;
453                 p.port = dynamic_cast<AmdtpAudioPort *>(*it);
454                 if(p.port == NULL) {
455                     debugError("Port is not an AmdtpAudioPort!\n");
456                     return false;
457                 }
458                 p.buffer = NULL; // to be filled by updatePortCache
459                 #ifdef DEBUG
460                 p.buffer_size = (*it)->getBufferSize();
461                 #endif
462
463                 m_audio_ports.push_back(p);
464                 debugOutput(DEBUG_LEVEL_VERBOSE,
465                             "Cached port %s at position %u\n",
466                             p.port->getName().c_str(), idx);
467                 goto next_index;
468             }
469         }
470         debugError("No MBLA port found for position %d\n", idx);
471         return false;
472 next_index:
473         continue;
474     }
475
476     for(PortVectorIterator it = m_Ports.begin();
477         it != m_Ports.end();
478         ++it )
479     {
480         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it);
481         debugOutput(DEBUG_LEVEL_VERY_VERBOSE,
482                     "idx %u: looking at port %s at position %u, location %u\n",
483                     idx, (*it)->getName().c_str(), pinfo->getPosition(), pinfo->getLocation());
484         if ((*it)->getPortType() == Port::E_Midi) {
485             struct _MIDI_port_cache p;
486             p.port = dynamic_cast<AmdtpMidiPort *>(*it);
487             if(p.port == NULL) {
488                 debugError("Port is not an AmdtpMidiPort!\n");
489                 return false;
490             }
491             p.position = pinfo->getPosition();
492             p.location = pinfo->getLocation();
493             p.buffer = NULL; // to be filled by updatePortCache
494             #ifdef DEBUG
495             p.buffer_size = (*it)->getBufferSize();
496             #endif
497
498             m_midi_ports.push_back(p);
499             debugOutput(DEBUG_LEVEL_VERBOSE,
500                         "Cached port %s at position %u, location %u\n",
501                         p.port->getName().c_str(), p.position, p.location);
502         }
503     }
504
505     return true;
506 }
507
508 void
509 AmdtpReceiveStreamProcessor::updatePortCache() {
510     unsigned int idx;
511     for (idx = 0; idx < m_nb_audio_ports; idx++) {
512         struct _MBLA_port_cache& p = m_audio_ports.at(idx);
513         AmdtpAudioPort *port = p.port;
514         p.buffer = port->getBufferAddress();
515         p.enabled = !port->isDisabled();
516     }
517     for (idx = 0; idx < m_nb_midi_ports; idx++) {
518         struct _MIDI_port_cache& p = m_midi_ports.at(idx);
519         AmdtpMidiPort *port = p.port;
520         p.buffer = port->getBufferAddress();
521         p.enabled = !port->isDisabled();
522     }
523 }
524
525 } // end of namespace Streaming
Note: See TracBrowser for help on using the browser.