Changeset 393
- Timestamp:
- 02/13/07 09:22:14 (16 years ago)
- Files:
-
- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (modified) (24 diffs)
- branches/streaming-rework/src/libstreaming/cycletimer.h (modified) (2 diffs)
- branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (modified) (10 diffs)
- branches/streaming-rework/tests/test-cycletimer.cpp (modified) (6 diffs)
- branches/streaming-rework/tests/test-sytmonitor.cpp (modified) (5 diffs)
- branches/streaming-rework/tests/test-timestampedbuffer.cpp (modified) (15 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp
r392 r393 91 91 92 92 struct iec61883_packet *packet = (struct iec61883_packet *) data; 93 unsigned int nevents=0;94 93 95 94 m_last_cycle=cycle; … … 103 102 } 104 103 #endif 104 105 // calculate & preset common values 105 106 106 107 /* Our node ID can change after a bus reset, so it is best to fetch … … 116 117 packet->eoh1 = 2; 117 118 packet->fmt = IEC61883_FMT_AMDTP; 118 119 // recalculate the buffer head timestamp 120 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 121 122 // the base timestamp is the one of the last sample in the buffer 123 uint64_t ts_tail; 124 uint64_t fc; 125 126 m_data_buffer->getBufferTailTimestamp(&ts_tail, &fc); // thread safe 127 128 int64_t timestamp = ts_tail; 129 130 // meaning that the first sample in the buffer lies m_framecounter * rate 131 // earlier. This gives the next equation: 132 // timestamp = m_last_timestamp - m_framecounter * rate 133 timestamp -= (int64_t)((float)fc * ticks_per_frame); 134 135 // FIXME: test 136 // timestamp -= (uint64_t)(((float)m_handler->getWakeupInterval()) 137 // * ((float)m_syt_interval) * ticks_per_frame); 138 // 139 // // substract the receive transfer delay 140 // timestamp -= RECEIVE_PROCESSING_DELAY; 141 142 // this happens if m_buffer_tail_timestamp wraps around while there are 143 // not much frames in the buffer. We should add the wraparound value of the ticks 144 // counter 145 if (timestamp < 0) { 146 timestamp += TICKS_PER_SECOND * 128L; 147 } 148 // this happens when the last timestamp is near wrapping, and 149 // m_framecounter is low. 150 // this means: m_last_timestamp is near wrapping and have just had 151 // a getPackets() from the client side. the projected next_period 152 // boundary lies beyond the wrap value. 153 // the action is to wrap the value. 154 else if (timestamp >= TICKS_PER_SECOND * 128L) { 155 timestamp -= TICKS_PER_SECOND * 128L; 156 } 119 120 *tag = IEC61883_TAG_WITH_CIP; 121 *sy = 0; 157 122 158 123 // determine if we want to send a packet or not … … 166 131 // the difference between 'now' and the cycle this 167 132 // packet is intended for 168 int cycle_diff = cycle - now_cycles; 169 170 // detect wraparounds 171 if(cycle_diff < -(int)(CYCLES_PER_SECOND/2)) { 172 cycle_diff += CYCLES_PER_SECOND; 173 } else if(cycle_diff > (int)(CYCLES_PER_SECOND/2)) { 174 cycle_diff -= CYCLES_PER_SECOND; 175 } 176 177 // as long as the cycle parameter is not in sync with 178 // the current time, the stream is considered not 179 // to be 'running' 180 if (!m_running && cycle_diff >= 0 && cycle != -1) { 181 debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); 182 m_running=true; 183 } 133 int cycle_diff = substractCycles(cycle, now_cycles); 184 134 185 135 #ifdef DEBUG … … 189 139 } 190 140 #endif 191 // if cycle lies cycle_diff cycles in the future, we should 192 // queue this packet cycle_diff * TICKS_PER_CYCLE earlier than 193 // we would if it were to be sent immediately. 194 195 // determine the 'now' time in ticks 196 uint64_t cycle_timer=CYCLE_TIMER_TO_TICKS(ctr); 197 198 // time until the packet is to be sent (if > 0: send packet) 199 int64_t until_next=timestamp-(int64_t)cycle_timer; 200 201 #ifdef DEBUG 202 int64_t utn2=until_next; // debug!! 203 #endif 141 142 // as long as the cycle parameter is not in sync with 143 // the current time, the stream is considered not 144 // to be 'running' 145 // NOTE: this works only at startup 146 if (!m_running && cycle_diff >= 0 && cycle != -1) { 147 debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); 148 m_running=true; 149 } 150 151 uint64_t ts_head, fc; 152 if (!m_disabled && m_is_disabled) { 153 // this means that we are trying to enable 154 if ((unsigned int)cycle == m_cycle_to_enable_at) { 155 m_is_disabled=false; 156 157 debugOutput(DEBUG_LEVEL_VERBOSE,"Enabling StreamProcessor %p at %u\n", this, cycle); 158 159 // initialize the buffer head & tail 160 m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts_head, &fc); // thread safe 161 162 // the number of cycles the sync source lags (> 0) 163 // or leads (< 0) 164 int sync_lag_cycles=substractCycles(cycle, m_SyncSource->getLastCycle()); 165 166 // account for the cycle lag between sync SP and this SP 167 // the last update of the sync source's timestamps was sync_lag_cycles 168 // cycles before the cycle we are calculating the timestamp for. 169 // if we were to use one-frame buffers, you would expect the 170 // frame that is sent on cycle CT to have a timestamp T1. 171 // ts_head however is for cycle CT-sync_lag_cycles, and lies 172 // therefore sync_lag_cycles * TICKS_PER_CYCLE earlier than 173 // T1. 174 ts_head += sync_lag_cycles * TICKS_PER_CYCLE; 175 176 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 177 178 // calculate the buffer tail timestamp of our buffer 179 // this timestamp corresponds to the buffer head timestamp 180 // of the sync source plus the 'length' of our buffer. 181 // this makes the buffer head timestamp of our buffer 182 // equal to (at max) the buffer head timestamp of the 183 // sync source. 184 ts_head = addTicks(ts_head, (uint32_t)((float)m_ringbuffer_size_frames * ticks_per_frame)); 185 186 #ifdef DEBUG 187 if ((unsigned int)m_data_buffer->getFrameCounter() != m_ringbuffer_size_frames) { 188 debugWarning("m_data_buffer->getFrameCounter() != m_ringbuffer_size_frames\n"); 189 } 190 #endif 191 192 m_data_buffer->setBufferTailTimestamp(ts_head); 193 194 debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, FC=%4d, %f\n", 195 ts_head, m_data_buffer->getFrameCounter(), ticks_per_frame); 196 } else { 197 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 198 "will enable StreamProcessor %p at %u, now is %d\n", 199 this, m_cycle_to_enable_at, cycle); 200 } 201 } else if (m_disabled && !m_is_disabled) { 202 // trying to disable 203 debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", 204 this, cycle); 205 m_is_disabled=true; 206 } 207 208 // the base timestamp is the one of the next sample in the buffer 209 m_data_buffer->getBufferHeadTimestamp(&ts_head, &fc); // thread safe 210 211 int64_t timestamp = ts_head; 204 212 205 213 // we send a packet some cycles in advance, to avoid the … … 212 220 // packet is sent, unless TRANSFER_DELAY > 3072. 213 221 // this means that we need at least one cycle of extra buffering. 214 until_next -= TICKS_PER_CYCLE * TRANSMIT_ADVANCE_CYCLES; 215 216 // we have to queue it cycle_diff * TICKS_PER_CYCLE earlier 217 until_next -= cycle_diff * TICKS_PER_CYCLE; 218 219 // the maximal difference we can allow (64secs) 220 const int64_t max=TICKS_PER_SECOND*64L; 221 222 #ifdef DEBUG 223 if(!m_is_disabled) { 224 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TS=%11llu, CTR=%11llu, FC=%5d\n", 225 timestamp, cycle_timer, fc 226 ); 227 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " UTN=%11lld, UTN2=%11lld\n", 228 until_next, utn2 229 ); 230 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04d\n", 231 now_cycles, cycle, cycle_diff 232 ); 233 } 234 #endif 235 236 if(until_next > max) { 237 // this means that cycle_timer has wrapped, but 238 // timestamp has not. we should unwrap cycle_timer 239 // by adding TICKS_PER_SECOND*128L, meaning that we should substract 240 // this value from until_next 241 until_next -= TICKS_PER_SECOND*128L; 242 } else if (until_next < -max) { 243 // this means that timestamp has wrapped, but 244 // cycle_timer has not. we should unwrap timestamp 245 // by adding TICKS_PER_SECOND*128L, meaning that we should add 246 // this value from until_next 247 until_next += TICKS_PER_SECOND*128L; 248 } 222 uint64_t ticks_to_advance = TICKS_PER_CYCLE * TRANSMIT_ADVANCE_CYCLES; 223 224 // if cycle lies cycle_diff cycles in the future, we should 225 // queue this packet cycle_diff * TICKS_PER_CYCLE earlier than 226 // we would if it were to be sent immediately. 227 ticks_to_advance += cycle_diff * TICKS_PER_CYCLE; 228 229 // determine the 'now' time in ticks 230 uint64_t cycle_timer=CYCLE_TIMER_TO_TICKS(ctr); 231 232 // time until the packet is to be sent (if > 0: send packet) 233 int64_t until_next=substractTicks(timestamp, cycle_timer + ticks_to_advance); 249 234 250 235 #ifdef DEBUG … … 253 238 timestamp, cycle_timer, fc 254 239 ); 255 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " UTN=%11lld , UTN2=%11lld\n",256 until_next , utn2240 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " UTN=%11lld\n", 241 until_next 257 242 ); 243 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04d\n", 244 now_cycles, cycle, cycle_diff 245 ); 258 246 } 259 247 #endif 260 261 if (!m_disabled && m_is_disabled) { 262 // this means that we are trying to enable 263 if ((unsigned int)cycle == m_cycle_to_enable_at) { 264 m_is_disabled=false; 265 debugOutput(DEBUG_LEVEL_VERBOSE,"Enabling StreamProcessor %p at %u\n", this, cycle); 266 267 // initialize the buffer head & tail 268 uint64_t ts; 269 uint64_t fc; 270 271 debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); 272 273 m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe 274 275 // the number of cycles the sync source lags 276 // or leads (< 0) 277 int sync_lag_cycles=cycle-m_SyncSource->getLastCycle()-1; 278 if(sync_lag_cycles > (int)(CYCLES_PER_SECOND/2)) { 279 sync_lag_cycles -= CYCLES_PER_SECOND/2; 280 } 281 if (sync_lag_cycles < -((int)CYCLES_PER_SECOND/2)) { 282 sync_lag_cycles += CYCLES_PER_SECOND/2; 283 } 284 285 // recalculate the buffer head timestamp 286 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 287 288 // set buffer head timestamp 289 // this makes that the next sample to be sent out 290 // has the same timestamp as the last one received 291 // plus one frame 292 ts += (uint64_t)ticks_per_frame; 293 294 // account for the cycle lag between sync SP and this SP 295 ts += sync_lag_cycles * TICKS_PER_CYCLE; 296 297 if (ts >= TICKS_PER_SECOND * 128L) { 298 ts -= TICKS_PER_SECOND * 128L; 299 } 300 301 // m_data_buffer->setBufferHeadTimestamp(ts); 302 int64_t timestamp = ts; 303 304 // since we have frames_in_buffer frames in the buffer, 305 // we know that the buffer tail lies 306 // frames_in_buffer * rate 307 // later 308 int frames_in_buffer=m_data_buffer->getFrameCounter(); 309 timestamp += (int64_t)((float)frames_in_buffer * ticks_per_frame); 310 311 // this happens when the last timestamp is near wrapping, and 312 // m_framecounter is low. 313 // this means: m_last_timestamp is near wrapping and have just had 314 // a getPackets() from the client side. the projected next_period 315 // boundary lies beyond the wrap value. 316 // the action is to wrap the value. 317 if (timestamp >= TICKS_PER_SECOND * 128L) { 318 timestamp -= TICKS_PER_SECOND * 128L; 319 } 320 321 m_data_buffer->setBufferTailTimestamp(timestamp); 322 323 debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, TSTMP=%10llu, FC=%4d, %f\n", 324 ts, timestamp, frames_in_buffer, ticks_per_frame); 325 326 } else { 327 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); 328 } 329 } else if (m_disabled && !m_is_disabled) { 330 // trying to disable 331 debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle); 332 m_is_disabled=true; 333 } 334 248 335 249 // don't process the stream when it is not enabled, not running 336 250 // or when the next sample is not due yet. … … 338 252 // we do have to generate (semi) valid packets 339 253 // that means that we'll send NODATA packets. 340 // we don't add payload because DICE devices don't like that.341 254 if((until_next>0) || m_is_disabled || !m_running) { 342 255 // no-data packets have syt=0xFFFF … … 345 258 packet->syt = 0xffff; 346 259 347 // the dbc is incremented even with no data packets 260 // FIXME: either make this a setting or choose 261 bool send_payload=true; 262 if(send_payload) { 263 // the dbc is incremented even with no data packets 264 m_dbc += m_syt_interval; 265 266 // this means no-data packets with payload (DICE doesn't like that) 267 *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); 268 } else { 269 // dbc is not incremented 270 271 // this means no-data packets without payload 272 *length = 2*sizeof(quadlet_t); 273 } 274 275 return RAW1394_ISO_DEFER; 276 } 277 278 // construct the packet 279 280 // add the transmit transfer delay to construct the playout time (=SYT timestamp) 281 uint64_t ts=addTicks(timestamp, TRANSMIT_TRANSFER_DELAY); 282 283 unsigned int nevents = m_syt_interval; 284 if (m_data_buffer->readFrames(nevents, (char *)(data + 8))) { 285 348 286 m_dbc += m_syt_interval; 349 350 // this means no-data packets with payload (DICE doesn't like that) 351 *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); 352 353 // this means no-data packets without payload 354 //*length = 2*sizeof(quadlet_t); 355 356 *tag = IEC61883_TAG_WITH_CIP; 357 *sy = 0; 358 359 return RAW1394_ISO_DEFER; 360 } 361 362 // construct the packet 363 nevents = m_syt_interval; 364 m_dbc += m_syt_interval; 365 366 *tag = IEC61883_TAG_WITH_CIP; 367 *sy = 0; 368 369 if (m_data_buffer->readFrames(nevents, (char *)(data + 8))) 370 { 287 288 packet->fdf = m_fdf; 289 290 // convert the timestamp to SYT format 291 uint16_t timestamp_SYT = TICKS_TO_SYT(ts); 292 packet->syt = ntohs(timestamp_SYT); 293 371 294 *length = nevents*sizeof(quadlet_t)*m_dimension + 8; 372 295 … … 377 300 } 378 301 379 packet->fdf = m_fdf; 380 381 // convert the timestamp to SYT format 382 uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; 383 384 // check if it wrapped 385 if (ts >= TICKS_PER_SECOND * 128L) { 386 ts -= TICKS_PER_SECOND * 128L; 387 } 388 389 unsigned int timestamp_SYT = TICKS_TO_SYT(ts); 390 packet->syt = ntohs(timestamp_SYT); 391 392 // update the frame counter such that it reflects the new value 393 // done in the SP base class 394 // if (!StreamProcessor::getFrames(nevents)) { 395 // debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents); 396 // return RAW1394_ISO_ERROR; 397 // } 398 399 return RAW1394_ISO_OK; 400 } else { 401 /* there is no more data in the ringbuffer */ 402 // convert the timestamp to SYT format 403 uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; 404 405 // check if it wrapped 406 if (ts >= TICKS_PER_SECOND * 128L) { 407 ts -= TICKS_PER_SECOND * 128L; 408 } 409 302 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu TSS=%011llu\n", 303 cycle, timestamp, ts); 304 305 return RAW1394_ISO_OK; 306 307 } else { // there is no more data in the ringbuffer 308 309 // TODO: maybe we have to be a little smarter here 310 // because we have some slack on the device side (TRANSFER_DELAY) 311 // we can allow some skipped packets 410 312 debugWarning("Transmit buffer underrun (now %d, queue %d, target %d)\n", 411 313 now_cycles, cycle, TICKS_TO_CYCLES(ts)); 412 314 413 nevents=0;414 415 // TODO: we have to be a little smarter here416 // because we have some slack on the device side (TRANSFER_DELAY)417 // we can allow some skipped packets418 315 // signal underrun 419 316 m_xruns++; … … 426 323 // compose a no-data packet, we should always 427 324 // send a valid packet 428 packet->fdf = IEC61883_FDF_NODATA; 429 packet->syt = 0xffff; 430 431 // this means no-data packets with payload (DICE doesn't like that) 432 *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); 433 434 // this means no-data packets without payload 435 //*length = 2*sizeof(quadlet_t); 325 326 // FIXME: either make this a setting or choose 327 bool send_payload=true; 328 if(send_payload) { 329 // the dbc is incremented even with no data packets 330 m_dbc += m_syt_interval; 331 332 // this means no-data packets with payload (DICE doesn't like that) 333 *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); 334 } else { 335 // dbc is not incremented 336 337 // this means no-data packets without payload 338 *length = 2*sizeof(quadlet_t); 339 } 436 340 437 341 return RAW1394_ISO_DEFER; … … 462 366 463 367 bool AmdtpTransmitStreamProcessor::prefill() { 368 369 debugOutput( DEBUG_LEVEL_VERBOSE, "Prefill transmit buffers...\n"); 370 464 371 if(!transferSilence(m_ringbuffer_size_frames)) { 465 372 debugFatal("Could not prefill transmit stream\n"); 466 373 return false; 467 374 } 468 469 // when the buffer is prefilled, we should470 // also initialize the base timestamp471 // this base timestamp is the timestamp of the472 // last buffer transfer.473 // uint64_t ts;474 // uint64_t fc;475 // m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe476 477 // update the frame counter such that it reflects the buffer content,478 // the buffer tail timestamp is initialized when the SP is enabled479 // done in the SP base class480 // if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) {481 // debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n",482 // m_ringbuffer_size_frames,ts);483 // return false;484 // }485 375 486 376 return true; … … 678 568 return false; 679 569 } 680 681 // prefilling is done in ...() 682 // because at that point the streams are running, 683 // while here they are not. 684 685 // prefill the event buffer 686 // NOTE: do we need to prefill? reset() is called, so everything is prefilled then 687 // if (!prefill()) { 688 // debugFatal("Could not prefill buffers\n"); 689 // return false; 690 // } 691 570 692 571 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); 693 572 debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, FDF: %d, DBS: %d, SYT: %d\n", … … 752 631 m_PeriodStat.mark(m_data_buffer->getBufferFill()); 753 632 754 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); 755 633 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n", nbframes, ts); 634 635 // The timestamp passed to this function is the time 636 // of the period transfer. This means that we have to 637 // add our max buffer size to it to ensure causality 638 // in all cases: 639 // we have to make sure that the buffer HEAD timestamp 640 // lies in the future for every possible buffer fill case. 641 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 642 ts = addTicks(ts,(uint32_t)((float)m_ringbuffer_size_frames * ticks_per_frame)); 643 644 // transfer the data 756 645 m_data_buffer->blockProcessWriteFrames(nbframes, ts); 757 758 // recalculate the buffer tail timestamp 759 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 760 761 // this makes that the last sample to be sent out on ISO 762 // has the same timestamp as the last one transfered 763 // to the client 764 // plus one frame 765 ts += (uint64_t)ticks_per_frame; 766 int64_t timestamp = ts; 767 768 // however we have to preserve causality, meaning that we have to make 769 // sure that the worst-case buffer head timestamp still lies in the future. 770 // this worst case timestamp occurs when the xmit buffer is completely full. 771 // therefore we add m_ringbuffer_size_frames * ticks_per_frame to the timestamp. 772 // this will make sure that the buffer head timestamp lies in the future. 773 // the netto effect of this is that the system works as if the buffer processing 774 // by the client doesn't take time. 775 776 timestamp += (int64_t)((float)m_ringbuffer_size_frames * ticks_per_frame); 777 778 // wrap the timestamp if nescessary 779 if (timestamp >= TICKS_PER_SECOND * 128L) { 780 timestamp -= TICKS_PER_SECOND * 128L; 781 } 782 783 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp); 784 785 // update the frame counter such that it reflects the new value, 786 // and also update the buffer tail timestamp 787 // done in the SP base class 788 // if (!StreamProcessor::putFrames(nbframes, timestamp)) { 789 // debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp); 790 // return false; 791 // } 646 647 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " New timestamp: %llu\n", ts); 792 648 793 649 return true; … … 1050 906 m_running,m_disabled,m_is_disabled); 1051 907 1052 if((packet->fmt == 0x10) && (packet->fdf != 0xFF) && (packet->syt != 0xFFFF) && (packet->dbs>0) && (length>=2*sizeof(quadlet_t))) { 908 if((packet->syt != 0xFFFF) 909 && (packet->fdf != 0xFF) 910 && (packet->fmt == 0x10) 911 && (packet->dbs>0) 912 && (length>=2*sizeof(quadlet_t))) { 913 1053 914 unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; 1054 915 … … 1056 917 m_last_timestamp2=m_last_timestamp; 1057 918 1058 //=> convert the SYT to ticks 1059 unsigned int syt_timestamp=ntohs(packet->syt); 1060 1061 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4u cycles + %04u ticks), FC=%04d, %d\n", 1062 channel, cycle,syt_timestamp, 1063 CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp), 1064 m_data_buffer->getFrameCounter(), m_is_disabled); 1065 1066 // reconstruct the full cycle 1067 unsigned int cc=m_handler->getCycleTimer(); 1068 unsigned int cc_cycles=CYCLE_TIMER_GET_CYCLES(cc); 1069 unsigned int cc_seconds=CYCLE_TIMER_GET_SECS(cc); 1070 1071 // the cycletimer has wrapped since this packet was received 1072 // we want cc_seconds to reflect the 'seconds' at the point this 1073 // was received 1074 if (cycle>cc_cycles) { 1075 if (cc_seconds) { 1076 cc_seconds--; 1077 } else { 1078 // seconds has wrapped around, so we'd better not substract 1 1079 // the good value is 127 1080 cc_seconds=127; 1081 } 1082 } 1083 1084 // reconstruct the top part of the timestamp using the current cycle number 1085 unsigned int now_cycle_masked=cycle & 0xF; 1086 unsigned int syt_cycle=CYCLE_TIMER_GET_CYCLES(syt_timestamp); 1087 1088 // if this is true, wraparound has occurred, undo this wraparound 1089 if(syt_cycle<now_cycle_masked) syt_cycle += 0x10; 1090 1091 // this is the difference in cycles wrt the cycle the 1092 // timestamp was received 1093 unsigned int delta_cycles=syt_cycle-now_cycle_masked; 1094 1095 // reconstruct the cycle part of the timestamp 1096 unsigned int new_cycles=cycle + delta_cycles; 1097 1098 // if the cycles cause a wraparound of the cycle timer, 1099 // perform this wraparound 1100 // and convert the timestamp into ticks 1101 if(new_cycles<8000) { 1102 m_last_timestamp = new_cycles * TICKS_PER_CYCLE; 1103 } else { 1104 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 1105 "Detected wraparound: %d + %d = %d\n", 1106 cycle,delta_cycles,new_cycles); 1107 1108 new_cycles-=8000; // wrap around 1109 m_last_timestamp = new_cycles * TICKS_PER_CYCLE; 1110 // add one second due to wraparound 1111 m_last_timestamp += TICKS_PER_SECOND; 1112 } 1113 1114 m_last_timestamp += CYCLE_TIMER_GET_OFFSET(syt_timestamp); 1115 m_last_timestamp += cc_seconds * TICKS_PER_SECOND; 919 //=> convert the SYT to a full timestamp in ticks 920 m_last_timestamp=sytToFullTicks((uint32_t)ntohs(packet->syt), 921 cycle, m_handler->getCycleTimer()); 922 923 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", 924 cycle, m_last_timestamp); 1116 925 1117 926 // we have to keep in mind that there are also 1118 // some packets buffered by the ISO layer 927 // some packets buffered by the ISO layer, 1119 928 // at most x=m_handler->getWakeupInterval() 1120 929 // these contain at most x*syt_interval … … 1130 939 1131 940 // the receive processing delay indicates how much 1132 // extra time we needas slack941 // extra time we use as slack 1133 942 m_last_timestamp += RECEIVE_PROCESSING_DELAY; 1134 943 1135 944 // wrap if nescessary 1136 if (m_last_timestamp >= TICKS_PER_SECOND * 128L) { 1137 m_last_timestamp -= TICKS_PER_SECOND * 128L; 1138 } 945 m_last_timestamp=wrapAtMaxTicks(m_last_timestamp); 1139 946 1140 947 //=> now estimate the device frame rate … … 1156 963 float f=measured_difference; 1157 964 float err = f / (1.0*m_syt_interval) - m_ticks_per_frame; 1158 1159 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"SYT: %08X | STMP: %lluticks | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, f,m_ticks_per_frame,err); 1160 1161 #ifdef DEBUG 1162 // this helps to detect wraparound issues 1163 if(f > 1.5*((TICKS_PER_SECOND*1.0) / m_framerate)*m_syt_interval) { 1164 debugWarning("Timestamp diff more than 50%% of the nominal diff too large!\n"); 1165 debugWarning(" SYT: %08X | STMP: %llu,%llu | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err); 1166 debugWarning(" CC: %08X | CC_CY: %u | CC_SEC: %u | SYT_CY: %u | NEW_CY: %u\n", 1167 cc, cc_cycles, cc_seconds, syt_cycle,new_cycles); 1168 1169 } 1170 if(f < 0.5*((TICKS_PER_SECOND*1.0) / m_framerate)*m_syt_interval) { 1171 debugWarning("Timestamp diff more than 50%% of the nominal diff too small!\n"); 1172 debugWarning(" SYT: %08X | STMP: %llu,%llu | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err); 1173 } 1174 #endif 965 1175 966 // integrate the error 1176 967 m_ticks_per_frame += RECEIVE_DLL_INTEGRATION_COEFFICIENT*err; 1177 968 1178 969 } 1179 1180 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"R-SYT for cycle (%2d %2d)=>%2d: %5uT (%04uC + %04uT) %04X %04X %d\n",1181 cycle,now_cycle_masked,delta_cycles,1182 (m_last_timestamp),1183 TICKS_TO_CYCLES(m_last_timestamp),1184 TICKS_TO_OFFSET(m_last_timestamp),1185 ntohs(packet->syt),TICKS_TO_CYCLE_TIMER(m_last_timestamp)&0xFFFF, dropped1186 );1187 970 1188 971 //=> signal that we're running (if we are) … … 1200 983 // the previous timestamp is the one we need to start with 1201 984 // because we're going to update the buffer again this loop 985 // using writeframes 1202 986 m_data_buffer->setBufferTailTimestamp(m_last_timestamp2); 1203 987 1204 988 } else { 1205 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); 989 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 990 "will enable StreamProcessor %p at %u, now is %d\n", 991 this, m_cycle_to_enable_at, cycle); 1206 992 } 1207 993 } else if (m_disabled && !m_is_disabled) { … … 1223 1009 1224 1010 // wrap if nescessary 1225 if (ts >= TICKS_PER_SECOND * 128L) {1226 ts -= TICKS_PER_SECOND * 128L; 1227 }1228 // set the timestamps1011 ts=wrapAtMaxTicks(ts); 1012 1013 // set the timestamp as if there will be a sample put into 1014 // the buffer by the next packet. 1229 1015 m_data_buffer->setBufferTailTimestamp(ts); 1230 1016 … … 1257 1043 1258 1044 retval=RAW1394_ISO_DEFER; 1259 1260 1045 } 1261 1262 #ifdef DEBUG 1263 if(packet->dbs) { 1264 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 1265 "RCV %04d: CH = %d, FDF = %X. SYT = %6d, DBS = %3d, DBC = %3d, FMT = %3d, LEN = %4d (%2d)\n", 1266 cycle, channel, packet->fdf, 1267 packet->syt, 1268 packet->dbs, 1269 packet->dbc, 1270 packet->fmt, 1271 length, 1272 ((length / sizeof (quadlet_t)) - 2)/packet->dbs); 1273 } 1274 #endif 1275 1276 // update the frame counter such that it reflects the new value, 1277 // and also update the buffer tail timestamp, as we add new frames 1278 // done in the SP base class 1279 // if (!StreamProcessor::putFrames(nevents, m_last_timestamp)) { 1280 // debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nevents, m_last_timestamp); 1281 // return RAW1394_ISO_ERROR; 1282 // } 1283 1284 } 1285 1046 } 1047 1286 1048 return retval; 1287 1049 } … … 1292 1054 uint64_t cycle_timer=m_handler->getCycleTimerTicks(); 1293 1055 1294 int64_t until_next=time_at_period-cycle_timer; 1295 1296 // the maximal difference we can allow (64secs) 1297 const int64_t max=TICKS_PER_SECOND*64L; 1056 // calculate the time until the next period 1057 int64_t until_next=substractTicks(time_at_period,cycle_timer); 1298 1058 1299 1059 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11lld, TPUS=%f\n", 1300 time_at_period, cycle_timer, until_next, m_handler->getTicksPerUsec()1301 );1302 1303 if(until_next > max) {1304 // this means that cycle_timer has wrapped, but1305 // time_at_period has not. we should unwrap cycle_timer1306 // by adding TICKS_PER_SECOND*128L, meaning that we should substract1307 // this value from until_next1308 until_next -= TICKS_PER_SECOND*128L;1309 } else if (until_next < -max) {1310 // this means that time_at_period has wrapped, but1311 // cycle_timer has not. we should unwrap time_at_period1312 // by adding TICKS_PER_SECOND*128L, meaning that we should add1313 // this value from until_next1314 until_next += TICKS_PER_SECOND*128L;1315 }1316 1317 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " TAP=%11llu, CTR=%11llu, UTN=%11lld, TPUS=%f\n",1318 1060 time_at_period, cycle_timer, until_next, m_handler->getTicksPerUsec() 1319 1061 ); … … 1375 1117 next_period_boundary, m_last_timestamp, fc, m_ticks_per_frame 1376 1118 ); 1377 1378 // this happens if the timestamp wraps around while there are a lot of 1119 1120 // next_period_boundary too large 1121 // happens if the timestamp wraps around while there are a lot of 1379 1122 // frames in the buffer. We should add the wraparound value of the ticks 1380 1123 // counter 1381 if (next_period_boundary < 0) { 1382 next_period_boundary += TICKS_PER_SECOND * 128L; 1383 } 1384 // this happens when the last timestamp is near wrapping, and 1124 1125 // next_period_boundary too small 1126 // happens when the last timestamp is near wrapping, and 1385 1127 // m_framecounter is low. 1386 1128 // this means: m_last_timestamp is near wrapping and have just had 1387 1129 // a getPackets() from the client side. the projected next_period 1388 1130 // boundary lies beyond the wrap value. 1131 1389 1132 // the action is to wrap the value. 1390 else if (next_period_boundary >= TICKS_PER_SECOND * 128L) { 1391 next_period_boundary -= TICKS_PER_SECOND * 128L; 1392 } 1393 1394 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", 1395 next_period_boundary, m_last_timestamp, fc, m_ticks_per_frame 1396 ); 1397 1133 next_period_boundary=wrapAtMinMaxTicks(next_period_boundary); 1134 1398 1135 return next_period_boundary; 1399 1136 } … … 1611 1348 m_data_buffer->blockProcessReadFrames(nbframes); 1612 1349 1613 // update the frame counter such that it reflects the new value,1614 // done in the SP base class1615 /*1616 if (!StreamProcessor::getFrames(nbframes)) {1617 debugError("Could not do StreamProcessor::getFrames(%d)\n", nbframes);1618 return false;1619 }*/1620 1621 1350 return true; 1622 1351 } branches/streaming-rework/src/libstreaming/cycletimer.h
r385 r393 31 31 #ifndef __CYCLETIMER_H__ 32 32 #define __CYCLETIMER_H__ 33 34 #include <inttypes.h> 35 #include "debugmodule/debugmodule.h" 33 36 34 37 #define CSR_CYCLE_TIME 0x200 … … 69 72 #define CYCLE_TIMER_WRAP_TICKS(x) ((x % TICKS_PER_SECOND)) 70 73 74 DECLARE_GLOBAL_DEBUG_MODULE; 75 76 /** 77 * @brief Converts a SYT timestamp to a full timestamp in ticks. 78 * 79 * 80 * 81 * @param syt_timestamp The SYT timestamp as present in the packet 82 * @param rcv_cycle The cycle this timestamp was received on 83 * @param ctr_now The current value of the cycle timer ('now') 84 * @return 85 */ 86 static inline uint32_t sytToFullTicks(uint32_t syt_timestamp, unsigned int rcv_cycle, uint32_t ctr_now) { 87 uint32_t timestamp; 88 89 // reconstruct the full cycle 90 uint32_t cc_cycles=CYCLE_TIMER_GET_CYCLES(ctr_now); 91 uint32_t cc_seconds=CYCLE_TIMER_GET_SECS(ctr_now); 92 93 // the cycletimer has wrapped since this packet was received 94 // we want cc_seconds to reflect the 'seconds' at the point this 95 // was received 96 if (rcv_cycle>cc_cycles) { 97 if (cc_seconds) { 98 cc_seconds--; 99 } else { 100 // seconds has wrapped around, so we'd better not substract 1 101 // the good value is 127 102 cc_seconds=127; 103 } 104 } 105 106 // reconstruct the top part of the timestamp using the current cycle number 107 uint32_t rcv_cycle_masked=rcv_cycle & 0xF; 108 uint32_t syt_cycle=CYCLE_TIMER_GET_CYCLES(syt_timestamp); 109 110 // if this is true, wraparound has occurred, undo this wraparound 111 if(syt_cycle<rcv_cycle_masked) syt_cycle += 0x10; 112 113 // this is the difference in cycles wrt the cycle the 114 // timestamp was received 115 uint32_t delta_cycles=syt_cycle-rcv_cycle_masked; 116 117 // reconstruct the cycle part of the timestamp 118 uint32_t new_cycles=rcv_cycle + delta_cycles; 119 120 // if the cycles cause a wraparound of the cycle timer, 121 // perform this wraparound 122 // and convert the timestamp into ticks 123 if(new_cycles<8000) { 124 timestamp = new_cycles * TICKS_PER_CYCLE; 125 } else { 126 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 127 "Detected wraparound: %d + %d = %d\n", 128 rcv_cycle,delta_cycles,new_cycles); 129 130 new_cycles-=8000; // wrap around 131 #ifdef DEBUG 132 if (new_cycles >= 8000) { 133 debugWarning("insufficient unwrapping\n"); 134 } 135 #endif 136 timestamp = new_cycles * TICKS_PER_CYCLE; 137 // add one second due to wraparound 138 timestamp += TICKS_PER_SECOND; 139 } 140 141 timestamp += CYCLE_TIMER_GET_OFFSET(syt_timestamp); 142 timestamp += cc_seconds * TICKS_PER_SECOND; 143 return timestamp; 144 } 145 146 /** 147 * @brief Wraps x to the maximum number of ticks 148 * 149 * The input value is wrapped to the maximum value of the cycle 150 * timer, in ticks (128sec * 24576000 ticks/sec). 151 * 152 * @param x time to wrap 153 * @return wrapped time 154 */ 155 static inline uint32_t wrapAtMaxTicks(uint64_t x) { 156 if (x >= TICKS_PER_SECOND * 128L) { 157 x -= TICKS_PER_SECOND * 128L; 158 } 159 160 #ifdef DEBUG 161 if (x >= TICKS_PER_SECOND * 128L) { 162 debugWarning("insufficient wrapping: %llu\n",x); 163 } 164 #endif 165 166 return x; 167 } 168 169 /** 170 * @brief Wraps both at minimum and maximum value for ticks 171 * @param x value to wrap 172 * @return wrapped value 173 */ 174 static inline uint32_t wrapAtMinMaxTicks(int64_t x) { 175 176 if (x < 0) { 177 x += TICKS_PER_SECOND * 128L; 178 } else if (x >= TICKS_PER_SECOND * 128L) { 179 x -= TICKS_PER_SECOND * 128L; 180 } 181 182 #ifdef DEBUG 183 if (x >= TICKS_PER_SECOND * 128L) { 184 debugWarning("insufficient wrapping (max): %llu\n",x); 185 } 186 if (x < 0) { 187 debugWarning("insufficient wrapping (min): %lld\n",x); 188 } 189 #endif 190 return x; 191 192 } 193 194 /** 195 * @brief Computes a difference between timestamps 196 * 197 * This function computes a difference between timestamps 198 * such that it respects wrapping. 199 * 200 * If x wraps around, but y doesn't, the result of x-y is 201 * negative and very large. However the real difference is 202 * not large. It can be calculated by unwrapping x and then 203 * calculating x-y. 204 * 205 * @param x First timestamp 206 * @param y Second timestamp 207 * @return the difference x-y, unwrapped 208 */ 209 static inline int32_t substractTicks(uint32_t x, uint32_t y) { 210 int64_t diff=(int64_t)x - (int64_t)y; 211 212 // the maximal difference we allow (64secs) 213 const int64_t max=TICKS_PER_SECOND*64L; 214 215 if(diff > max) { 216 // this means that y has wrapped, but 217 // x has not. we should unwrap y 218 // by adding TICKS_PER_SECOND*128L, meaning that we should substract 219 // this value from diff 220 diff -= TICKS_PER_SECOND*128L; 221 } else if (diff < -max) { 222 // this means that x has wrapped, but 223 // y has not. we should unwrap x 224 // by adding TICKS_PER_SECOND*128L, meaning that we should add 225 // this value to diff 226 diff += TICKS_PER_SECOND*128L; 227 } 228 229 return (int32_t)diff; 230 231 } 232 233 /** 234 * @brief Computes a sum of timestamps 235 * 236 * This function computes a sum of timestamps in ticks, 237 * wrapping the result if nescessary. 238 * 239 * @param x First timestamp 240 * @param y Second timestamp 241 * @return the sum x+y, wrapped 242 */ 243 static inline uint32_t addTicks(uint32_t x, uint32_t y) { 244 uint64_t sum=x+y; 245 246 return wrapAtMaxTicks(sum); 247 } 248 249 /** 250 * @brief Computes a difference between cycles 251 * 252 * This function computes a difference between cycles 253 * such that it respects wrapping (at 8000 cycles). 254 * 255 * See substractTicks 256 * 257 * @param x First cycle value 258 * @param y Second cycle value 259 * @return the difference x-y, unwrapped 260 */ 261 static inline int substractCycles(unsigned int x, unsigned int y) { 262 int diff = x - y; 263 264 // the maximal difference we allow (64secs) 265 const int max=CYCLES_PER_SECOND/2; 266 267 if(diff > max) { 268 diff -= CYCLES_PER_SECOND; 269 } else if (diff < -max) { 270 diff += CYCLES_PER_SECOND; 271 } 272 273 return diff; 274 } 275 71 276 #endif // __CYCLETIMER_H__ branches/streaming-rework/src/libutil/TimestampedBuffer.cpp
r392 r393 524 524 void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) { 525 525 526 #ifdef DEBUG 527 if (new_timestamp >= 128L*TICKS_PER_SECOND) { 528 debugWarning("timestamp not wrapped: %llu\n",new_timestamp); 529 } 530 #endif 531 526 532 pthread_mutex_lock(&m_framecounter_lock); 527 533 … … 533 539 pthread_mutex_unlock(&m_framecounter_lock); 534 540 535 debugOutput(DEBUG_LEVEL_VER BOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n",541 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n", 536 542 this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); 537 543 … … 549 555 */ 550 556 void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { 551 double rate=(double)m_buffer_next_tail_timestamp - (double)m_buffer_tail_timestamp; 552 rate /= (double)m_update_period; 557 int64_t diff=m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; 558 if (diff < 0) diff += m_wrap_at; 559 560 double rate=(double)diff / (double)m_update_period; 553 561 554 562 int64_t timestamp; … … 560 568 // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*(x) 561 569 562 // buffer head is the frame that is framecounter -1away from the tail570 // buffer head is the frame that is framecounter away from the tail 563 571 timestamp=(int64_t)m_buffer_tail_timestamp - (int64_t)((m_framecounter) * rate); 564 572 565 573 pthread_mutex_unlock(&m_framecounter_lock); 566 574 567 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p): HTS = %011lld, TTS=%011llu, FC=%05u, RATE=%f\n", 568 this, timestamp, m_buffer_tail_timestamp, *fc, rate); 575 #ifdef DEBUG 576 if(timestamp || m_buffer_tail_timestamp || *fc) { 577 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): HTS = %011lld, TTS=%011llu, FC=%05u\n", 578 this, timestamp, m_buffer_tail_timestamp, *fc, rate); 579 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): rate = %f\n", this, rate); 580 } 581 #endif 569 582 570 583 if(timestamp >= (int64_t)m_wrap_at) { … … 576 589 *ts=timestamp; 577 590 578 debugOutput(DEBUG_LEVEL_VERBOSE, " HTS = %011lld, FC=%05u\n", 579 *ts, *fc); 591 #ifdef DEBUG 592 if(timestamp || m_buffer_tail_timestamp || *fc) { 593 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): HTS = %011lld, FC=%05u\n", 594 this, *ts, *fc); 595 } 596 #endif 580 597 581 598 } … … 593 610 void TimestampedBuffer::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { 594 611 pthread_mutex_lock(&m_framecounter_lock); 595 *fc = m_framecounter;612 *fc = (uint64_t)m_framecounter; 596 613 *ts = m_buffer_tail_timestamp; 597 614 pthread_mutex_unlock(&m_framecounter_lock); … … 627 644 */ 628 645 void TimestampedBuffer::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { 629 debugOutput(DEBUG_LEVEL_VER BOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",646 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", 630 647 this, new_timestamp); 648 649 #ifdef DEBUG 650 if (new_timestamp >= 128L*TICKS_PER_SECOND) { 651 debugWarning("timestamp not wrapped: %llu\n",new_timestamp); 652 } 653 #endif 631 654 632 655 pthread_mutex_lock(&m_framecounter_lock); … … 662 685 m_buffer_next_tail_timestamp += (uint64_t)(m_dll_b * err + m_dll_e2); 663 686 687 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "U: FC=%10u, TS=%011llu, NTS=%011llu\n", 688 m_framecounter, m_buffer_tail_timestamp, m_buffer_next_tail_timestamp); 689 664 690 if (m_buffer_next_tail_timestamp >= m_wrap_at) { 665 691 m_buffer_next_tail_timestamp -= m_wrap_at; … … 671 697 m_dll_e2 += m_dll_c*err; 672 698 673 debugOutput(DEBUG_LEVEL_VER BOSE, "A: TS=%011llu, NTS=%011llu, DLLe2=%f\n",699 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "A: TS=%011llu, NTS=%011llu, DLLe2=%f\n", 674 700 m_buffer_tail_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); 675 701 … … 706 732 debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); 707 733 debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); 708 debugOutputShort( DEBUG_LEVEL_NORMAL, " rate : % lf (%f)\n",m_dll_e2,m_dll_e2/m_update_period);734 debugOutputShort( DEBUG_LEVEL_NORMAL, " rate : %f (%f)\n",m_dll_e2,m_dll_e2/m_update_period); 709 735 } 710 736 branches/streaming-rework/tests/test-cycletimer.cpp
r384 r393 158 158 } 159 159 160 int32_t subs; 161 subs=substractTicks(10, 8); 162 if (subs != 2) { 163 debugOutput(DEBUG_LEVEL_NORMAL, " substractTicks(10, 8) != 2 : %ld\n", 164 subs); 165 failures++; 166 } 167 168 subs=substractTicks(10, 12); 169 if (subs != -2) { 170 debugOutput(DEBUG_LEVEL_NORMAL, " substractTicks(10, 12) != -2 : %ld\n", 171 subs); 172 failures++; 173 } 174 175 subs=substractTicks(TICKS_PER_SECOND*128L + 10, 8); 176 if (subs != 2) { 177 debugOutput(DEBUG_LEVEL_NORMAL, " substractTicks(TICKS_PER_SECOND*128L + 10, 8) != 2 : %ld\n", 178 subs); 179 failures++; 180 } 181 182 subs=substractTicks(TICKS_PER_SECOND*128L + 10, 12); 183 if (subs != -2) { 184 debugOutput(DEBUG_LEVEL_NORMAL, " substractTicks(TICKS_PER_SECOND*128L + 10, 12) != -2 : %ld\n", 185 subs); 186 failures++; 187 } 188 189 subs=substractTicks(10, TICKS_PER_SECOND*128L + 8); 190 if (subs != 2) { 191 debugOutput(DEBUG_LEVEL_NORMAL, " substractTicks(10, TICKS_PER_SECOND*128L + 8) != 2 : %ld\n", 192 subs); 193 failures++; 194 } 195 196 subs=substractTicks(10, TICKS_PER_SECOND*128L + 12); 197 if (subs != -2) { 198 debugOutput(DEBUG_LEVEL_NORMAL, " substractTicks(10, TICKS_PER_SECOND*128L + 12) != -2 : %ld\n", 199 subs); 200 failures++; 201 } 202 160 203 if (failures) { 161 204 debugOutput(DEBUG_LEVEL_NORMAL, " %d failures\n",failures); … … 173 216 174 217 IsoHandlerManager *m_isoManager=NULL; 175 PosixThread * m_isoManagerThread=NULL; 176 218 177 219 #ifdef TEST_PORT_0 178 220 IsoStream *s=NULL; … … 208 250 209 251 m_isoManager->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 252 253 if(!m_isoManager->init()) { 254 debugOutput(DEBUG_LEVEL_NORMAL, "Could not init() IsoHandlerManager\n"); 255 goto finish; 256 } 210 257 211 // the thread to iterate the manager 212 m_isoManagerThread=new PosixThread( 213 m_isoManager, 214 true, 80, 215 PTHREAD_CANCEL_DEFERRED); 216 217 if(!m_isoManagerThread) { 218 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create iso manager thread\n"); 219 goto finish; 220 } 221 258 222 259 #ifdef TEST_PORT_0 223 260 // add a stream to the manager so that it has something to do … … 298 335 } 299 336 300 debugOutput(DEBUG_LEVEL_NORMAL, "Starting ISO manager sync update thread...\n");301 // start the runner thread302 m_isoManagerThread->Start();303 304 337 debugOutput(DEBUG_LEVEL_NORMAL, "Starting IsoHandler...\n"); 305 338 if (!m_isoManager->startHandlers(0)) { … … 319 352 } 320 353 321 // stop the sync thread322 debugOutput(DEBUG_LEVEL_NORMAL, "Stopping ISO manager sync update thread...\n");323 m_isoManagerThread->Stop();324 325 354 #ifdef TEST_PORT_0 326 355 if(!m_isoManager->unregisterStream(s)) { … … 347 376 #endif 348 377 349 delete m_isoManagerThread;350 378 delete m_isoManager; 351 379 branches/streaming-rework/tests/test-sytmonitor.cpp
r392 r393 170 170 171 171 IsoHandlerManager *m_isoManager=NULL; 172 PosixThread * m_isoManagerThread=NULL;173 172 174 173 SytMonitor *monitors[128]; … … 212 211 m_isoManager->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 213 212 214 // the thread to execute the manager 215 m_isoManagerThread=new PosixThread( 216 m_isoManager, 217 run_realtime, realtime_prio, 218 PTHREAD_CANCEL_DEFERRED); 219 220 if(!m_isoManagerThread) { 221 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create iso manager thread\n"); 213 if(!m_isoManager->init()) { 214 debugOutput(DEBUG_LEVEL_NORMAL, "Could not init() IsoHandlerManager\n"); 222 215 goto finish; 223 216 } 224 217 225 218 // register monitors 226 219 for (i=0;i<arguments.nb_combos;i++) { … … 270 263 goto finish; 271 264 } 272 273 // start the runner thread274 m_isoManagerThread->Start();275 265 276 266 if (arguments.realtime) { … … 458 448 } 459 449 460 // stop the sync thread461 debugOutput(DEBUG_LEVEL_NORMAL, "Stopping ISO manager sync update thread...\n");462 m_isoManagerThread->Stop();463 464 450 // unregister monitors 465 451 for (i=0;i<arguments.nb_combos;i++) { … … 473 459 } 474 460 475 delete m_isoManagerThread;476 461 delete m_isoManager; 477 462 branches/streaming-rework/tests/test-timestampedbuffer.cpp
r392 r393 73 73 uint64_t total_cycles; 74 74 uint64_t buffersize; 75 uint64_t start_at_cycle; 75 76 }; 76 77 … … 84 85 {"cycles", 'c', "n", 0, "Total cycles to run (2000)" }, 85 86 {"buffersize", 'b', "n", 0, "Buffer size (in frames) (1024)" }, 87 {"startcycle", 's', "n", 0, "Start at cycle (0)" }, 86 88 { 0 } 87 89 }; … … 101 103 case 'v': 102 104 if (arg) { 103 arguments->verbose = strtol ( arg, &tail, 0 );105 arguments->verbose = strtoll( arg, &tail, 0 ); 104 106 if ( errno ) { 105 107 fprintf( stderr, "Could not parse 'verbose' argument\n" ); … … 115 117 case 'w': 116 118 if (arg) { 117 arguments->wrap_at = strtol ( arg, &tail, 0 );119 arguments->wrap_at = strtoll( arg, &tail, 0 ); 118 120 if ( errno ) { 119 121 fprintf( stderr, "Could not parse 'wrap' argument\n" ); … … 129 131 case 'f': 130 132 if (arg) { 131 arguments->frames_per_packet = strtol ( arg, &tail, 0 );133 arguments->frames_per_packet = strtoll( arg, &tail, 0 ); 132 134 if ( errno ) { 133 135 fprintf( stderr, "Could not parse 'fpp' argument\n" ); … … 143 145 case 'e': 144 146 if (arg) { 145 arguments->events_per_frame = strtol ( arg, &tail, 0 );147 arguments->events_per_frame = strtoll( arg, &tail, 0 ); 146 148 if ( errno ) { 147 149 fprintf( stderr, "Could not parse 'epf' argument\n" ); … … 157 159 case 'c': 158 160 if (arg) { 159 arguments->total_cycles = strtol ( arg, &tail, 0 );161 arguments->total_cycles = strtoll( arg, &tail, 0 ); 160 162 if ( errno ) { 161 163 fprintf( stderr, "Could not parse 'cycles' argument\n" ); … … 169 171 } 170 172 break; 173 case 's': 174 if (arg) { 175 arguments->start_at_cycle = strtoll( arg, &tail, 0 ); 176 if ( errno ) { 177 fprintf( stderr, "Could not parse 'startcycle' argument\n" ); 178 return ARGP_ERR_UNKNOWN; 179 } 180 } else { 181 if ( errno ) { 182 fprintf( stderr, "Could not parse 'startcycle' argument\n" ); 183 return ARGP_ERR_UNKNOWN; 184 } 185 } 186 break; 171 187 case 'b': 172 188 if (arg) { 173 arguments->buffersize = strtol ( arg, &tail, 0 );189 arguments->buffersize = strtoll( arg, &tail, 0 ); 174 190 if ( errno ) { 175 191 fprintf( stderr, "Could not parse 'buffersize' argument\n" ); … … 228 244 arguments.total_cycles = 2000; 229 245 arguments.buffersize = 1024; 246 arguments.start_at_cycle = 0; 230 247 231 248 // Parse our arguments; every option seen by `parse_opt' will … … 275 292 276 293 usleep(1000); 294 295 debugOutput(DEBUG_LEVEL_NORMAL, "Start test 1...\n"); 277 296 278 297 int dummyframe_in[arguments.events_per_frame*arguments.frames_per_packet]; … … 283 302 } 284 303 285 uint64_t timestamp=0; 304 uint64_t time=arguments.start_at_cycle*3072; 305 306 // initialize the timestamp 307 uint64_t timestamp=time; 308 if (timestamp >= arguments.wrap_at) { 309 // here we need a modulo because start_at_cycle can be large 310 timestamp %= arguments.wrap_at; 311 } 286 312 t->setBufferTailTimestamp(timestamp); 313 287 314 timestamp += (uint64_t)(arguments.rate * arguments.frames_per_packet); 288 289 uint64_t time=0; 315 if (timestamp >= arguments.wrap_at) { 316 timestamp -= arguments.wrap_at; 317 } 290 318 291 for(unsigned int cycle=0;cycle < arguments.total_cycles; cycle++) { 319 for(unsigned int cycle=arguments.start_at_cycle; 320 cycle < arguments.start_at_cycle+arguments.total_cycles; 321 cycle++) { 322 292 323 // simulate the rate adaptation 293 int64_t diff= time-timestamp;324 int64_t diff=(time%arguments.wrap_at)-timestamp; 294 325 295 326 if (diff>(int64_t)arguments.wrap_at/2) { … … 302 333 303 334 if(diff>0) { 335 uint64_t ts_head, fc_head; 336 uint64_t ts_tail, fc_tail; 337 304 338 // write one packet 305 339 t->writeFrames(arguments.frames_per_packet, (char *)&dummyframe_in, timestamp); 306 340 341 // read the buffer head timestamp 342 t->getBufferHeadTimestamp(&ts_head, &fc_head); 343 t->getBufferTailTimestamp(&ts_tail, &fc_tail); 344 debugOutput(DEBUG_LEVEL_NORMAL, 345 " TS after write: HEAD: %011llu, FC=%04u\n", 346 ts_head,fc_head); 347 debugOutput(DEBUG_LEVEL_NORMAL, 348 " TAIL: %011llu, FC=%04u\n", 349 ts_tail,fc_tail); 350 307 351 // read one packet 308 352 t->readFrames(arguments.frames_per_packet, (char *)&dummyframe_out); 309 353 354 // read the buffer head timestamp 355 t->getBufferHeadTimestamp(&ts_head, &fc_head); 356 t->getBufferTailTimestamp(&ts_tail, &fc_tail); 357 debugOutput(DEBUG_LEVEL_NORMAL, 358 " TS after write: HEAD: %011llu, FC=%04u\n", 359 ts_head,fc_head); 360 debugOutput(DEBUG_LEVEL_NORMAL, 361 " TAIL: %011llu, FC=%04u\n", 362 ts_tail,fc_tail); 363 310 364 // check 311 365 bool pass=true; … … 315 369 if (!pass) { 316 370 debugOutput(DEBUG_LEVEL_NORMAL, "write/read check for cycle %d failed\n",cycle); 371 } 372 373 // update the timestamp 374 timestamp += (uint64_t)(arguments.rate * arguments.frames_per_packet); 375 if (timestamp >= arguments.wrap_at) { 376 timestamp -= arguments.wrap_at; 377 } 378 } 379 380 // simulate the cycle timer clock in ticks 381 time += 3072; 382 if (time >= arguments.wrap_at) { 383 time -= arguments.wrap_at; 384 } 385 386 // allow for the messagebuffer thread to catch up 387 usleep(200); 388 389 if(!run) break; 390 } 391 392 // second run, now do block processing 393 debugOutput(DEBUG_LEVEL_NORMAL, "Start test 2...\n"); 394 unsigned int blocksize=32; 395 int dummyframe_out_block[arguments.events_per_frame*arguments.frames_per_packet*blocksize]; 396 397 time=arguments.start_at_cycle*3072; 398 399 // initialize the timestamp 400 timestamp=time; 401 if (timestamp >= arguments.wrap_at) { 402 // here we need a modulo because start_at_cycle can be large 403 timestamp %= arguments.wrap_at; 404 } 405 t->setBufferTailTimestamp(timestamp); 406 407 timestamp += (uint64_t)(arguments.rate * arguments.frames_per_packet); 408 if (timestamp >= arguments.wrap_at) { 409 timestamp -= arguments.wrap_at; 410 } 411 412 for(unsigned int cycle=arguments.start_at_cycle; 413 cycle < arguments.start_at_cycle+arguments.total_cycles; 414 cycle++) { 415 416 // simulate the rate adaptation 417 int64_t diff=(time%arguments.wrap_at)-timestamp; 418 419 if (diff>(int64_t)arguments.wrap_at/2) { 420 diff -= arguments.wrap_at; 421 } else if (diff<(-(int64_t)arguments.wrap_at)/2){ 422 diff += arguments.wrap_at; 423 } 424 425 debugOutput(DEBUG_LEVEL_NORMAL, "Simulating cycle %d @ time=%011llu, diff=%lld\n",cycle,time,diff); 426 427 if(diff>0) { 428 uint64_t ts_head, fc_head; 429 uint64_t ts_tail, fc_tail; 430 431 // write one packet 432 t->writeFrames(arguments.frames_per_packet, (char *)&dummyframe_in, timestamp); 433 434 // read the buffer head timestamp 435 t->getBufferHeadTimestamp(&ts_head, &fc_head); 436 t->getBufferTailTimestamp(&ts_tail, &fc_tail); 437 debugOutput(DEBUG_LEVEL_NORMAL, 438 " TS after write: HEAD: %011llu, FC=%04u\n", 439 ts_head,fc_head); 440 debugOutput(DEBUG_LEVEL_NORMAL, 441 " TAIL: %011llu, FC=%04u\n", 442 ts_tail,fc_tail); 443 444 if (fc_head > blocksize) { 445 debugOutput(DEBUG_LEVEL_NORMAL,"Reading one block (%u frames)\n",blocksize); 446 447 // read one block 448 t->readFrames(blocksize, (char *)&dummyframe_out_block); 449 450 // read the buffer head timestamp 451 t->getBufferHeadTimestamp(&ts_head, &fc_head); 452 t->getBufferTailTimestamp(&ts_tail, &fc_tail); 453 debugOutput(DEBUG_LEVEL_NORMAL, 454 " TS after read: HEAD: %011llu, FC=%04u\n", 455 ts_head,fc_head); 456 debugOutput(DEBUG_LEVEL_NORMAL, 457 " TAIL: %011llu, FC=%04u\n", 458 ts_tail,fc_tail); 317 459 } 318 460 … … 323 465 } 324 466 } 325 467 326 468 // simulate the cycle timer clock in ticks 327 469 time += 3072; … … 332 474 // allow for the messagebuffer thread to catch up 333 475 usleep(200); 334 335 } 476 477 if(!run) break; 478 } 479 336 480 337 481 delete t;