root/branches/streaming-rework/src/libutil/TimestampedBuffer.cpp

Revision 391, 17.4 kB (checked in by pieterpalmers, 16 years ago)

* Partially finished:

  • Introduce TimestampedBuffer util class
  • replace internal ringbuffer of SP with timed ringbuffer

* Compiles & works

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006,2007 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28  
29 #include "libutil/Atomic.h"
30 #include "libstreaming/cycletimer.h"
31
32 #include "TimestampedBuffer.h"
33 #include "assert.h"
34
35 namespace FreebobUtil {
36
37 IMPL_DEBUG_MODULE( TimestampedBuffer, TimestampedBuffer, DEBUG_LEVEL_VERBOSE );
38
39 TimestampedBuffer::TimestampedBuffer(TimestampedBufferClient *c)
40     : m_event_buffer(NULL), m_cluster_buffer(NULL),
41       m_event_size(0), m_events_per_frame(0), m_buffer_size(0),
42       m_bytes_per_frame(0), m_bytes_per_buffer(0),
43       m_Client(c), m_framecounter(0), m_buffer_tail_timestamp(0),
44       m_dll_e2(0.0), m_dll_b(0.877), m_dll_c(0.384),
45       m_nominal_rate(0.0), m_update_period(0)
46 {
47
48 }
49
50 TimestampedBuffer::~TimestampedBuffer() {
51     freebob_ringbuffer_free(m_event_buffer);
52     free(m_cluster_buffer);
53 }
54
55 bool TimestampedBuffer::setEventSize(unsigned int s) {
56     m_event_size=s;
57    
58     m_bytes_per_frame=m_event_size*m_events_per_frame;
59     m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size;
60    
61     return true;
62 }
63
64 bool TimestampedBuffer::setEventsPerFrame(unsigned int s) {
65     m_events_per_frame=s;
66    
67     m_bytes_per_frame=m_event_size*m_events_per_frame;
68     m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size;
69    
70     return true;
71 }
72
73 bool TimestampedBuffer::setBufferSize(unsigned int s) {
74     m_buffer_size=s;
75    
76     m_bytes_per_frame=m_event_size*m_events_per_frame;
77     m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size;
78    
79     return true;
80 }
81
82 unsigned int TimestampedBuffer::getBufferFill() {
83     return freebob_ringbuffer_read_space(m_event_buffer)/(m_bytes_per_frame);
84 }
85
86 bool TimestampedBuffer::init() {
87    
88     pthread_mutex_init(&m_framecounter_lock, NULL);
89    
90     return true;
91 }
92
93 bool TimestampedBuffer::reset() {
94     freebob_ringbuffer_reset(m_event_buffer);
95    
96     resetFrameCounter();
97    
98     return true;
99 }
100
101 void TimestampedBuffer::dumpInfo() {
102    
103     uint64_t ts_head, fc;
104     getBufferHeadTimestamp(&ts_head,&fc);
105    
106     int64_t diff=(int64_t)ts_head - (int64_t)m_buffer_tail_timestamp;
107
108     debugOutputShort( DEBUG_LEVEL_NORMAL, "  TimestampedBuffer (%p) info:\n",this);
109     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Frame counter         : %d\n", m_framecounter);
110     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer head timestamp : %011llu\n",ts_head);
111     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
112     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Head - Tail           : %011lld\n",diff);
113     debugOutputShort( DEBUG_LEVEL_NORMAL, "  rate                  : %lf (%f)\n",m_dll_e2,m_dll_e2/m_update_period);
114 }
115
116 bool TimestampedBuffer::prepare() {
117     debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing buffer (%p)\n",this);
118     debugOutput(DEBUG_LEVEL_VERBOSE," Size=%u events, events/frame=%u, event size=%ubytes\n",
119                                         m_buffer_size,m_events_per_frame,m_event_size);
120
121     assert(m_buffer_size);
122     assert(m_events_per_frame);
123     assert(m_event_size);
124
125     if( !(m_event_buffer=freebob_ringbuffer_create(
126             (m_events_per_frame * m_buffer_size) * m_event_size))) {
127            
128         debugFatal("Could not allocate memory event ringbuffer\n");
129         return false;
130     }
131    
132     // allocate the temporary cluster buffer
133     if( !(m_cluster_buffer=(char *)calloc(m_events_per_frame,m_event_size))) {
134             debugFatal("Could not allocate temporary cluster buffer\n");
135         freebob_ringbuffer_free(m_event_buffer);
136         return false;
137     }
138
139     assert(m_nominal_rate != 0.0);
140     assert(m_update_period != 0);
141    
142     // init the DLL
143     m_dll_e2=m_nominal_rate * (double)m_update_period;
144    
145     m_dll_b=((double)(0.877));
146     m_dll_c=((double)(0.384));
147
148     return true;
149 }
150
151 bool TimestampedBuffer::writeFrames(unsigned int nevents, char *data) {
152
153     unsigned int write_size=nevents*m_event_size*m_events_per_frame;
154
155     // add the data payload to the ringbuffer
156     if (freebob_ringbuffer_write(m_event_buffer,data,write_size) < write_size)
157     {
158         debugWarning("writeFrames buffer overrun\n");
159         return false;
160     }
161     return true;
162
163 }
164
165 bool TimestampedBuffer::readFrames(unsigned int nevents, char *data) {
166
167     unsigned int read_size=nevents*m_event_size*m_events_per_frame;
168
169     // get the data payload to the ringbuffer
170     if ((freebob_ringbuffer_read(m_event_buffer,data,read_size)) < read_size)
171     {
172         debugWarning("readFrames buffer underrun\n");
173         return false;
174     }
175     return true;
176
177 }
178
179 bool TimestampedBuffer::blockProcessWriteFrames(unsigned int nbframes, int64_t ts) {
180
181     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
182     int xrun;
183     unsigned int offset=0;
184    
185     freebob_ringbuffer_data_t vec[2];
186     // we received one period of frames
187     // this is period_size*dimension of events
188     unsigned int events2write=nbframes*m_events_per_frame;
189     unsigned int bytes2write=events2write*m_event_size;
190
191     /* write events2write bytes to the ringbuffer
192     *  first see if it can be done in one read.
193     *  if so, ok.
194     *  otherwise write up to a multiple of clusters directly to the buffer
195     *  then do the buffer wrap around using ringbuffer_write
196     *  then write the remaining data directly to the buffer in a third pass
197     *  Make sure that we cannot end up on a non-cluster aligned position!
198     */
199     unsigned int cluster_size=m_events_per_frame*m_event_size;
200
201     while(bytes2write>0) {
202         int byteswritten=0;
203        
204         unsigned int frameswritten=(nbframes*cluster_size-bytes2write)/cluster_size;
205         offset=frameswritten;
206        
207         freebob_ringbuffer_get_write_vector(m_event_buffer, vec);
208            
209         if(vec[0].len==0) { // this indicates a full event buffer
210             debugError("Event buffer overrun in buffer %p\n",this);
211             break;
212         }
213            
214         /* if we don't take care we will get stuck in an infinite loop
215         * because we align to a cluster boundary later
216         * the remaining nb of bytes in one write operation can be
217         * smaller than one cluster
218         * this can happen because the ringbuffer size is always a power of 2
219         */
220         if(vec[0].len<cluster_size) {
221            
222             // encode to the temporary buffer
223             xrun = m_Client->processWriteBlock(m_cluster_buffer, 1, offset);
224            
225             if(xrun<0) {
226                 // xrun detected
227                 debugError("Frame buffer underrun in buffer %p\n",this);
228                 return false;
229             }
230                
231             // use the ringbuffer function to write one cluster
232             // the write function handles the wrap around.
233             freebob_ringbuffer_write(m_event_buffer,
234                          m_cluster_buffer,
235                          cluster_size);
236                
237             // we advanced one cluster_size
238             bytes2write-=cluster_size;
239                
240         } else { //
241            
242             if(bytes2write>vec[0].len) {
243                 // align to a cluster boundary
244                 byteswritten=vec[0].len-(vec[0].len%cluster_size);
245             } else {
246                 byteswritten=bytes2write;
247             }
248                
249             xrun = m_Client->processWriteBlock(vec[0].buf,
250                          byteswritten/cluster_size,
251                          offset);
252            
253             if(xrun<0) {
254                     // xrun detected
255                 debugError("Frame buffer underrun in buffer %p\n",this);
256                 return false; // FIXME: return false ?
257             }
258
259             freebob_ringbuffer_write_advance(m_event_buffer, byteswritten);
260             bytes2write -= byteswritten;
261         }
262
263         // the bytes2write should always be cluster aligned
264         assert(bytes2write%cluster_size==0);
265
266     }
267    
268     return true;
269    
270 }
271
272 bool TimestampedBuffer::blockProcessReadFrames(unsigned int nbframes) {
273
274     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading %u from buffer (%p)...\n", nbframes, this);
275    
276     int xrun;
277     unsigned int offset=0;
278    
279     freebob_ringbuffer_data_t vec[2];
280     // we received one period of frames on each connection
281     // this is period_size*dimension of events
282
283     unsigned int events2read=nbframes*m_events_per_frame;
284     unsigned int bytes2read=events2read*m_event_size;
285     /* read events2read bytes from the ringbuffer
286     *  first see if it can be done in one read.
287     *  if so, ok.
288     *  otherwise read up to a multiple of clusters directly from the buffer
289     *  then do the buffer wrap around using ringbuffer_read
290     *  then read the remaining data directly from the buffer in a third pass
291     *  Make sure that we cannot end up on a non-cluster aligned position!
292     */
293     unsigned int cluster_size=m_events_per_frame*m_event_size;
294    
295     while(bytes2read>0) {
296         unsigned int framesread=(nbframes*cluster_size-bytes2read)/cluster_size;
297         offset=framesread;
298
299         int bytesread=0;
300
301         freebob_ringbuffer_get_read_vector(m_event_buffer, vec);
302
303         if(vec[0].len==0) { // this indicates an empty event buffer
304             debugError("RCV: Event buffer underrun in processor %p\n",this);
305             return false;
306         }
307
308         /* if we don't take care we will get stuck in an infinite loop
309         * because we align to a cluster boundary later
310         * the remaining nb of bytes in one read operation can be smaller than one cluster
311         * this can happen because the ringbuffer size is always a power of 2
312                 */
313         if(vec[0].len<cluster_size) {
314             // use the ringbuffer function to read one cluster
315             // the read function handles wrap around
316             freebob_ringbuffer_read(m_event_buffer,m_cluster_buffer,cluster_size);
317
318             assert(m_Client);
319             xrun = m_Client->processReadBlock(m_cluster_buffer, 1, offset);
320
321             if(xrun<0) {
322                 // xrun detected
323                 debugError("RCV: Frame buffer overrun in processor %p\n",this);
324                     return false;
325             }
326
327             // we advanced one cluster_size
328             bytes2read-=cluster_size;
329
330         } else { //
331
332             if(bytes2read>vec[0].len) {
333                 // align to a cluster boundary
334                 bytesread=vec[0].len-(vec[0].len%cluster_size);
335             } else {
336                 bytesread=bytes2read;
337             }
338            
339             assert(m_Client);
340             xrun = m_Client->processReadBlock(vec[0].buf, bytesread/cluster_size, offset);
341
342             if(xrun<0) {
343                 // xrun detected
344                 debugError("RCV: Frame buffer overrun in processor %p\n",this);
345                 return false;
346             }
347
348             freebob_ringbuffer_read_advance(m_event_buffer, bytesread);
349             bytes2read -= bytesread;
350         }
351
352         // the bytes2read should always be cluster aligned
353         assert(bytes2read%cluster_size==0);
354     }
355
356     return true;
357 }
358
359 /**
360  * Decrements the frame counter, in a atomic way.
361  * is thread safe.
362  */
363 void TimestampedBuffer::decrementFrameCounter(int nbframes) {
364     pthread_mutex_lock(&m_framecounter_lock);
365     m_framecounter -= nbframes;
366     pthread_mutex_unlock(&m_framecounter_lock);
367 }
368
369 /**
370  * Increments the frame counter, in a atomic way.
371  * also sets the buffer tail timestamp
372  * This is thread safe.
373  */
374 void TimestampedBuffer::incrementFrameCounter(int nbframes, uint64_t new_timestamp) {
375     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",
376                 this, new_timestamp);
377    
378     pthread_mutex_lock(&m_framecounter_lock);
379     m_framecounter += nbframes;
380    
381     // update the DLL
382     int64_t diff = (int64_t)new_timestamp-(int64_t)m_buffer_next_tail_timestamp;
383    
384     // idea to implement it for nbframes values that differ from m_update_period:
385     // diff = diff * nbframes/m_update_period
386     // m_buffer_next_tail_timestamp = m_buffer_tail_timestamp + diff
387    
388     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): diff=%lld ",
389                 this, diff);
390                    
391     // the maximal difference we can allow (64secs)
392     const int64_t max=TICKS_PER_SECOND*64L;
393    
394     if(diff > max) {
395         diff -= TICKS_PER_SECOND*128L;
396     } else if (diff < -max) {
397         diff += TICKS_PER_SECOND*128L;
398     }
399
400     double err=diff;
401    
402     debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "diff2=%lld err=%f\n",
403                 diff, err);
404     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "FC=%10u, TS=%011llu\n",m_framecounter, m_buffer_tail_timestamp);
405    
406     m_buffer_tail_timestamp=m_buffer_next_tail_timestamp;
407     m_buffer_next_tail_timestamp += (uint64_t)(m_dll_b * err + m_dll_e2);
408    
409     if (m_buffer_next_tail_timestamp > TICKS_PER_SECOND*128L) {
410         m_buffer_next_tail_timestamp -= TICKS_PER_SECOND*128L;
411     }
412    
413     m_dll_e2 += m_dll_c*err;
414    
415     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "TS=%011llu, NTS=%011llu, DLLe2=%f\n",
416                 m_buffer_tail_timestamp, m_buffer_next_tail_timestamp, m_dll_e2);
417    
418     pthread_mutex_unlock(&m_framecounter_lock);
419    
420     // this DLL allows the calculation of any sample timestamp relative to the buffer tail,
421     // to the next period and beyond (through extrapolation)
422     //
423     // ts(x) = m_buffer_tail_timestamp +
424     //         (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x
425    
426 }
427
428 /**
429  * Sets the buffer tail timestamp (in usecs)
430  * This is thread safe.
431  */
432 void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) {
433    
434     pthread_mutex_lock(&m_framecounter_lock);
435    
436     m_buffer_tail_timestamp = new_timestamp;
437    
438     m_dll_e2=m_update_period * m_nominal_rate;
439     m_buffer_next_tail_timestamp = (uint64_t)((double)m_buffer_tail_timestamp + m_dll_e2);
440    
441     pthread_mutex_unlock(&m_framecounter_lock);   
442    
443     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n",
444                 this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2);
445
446 }
447
448 /**
449  * \brief return the timestamp of the first frame in the buffer
450  *
451  * This function returns the timestamp of the very first sample in
452  * the StreamProcessor's buffer. This is useful for slave StreamProcessors
453  * to find out what the base for their timestamp generation should
454  * be. It also returns the framecounter value for which this timestamp
455  * is valid.
456  *
457  * The system is built in such a way that we assume that the processing
458  * of the buffers doesn't take any time. Assume we have a buffer transfer at
459  * time T1, meaning that the last sample of this buffer occurs at T1. As
460  * processing does not take time, we don't have to add anything to T1. When
461  * transferring the processed buffer to the xmit processor, the timestamp
462  * of the last sample is still T1.
463  *
464  * When starting the streams, we don't have any information on this last
465  * timestamp. We prefill the buffer at the xmit side, and we should find
466  * out what the timestamp for the last sample in the buffer is. If we sync
467  * on a receive SP, we know that the last prefilled sample corresponds with
468  * the first sample received - 1 sample duration. This is the same as if the last
469  * transfer from iso to client would have emptied the receive buffer.
470  *
471  *
472  * @param ts address to store the timestamp in
473  * @param fc address to store the associated framecounter in
474  */
475 void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) {
476     double rate=(double)m_buffer_next_tail_timestamp - (double)m_buffer_tail_timestamp;
477     rate /= (double)m_update_period;
478    
479     pthread_mutex_lock(&m_framecounter_lock);
480     *fc = m_framecounter;
481
482     // ts(x) = m_buffer_tail_timestamp +
483     //         (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x
484    
485     *ts=m_buffer_tail_timestamp + (uint64_t)(m_framecounter * rate);
486
487     pthread_mutex_unlock(&m_framecounter_lock);
488     if(*ts > TICKS_PER_SECOND*128L) {
489         *ts -= TICKS_PER_SECOND*128L;
490     }
491 }
492        
493 /**
494  * \brief return the timestamp of the last frame in the buffer
495  *
496  * This function returns the timestamp of the last frame in
497  * the StreamProcessor's buffer. It also returns the framecounter
498  * value for which this timestamp is valid.
499  *
500  * @param ts address to store the timestamp in
501  * @param fc address to store the associated framecounter in
502  */
503 void TimestampedBuffer::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) {
504     pthread_mutex_lock(&m_framecounter_lock);
505     *fc = m_framecounter;
506     *ts = m_buffer_tail_timestamp;
507     pthread_mutex_unlock(&m_framecounter_lock);
508 }
509
510 /**
511  * Resets the frame counter, in a atomic way. This
512  * is thread safe.
513  */
514 void TimestampedBuffer::resetFrameCounter() {
515     pthread_mutex_lock(&m_framecounter_lock);
516     m_framecounter = 0;
517     pthread_mutex_unlock(&m_framecounter_lock);
518 }
519
520
521 } // end of namespace FreebobUtil
Note: See TracBrowser for help on using the browser.