Changeset 392 for branches/streaming-rework
- Timestamp:
- 02/10/07 04:06:26 (16 years ago)
- Files:
-
- branches/streaming-rework/src/debugmodule/debugmodule.cpp (modified) (1 diff)
- branches/streaming-rework/src/debugmodule/debugmodule.h (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (modified) (9 diffs)
- branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/StreamProcessor.h (modified) (5 diffs)
- branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (modified) (19 diffs)
- branches/streaming-rework/src/libutil/TimestampedBuffer.h (modified) (6 diffs)
- branches/streaming-rework/tests/Makefile.am (modified) (3 diffs)
- branches/streaming-rework/tests/SytMonitor.h (modified) (1 diff)
- branches/streaming-rework/tests/test-sytmonitor.cpp (modified) (2 diffs)
- branches/streaming-rework/tests/test-timestampedbuffer.cpp (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/streaming-rework/src/debugmodule/debugmodule.cpp
r385 r392 270 270 } 271 271 272 void 273 DebugModuleManager::sync() 274 { 275 mb_flush(); 276 } 272 277 273 278 void branches/streaming-rework/src/debugmodule/debugmodule.h
r390 r392 199 199 bool setMgrDebugLevel( std::string name, debug_level_t level ); 200 200 201 void sync(); 202 201 203 protected: 202 204 bool registerModule( DebugModule& debugModule ); branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp
r391 r392 392 392 // update the frame counter such that it reflects the new value 393 393 // done in the SP base class 394 if (!StreamProcessor::getFrames(nevents)) {395 debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents);396 return RAW1394_ISO_ERROR;397 }394 // if (!StreamProcessor::getFrames(nevents)) { 395 // debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents); 396 // return RAW1394_ISO_ERROR; 397 // } 398 398 399 399 return RAW1394_ISO_OK; … … 471 471 // this base timestamp is the timestamp of the 472 472 // last buffer transfer. 473 uint64_t ts;474 uint64_t fc;475 m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe473 // uint64_t ts; 474 // uint64_t fc; 475 // m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe 476 476 477 477 // update the frame counter such that it reflects the buffer content, 478 478 // the buffer tail timestamp is initialized when the SP is enabled 479 479 // done in the SP base class 480 if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) {481 debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n",482 m_ringbuffer_size_frames,ts);483 return false;484 }480 // if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { 481 // debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n", 482 // m_ringbuffer_size_frames,ts); 483 // return false; 484 // } 485 485 486 486 return true; … … 582 582 m_data_buffer->setUpdatePeriod(m_period); 583 583 m_data_buffer->setNominalRate(m_ticks_per_frame); 584 585 m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 584 586 585 587 m_data_buffer->prepare(); … … 730 732 731 733 // add the silence data to the ringbuffer 732 if(m_data_buffer->writeFrames(nframes, dummybuffer )) {734 if(m_data_buffer->writeFrames(nframes, dummybuffer, 0)) { 733 735 retval=true; 734 736 } else { … … 784 786 // and also update the buffer tail timestamp 785 787 // done in the SP base class 786 if (!StreamProcessor::putFrames(nbframes, timestamp)) {787 debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp);788 return false;789 }788 // if (!StreamProcessor::putFrames(nbframes, timestamp)) { 789 // debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp); 790 // return false; 791 // } 790 792 791 793 return true; … … 1232 1234 //=> process the packet 1233 1235 // add the data payload to the ringbuffer 1234 if(m_data_buffer->writeFrames(nevents, (char *)(data+8) )) {1236 if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { 1235 1237 retval=RAW1394_ISO_OK; 1236 1238 … … 1275 1277 // and also update the buffer tail timestamp, as we add new frames 1276 1278 // done in the SP base class 1277 if (!StreamProcessor::putFrames(nevents, m_last_timestamp)) {1278 debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nevents, m_last_timestamp);1279 return RAW1394_ISO_ERROR;1280 }1279 // if (!StreamProcessor::putFrames(nevents, m_last_timestamp)) { 1280 // debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nevents, m_last_timestamp); 1281 // return RAW1394_ISO_ERROR; 1282 // } 1281 1283 1282 1284 } … … 1496 1498 m_data_buffer->setUpdatePeriod(m_syt_interval); 1497 1499 m_data_buffer->setNominalRate(m_ticks_per_frame); 1500 1501 m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 1498 1502 1499 1503 m_data_buffer->prepare(); … … 1606 1610 // which should be ours 1607 1611 m_data_buffer->blockProcessReadFrames(nbframes); 1608 1612 1609 1613 // update the frame counter such that it reflects the new value, 1610 1614 // done in the SP base class 1611 1615 /* 1612 1616 if (!StreamProcessor::getFrames(nbframes)) { 1613 1617 debugError("Could not do StreamProcessor::getFrames(%d)\n", nbframes); 1614 1618 return false; 1615 } 1616 1619 }*/ 1620 1617 1621 return true; 1618 1622 } branches/streaming-rework/src/libstreaming/StreamProcessor.cpp
r391 r392 166 166 } 167 167 168 / **169 * @brief Notify the StreamProcessor that frames were written170 *171 * This notifies the StreamProcessor of the fact that frames were written to the internal172 * buffer. This is for framecounter & timestamp bookkeeping.173 *174 * @param nbframes the number of frames that are written to the internal buffers175 * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample176 * present in the buffer.177 * @return true if successful178 */179 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) {180 181 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts);182 m_data_buffer->incrementFrameCounter(nbframes, ts);183 return true;184 }185 186 / **187 * @brief Notify the StreamProcessor that frames were read188 *189 * This notifies the StreamProcessor of the fact that frames were read from the internal190 * buffer. This is for framecounter & timestamp bookkeeping.191 *192 * @param nbframes the number of frames that are read from the internal buffers193 * @return true if successful194 */195 bool StreamProcessor::getFrames(unsigned int nbframes) {196 197 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes);198 m_data_buffer->decrementFrameCounter(nbframes);199 return true;200 }168 // /** 169 // * @brief Notify the StreamProcessor that frames were written 170 // * 171 // * This notifies the StreamProcessor of the fact that frames were written to the internal 172 // * buffer. This is for framecounter & timestamp bookkeeping. 173 // * 174 // * @param nbframes the number of frames that are written to the internal buffers 175 // * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample 176 // * present in the buffer. 177 // * @return true if successful 178 // */ 179 // bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { 180 // 181 // debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts); 182 // // m_data_buffer->incrementFrameCounter(nbframes, ts); 183 // return true; 184 // } 185 186 // /** 187 // * @brief Notify the StreamProcessor that frames were read 188 // * 189 // * This notifies the StreamProcessor of the fact that frames were read from the internal 190 // * buffer. This is for framecounter & timestamp bookkeeping. 191 // * 192 // * @param nbframes the number of frames that are read from the internal buffers 193 // * @return true if successful 194 // */ 195 // bool StreamProcessor::getFrames(unsigned int nbframes) { 196 // 197 // debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); 198 // // m_data_buffer->decrementFrameCounter(nbframes); 199 // return true; 200 // } 201 201 202 202 uint64_t StreamProcessor::getTimeNow() { branches/streaming-rework/src/libstreaming/StreamProcessor.h
r391 r392 88 88 bool isEnabled() {return !m_is_disabled;}; 89 89 90 virtual bool putFrames(unsigned int nbframes, int64_t ts) ; ///< transfer the buffer contents from client91 virtual bool getFrames(unsigned int nbframes) ; ///< transfer the buffer contents to the client90 virtual bool putFrames(unsigned int nbframes, int64_t ts) = 0; ///< transfer the buffer contents from client 91 virtual bool getFrames(unsigned int nbframes) = 0; ///< transfer the buffer contents to the client 92 92 93 93 virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) … … 175 175 */ 176 176 virtual uint64_t getTimeAtPeriodUsecs() = 0; 177 177 178 178 /** 179 179 * \brief return the time of the next period boundary (in internal units) … … 184 184 */ 185 185 virtual uint64_t getTimeAtPeriod() = 0; 186 186 187 187 uint64_t getTimeNow(); 188 188 189 189 bool setSyncSource(StreamProcessor *s); 190 190 float getTicksPerFrame() {return m_ticks_per_frame;}; 191 191 192 192 int getLastCycle() {return m_last_cycle;}; 193 194 193 195 194 protected: 196 195 StreamProcessor *m_SyncSource; 197 196 198 197 float m_ticks_per_frame; 199 198 200 199 int m_last_cycle; 201 200 … … 221 220 int cycle, unsigned int dropped, unsigned int max_length) 222 221 {return RAW1394_ISO_STOP;}; 223 224 virtual enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, 222 virtual bool putFrames(unsigned int nbframes, int64_t ts) {return false;}; 223 224 virtual enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, 225 225 unsigned char channel, unsigned char tag, unsigned char sy, 226 226 unsigned int cycle, unsigned int dropped) = 0; … … 251 251 unsigned char channel, unsigned char tag, unsigned char sy, 252 252 unsigned int cycle, unsigned int dropped) {return RAW1394_ISO_STOP;}; 253 253 virtual bool getFrames(unsigned int nbframes) {return false;}; 254 254 255 virtual enum raw1394_iso_disposition 255 256 getPacket(unsigned char *data, unsigned int *length, branches/streaming-rework/src/libutil/TimestampedBuffer.cpp
r391 r392 41 41 m_event_size(0), m_events_per_frame(0), m_buffer_size(0), 42 42 m_bytes_per_frame(0), m_bytes_per_buffer(0), 43 m_wrap_at(0xFFFFFFFFFFFFFFFFLLU), 43 44 m_Client(c), m_framecounter(0), m_buffer_tail_timestamp(0), 45 m_buffer_next_tail_timestamp(0), 44 46 m_dll_e2(0.0), m_dll_b(0.877), m_dll_c(0.384), 45 47 m_nominal_rate(0.0), m_update_period(0) 46 48 { 49 pthread_mutex_init(&m_framecounter_lock, NULL); 47 50 48 51 } … … 53 56 } 54 57 58 /** 59 * \brief Set the nominal rate in frames/timeunit 60 * 61 * Sets the nominal rate in frames per time unit. This rate is used 62 * to initialize the DLL that will extract the effective rate based 63 * upon the timestamps it gets fed. 64 * 65 * @param r rate 66 * @return true if successful 67 */ 68 bool TimestampedBuffer::setNominalRate(float r) { 69 m_nominal_rate=r; 70 debugOutput(DEBUG_LEVEL_VERBOSE," nominal rate=%e set to %e\n", 71 m_nominal_rate, r); 72 return true; 73 } 74 75 /** 76 * \brief Set the nominal update period (in frames) 77 * 78 * Sets the nominal update period. This period is the number of frames 79 * between two timestamp updates (hence buffer writes) 80 * 81 * @param n period in frames 82 * @return true if successful 83 */ 84 bool TimestampedBuffer::setUpdatePeriod(unsigned int n) { 85 m_update_period=n; 86 return true; 87 } 88 89 /** 90 * \brief set the value at which timestamps should wrap around 91 * @param w value to wrap at 92 * @return true if successful 93 */ 94 bool TimestampedBuffer::setWrapValue(uint64_t w) { 95 m_wrap_at=w; 96 return true; 97 } 98 99 /** 100 * \brief return the effective rate 101 * 102 * Returns the effective rate calculated by the DLL. 103 * 104 * @return rate (in frames/timeunit) 105 */ 106 float TimestampedBuffer::getRate() { 107 return ((float) m_update_period)/m_dll_e2; 108 } 109 110 /** 111 * \brief Sets the size of the events 112 * @param s event size in bytes 113 * @return true if successful 114 */ 55 115 bool TimestampedBuffer::setEventSize(unsigned int s) { 56 116 m_event_size=s; … … 62 122 } 63 123 64 bool TimestampedBuffer::setEventsPerFrame(unsigned int s) { 65 m_events_per_frame=s; 124 /** 125 * \brief Sets the number of events per frame 126 * @param n number of events per frame 127 * @return true if successful 128 */ 129 bool TimestampedBuffer::setEventsPerFrame(unsigned int n) { 130 m_events_per_frame=n; 66 131 67 132 m_bytes_per_frame=m_event_size*m_events_per_frame; … … 70 135 return true; 71 136 } 72 73 bool TimestampedBuffer::setBufferSize(unsigned int s) { 74 m_buffer_size=s; 75 137 /** 138 * \brief Sets the buffer size in frames 139 * @param n number frames 140 * @return true if successful 141 */ 142 bool TimestampedBuffer::setBufferSize(unsigned int n) { 143 m_buffer_size=n; 144 76 145 m_bytes_per_frame=m_event_size*m_events_per_frame; 77 146 m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size; 78 79 return true; 80 } 81 147 148 return true; 149 } 150 151 /** 152 * \brief Returns the current fill of the buffer 153 * 154 * This returns the buffer fill of the internal ringbuffer. This 155 * can only be used as an indication because it's state is not 156 * guaranteed to be consistent at all times due to threading issues. 157 * 158 * In order to get the number of frames in the buffer, use the 159 * getFrameCounter, getBufferHeadTimestamp, getBufferTailTimestamp 160 * functions 161 * 162 * @return the internal buffer fill in frames 163 */ 82 164 unsigned int TimestampedBuffer::getBufferFill() { 83 165 return freebob_ringbuffer_read_space(m_event_buffer)/(m_bytes_per_frame); 84 166 } 85 167 168 /** 169 * \brief Initializes the TimestampedBuffer 170 * 171 * Initializes the TimestampedBuffer, should be called before anything else 172 * is done. 173 * 174 * @return true if successful 175 */ 86 176 bool TimestampedBuffer::init() { 87 88 pthread_mutex_init(&m_framecounter_lock, NULL); 89 90 return true; 91 } 92 177 return true; 178 } 179 180 /** 181 * \brief Resets the TimestampedBuffer 182 * 183 * Resets the TimestampedBuffer, clearing the buffers and counters. 184 * (not true yet: Also resets the DLL to the nominal values.) 185 * 186 * \note when this is called, you should make sure that the buffer 187 * tail timestamp gets set before continuing 188 * 189 * @return true if successful 190 */ 93 191 bool TimestampedBuffer::reset() { 94 192 freebob_ringbuffer_reset(m_event_buffer); … … 99 197 } 100 198 101 void TimestampedBuffer::dumpInfo() { 102 103 uint64_t ts_head, fc; 104 getBufferHeadTimestamp(&ts_head,&fc); 105 106 int64_t diff=(int64_t)ts_head - (int64_t)m_buffer_tail_timestamp; 107 108 debugOutputShort( DEBUG_LEVEL_NORMAL, " TimestampedBuffer (%p) info:\n",this); 109 debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); 110 debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : %011llu\n",ts_head); 111 debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); 112 debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); 113 debugOutputShort( DEBUG_LEVEL_NORMAL, " rate : %lf (%f)\n",m_dll_e2,m_dll_e2/m_update_period); 114 } 115 199 /** 200 * \brief Perpares the TimestampedBuffer 201 * 202 * Prepare the TimestampedBuffer. This allocates all internal buffers and 203 * initializes all data structures. 204 * 205 * This should be called after parameters such as buffer size, event size etc.. are set, 206 * and before any read/write operations are performed. 207 * 208 * @return true if successful 209 */ 116 210 bool TimestampedBuffer::prepare() { 117 211 debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing buffer (%p)\n",this); 118 212 debugOutput(DEBUG_LEVEL_VERBOSE," Size=%u events, events/frame=%u, event size=%ubytes\n", 119 213 m_buffer_size,m_events_per_frame,m_event_size); 214 215 debugOutput(DEBUG_LEVEL_VERBOSE," update period %u\n", 216 m_update_period); 217 debugOutput(DEBUG_LEVEL_VERBOSE," nominal rate=%f\n", 218 m_nominal_rate); 219 220 debugOutput(DEBUG_LEVEL_VERBOSE," wrapping at %llu\n",m_wrap_at); 120 221 121 222 assert(m_buffer_size); … … 123 224 assert(m_event_size); 124 225 226 assert(m_nominal_rate != 0.0L); 227 assert(m_update_period != 0); 228 125 229 if( !(m_event_buffer=freebob_ringbuffer_create( 126 230 (m_events_per_frame * m_buffer_size) * m_event_size))) { 127 128 231 debugFatal("Could not allocate memory event ringbuffer\n"); 129 232 return false; … … 136 239 return false; 137 240 } 138 139 assert(m_nominal_rate != 0.0);140 assert(m_update_period != 0);141 241 142 242 // init the DLL … … 149 249 } 150 250 151 bool TimestampedBuffer::writeFrames(unsigned int nevents, char *data) { 152 153 unsigned int write_size=nevents*m_event_size*m_events_per_frame; 251 /** 252 * @brief Write frames to the buffer 253 * 254 * Copies \ref nframes of frames from the buffer pointed to by \ref data to the 255 * internal ringbuffer. The time of the last frame in the buffer is set to \ref ts. 256 * 257 * @param nframes number of frames to copy 258 * @param data pointer to the frame buffer 259 * @param ts timestamp of the last frame copied 260 * @return true if successful 261 */ 262 bool TimestampedBuffer::writeFrames(unsigned int nframes, char *data, uint64_t ts) { 263 264 unsigned int write_size=nframes*m_event_size*m_events_per_frame; 154 265 155 266 // add the data payload to the ringbuffer … … 159 270 return false; 160 271 } 161 return true; 162 163 } 164 165 bool TimestampedBuffer::readFrames(unsigned int nevents, char *data) { 166 167 unsigned int read_size=nevents*m_event_size*m_events_per_frame; 272 273 incrementFrameCounter(nframes,ts); 274 275 return true; 276 277 } 278 /** 279 * @brief Read frames from the buffer 280 * 281 * Copies \ref nframes of frames from the internal buffer to the data buffer pointed 282 * to by \ref data. 283 * 284 * @param nframes number of frames to copy 285 * @param data pointer to the frame buffer 286 * @return true if successful 287 */ 288 bool TimestampedBuffer::readFrames(unsigned int nframes, char *data) { 289 290 unsigned int read_size=nframes*m_event_size*m_events_per_frame; 168 291 169 292 // get the data payload to the ringbuffer … … 173 296 return false; 174 297 } 175 return true; 176 177 } 178 298 299 decrementFrameCounter(nframes); 300 301 return true; 302 303 } 304 305 /** 306 * @brief Performs block processing write of frames 307 * 308 * This function allows for zero-copy writing into the ringbuffer. 309 * It calls the client's processWriteBlock function to write frames 310 * into the internal buffer's data area, in a thread safe fashion. 311 * 312 * It also updates the timestamp. 313 * 314 * @param nbframes number of frames to process 315 * @param ts timestamp of the last frame written to the buffer 316 * @return true if successful 317 */ 179 318 bool TimestampedBuffer::blockProcessWriteFrames(unsigned int nbframes, int64_t ts) { 180 319 … … 266 405 } 267 406 268 return true; 269 270 } 271 407 incrementFrameCounter(nbframes,ts); 408 409 return true; 410 411 } 412 413 /** 414 * @brief Performs block processing read of frames 415 * 416 * This function allows for zero-copy reading from the ringbuffer. 417 * It calls the client's processReadBlock function to read frames 418 * directly from the internal buffer's data area, in a thread safe 419 * fashion. 420 * 421 * @param nbframes number of frames to process 422 * @return true if successful 423 */ 272 424 bool TimestampedBuffer::blockProcessReadFrames(unsigned int nbframes) { 273 425 … … 302 454 303 455 if(vec[0].len==0) { // this indicates an empty event buffer 304 debugError(" RCV: Event buffer underrun in processor %p\n",this);456 debugError("Event buffer underrun in buffer %p\n",this); 305 457 return false; 306 458 } … … 321 473 if(xrun<0) { 322 474 // xrun detected 323 debugError(" RCV: Frame buffer overrun in processor %p\n",this);475 debugError("Frame buffer overrun in buffer %p\n",this); 324 476 return false; 325 477 } … … 342 494 if(xrun<0) { 343 495 // xrun detected 344 debugError(" RCV: Frame buffer overrun in processor %p\n",this);496 debugError("Frame buffer overrun in buffer %p\n",this); 345 497 return false; 346 498 } … … 354 506 } 355 507 356 return true; 357 } 358 359 /** 360 * Decrements the frame counter, in a atomic way. 508 decrementFrameCounter(nbframes); 509 510 return true; 511 } 512 513 /** 514 * @brief Sets the buffer tail timestamp. 515 * 516 * Set the buffer tail timestamp to \ref new_timestamp. This will recalculate 517 * the internal state such that the buffer's timeframe starts at 518 * \ref new_timestamp. 519 * 520 * This is thread safe. 521 * 522 * @param new_timestamp 523 */ 524 void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) { 525 526 pthread_mutex_lock(&m_framecounter_lock); 527 528 m_buffer_tail_timestamp = new_timestamp; 529 530 m_dll_e2=m_update_period * m_nominal_rate; 531 m_buffer_next_tail_timestamp = (uint64_t)((double)m_buffer_tail_timestamp + m_dll_e2); 532 533 pthread_mutex_unlock(&m_framecounter_lock); 534 535 debugOutput(DEBUG_LEVEL_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n", 536 this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); 537 538 } 539 540 /** 541 * \brief return the timestamp of the first frame in the buffer 542 * 543 * This function returns the timestamp of the very first sample in 544 * the StreamProcessor's buffer. It also returns the framecounter value 545 * for which this timestamp is valid. 546 * 547 * @param ts address to store the timestamp in 548 * @param fc address to store the associated framecounter in 549 */ 550 void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { 551 double rate=(double)m_buffer_next_tail_timestamp - (double)m_buffer_tail_timestamp; 552 rate /= (double)m_update_period; 553 554 int64_t timestamp; 555 556 pthread_mutex_lock(&m_framecounter_lock); 557 *fc = m_framecounter; 558 559 // ts(x) = m_buffer_tail_timestamp - 560 // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*(x) 561 562 // buffer head is the frame that is framecounter-1 away from the tail 563 timestamp=(int64_t)m_buffer_tail_timestamp - (int64_t)((m_framecounter) * rate); 564 565 pthread_mutex_unlock(&m_framecounter_lock); 566 567 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p): HTS = %011lld, TTS=%011llu, FC=%05u, RATE=%f\n", 568 this, timestamp, m_buffer_tail_timestamp, *fc, rate); 569 570 if(timestamp >= (int64_t)m_wrap_at) { 571 timestamp -= m_wrap_at; 572 } else if(timestamp < 0) { 573 timestamp += m_wrap_at; 574 } 575 576 *ts=timestamp; 577 578 debugOutput(DEBUG_LEVEL_VERBOSE, " HTS = %011lld, FC=%05u\n", 579 *ts, *fc); 580 581 } 582 583 /** 584 * \brief return the timestamp of the last frame in the buffer 585 * 586 * This function returns the timestamp of the last frame in 587 * the StreamProcessor's buffer. It also returns the framecounter 588 * value for which this timestamp is valid. 589 * 590 * @param ts address to store the timestamp in 591 * @param fc address to store the associated framecounter in 592 */ 593 void TimestampedBuffer::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { 594 pthread_mutex_lock(&m_framecounter_lock); 595 *fc = m_framecounter; 596 *ts = m_buffer_tail_timestamp; 597 pthread_mutex_unlock(&m_framecounter_lock); 598 } 599 600 /** 601 * Resets the frame counter, in a atomic way. This 361 602 * is thread safe. 603 */ 604 void TimestampedBuffer::resetFrameCounter() { 605 pthread_mutex_lock(&m_framecounter_lock); 606 m_framecounter = 0; 607 pthread_mutex_unlock(&m_framecounter_lock); 608 } 609 610 /** 611 * Decrements the frame counter in a thread safe way. 612 * 613 * @param nbframes number of frames to decrement 362 614 */ 363 615 void TimestampedBuffer::decrementFrameCounter(int nbframes) { … … 368 620 369 621 /** 370 * Increments the frame counter, in a atomic way. 371 * also sets the buffer tail timestamp 372 * This is thread safe. 622 * Increments the frame counter in a thread safe way. 623 * Also updates the timestamp. 624 * 625 * @param nbframes the number of frames to add 626 * @param new_timestamp the new timestamp 373 627 */ 374 628 void TimestampedBuffer::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { 375 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",629 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", 376 630 this, new_timestamp); 377 631 … … 390 644 391 645 // the maximal difference we can allow (64secs) 392 const int64_t max= TICKS_PER_SECOND*64L;646 const int64_t max=m_wrap_at/2; 393 647 394 648 if(diff > max) { 395 diff -= TICKS_PER_SECOND*128L;649 diff -= m_wrap_at; 396 650 } else if (diff < -max) { 397 diff += TICKS_PER_SECOND*128L;651 diff += m_wrap_at; 398 652 } 399 653 … … 401 655 402 656 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "diff2=%lld err=%f\n", 403 diff, err); 404 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "FC=%10u, TS=%011llu\n",m_framecounter, m_buffer_tail_timestamp); 657 diff, err); 658 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "B: FC=%10u, TS=%011llu, NTS=%011llu\n", 659 m_framecounter, m_buffer_tail_timestamp, m_buffer_next_tail_timestamp); 405 660 406 661 m_buffer_tail_timestamp=m_buffer_next_tail_timestamp; 407 662 m_buffer_next_tail_timestamp += (uint64_t)(m_dll_b * err + m_dll_e2); 408 663 409 if (m_buffer_next_tail_timestamp > TICKS_PER_SECOND*128L) { 410 m_buffer_next_tail_timestamp -= TICKS_PER_SECOND*128L; 664 if (m_buffer_next_tail_timestamp >= m_wrap_at) { 665 m_buffer_next_tail_timestamp -= m_wrap_at; 666 667 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Unwrapping next tail timestamp: %11llu\n", 668 m_buffer_next_tail_timestamp); 411 669 } 412 670 413 671 m_dll_e2 += m_dll_c*err; 414 672 415 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "TS=%011llu, NTS=%011llu, DLLe2=%f\n",673 debugOutput(DEBUG_LEVEL_VERBOSE, "A: TS=%011llu, NTS=%011llu, DLLe2=%f\n", 416 674 m_buffer_tail_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); 417 675 418 676 pthread_mutex_unlock(&m_framecounter_lock); 677 678 if(m_buffer_tail_timestamp>=m_wrap_at) { 679 debugError("Wrapping failed for m_buffer_tail_timestamp!\n"); 680 } 681 if(m_buffer_next_tail_timestamp>=m_wrap_at) { 682 debugError("Wrapping failed for m_buffer_next_tail_timestamp!\n"); 683 } 419 684 420 685 // this DLL allows the calculation of any sample timestamp relative to the buffer tail, … … 427 692 428 693 /** 429 * Sets the buffer tail timestamp (in usecs) 430 * This is thread safe. 431 */ 432 void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) { 433 434 pthread_mutex_lock(&m_framecounter_lock); 435 436 m_buffer_tail_timestamp = new_timestamp; 437 438 m_dll_e2=m_update_period * m_nominal_rate; 439 m_buffer_next_tail_timestamp = (uint64_t)((double)m_buffer_tail_timestamp + m_dll_e2); 440 441 pthread_mutex_unlock(&m_framecounter_lock); 442 443 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n", 444 this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); 445 446 } 447 448 /** 449 * \brief return the timestamp of the first frame in the buffer 450 * 451 * This function returns the timestamp of the very first sample in 452 * the StreamProcessor's buffer. This is useful for slave StreamProcessors 453 * to find out what the base for their timestamp generation should 454 * be. It also returns the framecounter value for which this timestamp 455 * is valid. 456 * 457 * The system is built in such a way that we assume that the processing 458 * of the buffers doesn't take any time. Assume we have a buffer transfer at 459 * time T1, meaning that the last sample of this buffer occurs at T1. As 460 * processing does not take time, we don't have to add anything to T1. When 461 * transferring the processed buffer to the xmit processor, the timestamp 462 * of the last sample is still T1. 463 * 464 * When starting the streams, we don't have any information on this last 465 * timestamp. We prefill the buffer at the xmit side, and we should find 466 * out what the timestamp for the last sample in the buffer is. If we sync 467 * on a receive SP, we know that the last prefilled sample corresponds with 468 * the first sample received - 1 sample duration. This is the same as if the last 469 * transfer from iso to client would have emptied the receive buffer. 470 * 471 * 472 * @param ts address to store the timestamp in 473 * @param fc address to store the associated framecounter in 474 */ 475 void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { 476 double rate=(double)m_buffer_next_tail_timestamp - (double)m_buffer_tail_timestamp; 477 rate /= (double)m_update_period; 478 479 pthread_mutex_lock(&m_framecounter_lock); 480 *fc = m_framecounter; 481 482 // ts(x) = m_buffer_tail_timestamp + 483 // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x 484 485 *ts=m_buffer_tail_timestamp + (uint64_t)(m_framecounter * rate); 486 487 pthread_mutex_unlock(&m_framecounter_lock); 488 if(*ts > TICKS_PER_SECOND*128L) { 489 *ts -= TICKS_PER_SECOND*128L; 490 } 491 } 492 493 /** 494 * \brief return the timestamp of the last frame in the buffer 495 * 496 * This function returns the timestamp of the last frame in 497 * the StreamProcessor's buffer. It also returns the framecounter 498 * value for which this timestamp is valid. 499 * 500 * @param ts address to store the timestamp in 501 * @param fc address to store the associated framecounter in 502 */ 503 void TimestampedBuffer::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { 504 pthread_mutex_lock(&m_framecounter_lock); 505 *fc = m_framecounter; 506 *ts = m_buffer_tail_timestamp; 507 pthread_mutex_unlock(&m_framecounter_lock); 508 } 509 510 /** 511 * Resets the frame counter, in a atomic way. This 512 * is thread safe. 513 */ 514 void TimestampedBuffer::resetFrameCounter() { 515 pthread_mutex_lock(&m_framecounter_lock); 516 m_framecounter = 0; 517 pthread_mutex_unlock(&m_framecounter_lock); 518 } 519 694 * @brief Print status info. 695 */ 696 void TimestampedBuffer::dumpInfo() { 697 698 uint64_t ts_head, fc; 699 getBufferHeadTimestamp(&ts_head,&fc); 700 701 int64_t diff=(int64_t)ts_head - (int64_t)m_buffer_tail_timestamp; 702 703 debugOutputShort( DEBUG_LEVEL_NORMAL, " TimestampedBuffer (%p) info:\n",this); 704 debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); 705 debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : %011llu\n",ts_head); 706 debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); 707 debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); 708 debugOutputShort( DEBUG_LEVEL_NORMAL, " rate : %lf (%f)\n",m_dll_e2,m_dll_e2/m_update_period); 709 } 520 710 521 711 } // end of namespace FreebobUtil branches/streaming-rework/src/libutil/TimestampedBuffer.h
r391 r392 36 36 class TimestampedBufferClient; 37 37 38 /** 39 * \brief Class implementing a frame buffer that is time-aware 40 * 41 * This class implements a buffer that is time-aware. Whenever new frames 42 * are written to the buffer, the timestamp corresponding to the last frame 43 * in the buffer is updated. This allows to calculate the timestamp of any 44 * other frame in the buffer. 45 * 46 * The buffer is a frame buffer, having the following parameters defining 47 * it's behaviour: 48 * - buff_size: buffer size in frames (setBufferSize()) 49 * - events_per_frame: the number of events per frame (setEventsPerFrame()) 50 * - event_size: the storage size of the events (in bytes) (setEventSize()) 51 * 52 * The total size of the buffer (in bytes) is at least 53 * buff_size*events_per_frame*event_size. 54 * 55 * Timestamp tracking is done by requiring that a timestamp is specified every 56 * time frames are added to the buffer. In combination with the buffer fill and 57 * the frame rate (calculated internally), this allows to calculate the timestamp 58 * of any frame in the buffer. In order to initialize the internal data structures, 59 * the setNominalRate() and setUpdatePeriod() functions are provided. 60 * 61 * \note Currently the class only supports fixed size writes of size update_period. 62 * This can change in the future, implementation ideas are already in place. 63 * 64 * The TimestampedBuffer class is time unit agnostic. It can handle any time unit 65 * as long as it fits in a 64 bit unsigned integer. The buffer supports wrapped 66 * timestamps using (...). 67 * 68 * There are two methods of reading and writing to the buffer. 69 * 70 * The first method uses conventional readFrames() and writeFrames() functions. 71 * 72 * The second method makes use of the TimestampedBufferClient interface. When a 73 * TimestampedBuffer is created, it is required that a TimestampedBufferClient is 74 * registered. This client implements the processReadBlock and processWriteBlock 75 * functions. These are block processing 'callbacks' that allow zero-copy processing 76 * of the buffer contents. In order to initiate block processing, the 77 * blockProcessWriteFrames and blockProcessReadFrames functions are provided by 78 * TimestampedBuffer. 79 * 80 */ 38 81 class TimestampedBuffer { 39 82 40 83 public: 41 84 85 42 86 TimestampedBuffer(TimestampedBufferClient *); 43 87 virtual ~TimestampedBuffer(); 44 88 45 bool writeFrames(unsigned int nbframes, char *data );89 bool writeFrames(unsigned int nbframes, char *data, uint64_t ts); 46 90 bool readFrames(unsigned int nbframes, char *data); 47 91 … … 57 101 bool setBufferSize(unsigned int s); 58 102 103 bool setWrapValue(uint64_t w); 104 59 105 unsigned int getBufferFill(); 60 106 … … 62 108 int getFrameCounter() {return m_framecounter;}; 63 109 64 void decrementFrameCounter(int nbframes);65 void incrementFrameCounter(int nbframes, uint64_t new_timestamp);66 void resetFrameCounter();67 68 110 void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); 69 111 void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); … … 72 114 73 115 // dll stuff 74 void setNominalRate(double r) {m_nominal_rate=r;};75 double getRate() {return m_dll_e2;};116 bool setNominalRate(float r); 117 float getRate(); 76 118 77 void setUpdatePeriod(unsigned int t) {m_update_period=t;};119 bool setUpdatePeriod(unsigned int t); 78 120 79 121 // misc stuff 80 122 void dumpInfo(); 81 82 123 void setVerboseLevel(int l) {setDebugLevel(l);}; 124 125 private: 126 void decrementFrameCounter(int nbframes); 127 void incrementFrameCounter(int nbframes, uint64_t new_timestamp); 128 void resetFrameCounter(); 129 83 130 protected: 84 131 … … 91 138 unsigned int m_bytes_per_frame; 92 139 unsigned int m_bytes_per_buffer; 140 141 uint64_t m_wrap_at; // value to wrap at 93 142 94 143 TimestampedBufferClient *m_Client; … … 117 166 double m_dll_c; 118 167 119 doublem_nominal_rate;168 float m_nominal_rate; 120 169 unsigned int m_update_period; 121 170 }; 122 171 172 /** 173 * \brief Interface to be implemented by TimestampedBuffer clients 174 */ 123 175 class TimestampedBufferClient { 124 176 public: branches/streaming-rework/tests/Makefile.am
r384 r392 22 22 23 23 noinst_PROGRAMS = test-freebob test-extplugcmd test-fw410 freebob-server \ 24 test-volume test-mixer test-cycletimer test-sytmonitor 24 test-volume test-mixer test-cycletimer test-sytmonitor \ 25 test-timestampedbuffer 25 26 26 27 noinst_HEADERS = … … 49 50 $(LIBAVC1394_LIBS) $(LIBIEC61883_LIBS) -lrom1394 50 51 51 52 52 #TESTS_ENVIRONMENT 53 53 TEST = test-freebob … … 61 61 test_sytmonitor_SOURCES = test-sytmonitor.cpp SytMonitor.cpp \ 62 62 SytMonitor.h 63 64 test_timestampedbuffer_LDADD = $(top_builddir)/src/libfreebob.la $(LIBIEC61883_LIBS) \ 65 $(LIBRAW1394_LIBS) $(LIBAVC1394_LIBS) 66 test_timestampedbuffer_SOURCES = test-timestampedbuffer.cpp branches/streaming-rework/tests/SytMonitor.h
r386 r392 7 7 * http://freebob.sf.net 8 8 * 9 * Copyright (C) 200 5,2006Pieter Palmers <pieterpalmers@users.sourceforge.net>9 * Copyright (C) 2007 Pieter Palmers <pieterpalmers@users.sourceforge.net> 10 10 * 11 11 * This program is free software {} you can redistribute it and/or modify branches/streaming-rework/tests/test-sytmonitor.cpp
r390 r392 1 1 /*************************************************************************** 2 Copyright (C) 200 5by Pieter Palmers *2 Copyright (C) 2007 by Pieter Palmers * 3 3 * 4 4 This program is free software; you can redistribute it and/or modify * … … 41 41 #include "src/libutil/SystemTimeSource.h" 42 42 43 #include "pthread.h"43 #include <pthread.h> 44 44 45 45 using namespace FreebobStreaming;