Changeset 399
- Timestamp:
- 02/17/07 06:22:15 (17 years ago)
- Files:
-
- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (modified) (9 diffs)
- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h (modified) (2 diffs)
- branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (modified) (2 diffs)
- branches/streaming-rework/src/libstreaming/StreamProcessor.h (modified) (3 diffs)
- branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (modified) (4 diffs)
- branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (modified) (2 diffs)
- branches/streaming-rework/tests/SytMonitor.cpp (modified) (5 diffs)
- branches/streaming-rework/tests/SytMonitor.h (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp
r398 r399 39 39 40 40 // in ticks 41 #define TRANSMIT_TRANSFER_DELAY 9000U41 #define TRANSMIT_TRANSFER_DELAY 6000U 42 42 // the number of cycles to send a packet in advance of it's timestamp 43 43 #define TRANSMIT_ADVANCE_CYCLES 1U … … 132 132 133 133 #ifdef DEBUG 134 if( cycle_diff < 0) {134 if(m_running && (cycle_diff < 0)) { 135 135 debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", 136 136 cycle, now_cycles); … … 318 318 return RAW1394_ISO_OK; 319 319 320 } else if (now_cycles<cycle) { 321 // we can still postpone the queueing of the packets 322 return RAW1394_ISO_AGAIN; 320 323 } else { // there is no more data in the ringbuffer 321 324 … … 399 402 } 400 403 404 int AmdtpTransmitStreamProcessor::getMinimalSyncDelay() { 405 return 0; 406 } 407 401 408 bool AmdtpTransmitStreamProcessor::prefill() { 402 409 … … 419 426 m_PacketStat.reset(); 420 427 m_WakeupStat.reset(); 428 429 // we have to make sure that the buffer HEAD timestamp 430 // lies in the future for every possible buffer fill case. 431 int offset=(int)(m_ringbuffer_size_frames*m_ticks_per_frame); 432 433 // we can substract the delay as it introduces 434 // unnescessary delay 435 offset -= m_SyncSource->getSyncDelay(); 436 437 m_data_buffer->setTickOffset(offset); 421 438 422 439 // reset all non-device specific stuff … … 505 522 506 523 m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 507 508 // we have to make sure that the buffer HEAD timestamp509 // lies in the future for every possible buffer fill case.510 m_data_buffer->setTickOffset((int)(m_ringbuffer_size_frames*m_ticks_per_frame));511 524 512 525 m_data_buffer->prepare(); … … 1127 1140 } 1128 1141 1142 // returns the delay between the actual (real) time of a timestamp as received, 1143 // and the timestamp that is passed on for the same event. This is to cope with 1144 // ISO buffering 1145 int AmdtpReceiveStreamProcessor::getMinimalSyncDelay() { 1146 return (int)(m_handler->getWakeupInterval() * m_syt_interval * m_ticks_per_frame); 1147 // return RECEIVE_PROCESSING_DELAY; 1148 } 1149 1129 1150 void AmdtpReceiveStreamProcessor::dumpInfo() 1130 1151 { … … 1160 1181 // pass before these packets are processed. Adding this extra term makes that 1161 1182 // the period boundary is signalled later 1162 m_data_buffer->setTickOffset(RECEIVE_PROCESSING_DELAY 1163 + (int)(m_handler->getWakeupInterval() * m_syt_interval * m_ticks_per_frame)); 1183 m_data_buffer->setTickOffset(m_SyncSource->getSyncDelay()); 1164 1184 1165 1185 // reset all non-device specific stuff … … 1236 1256 m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 1237 1257 1238 // The timestamp passed to this function is the time 1239 // of the period transfer. This means that we have to 1240 // add our max buffer size to it to ensure causality 1241 // in all cases: 1242 // we have to make sure that the buffer HEAD timestamp 1243 // lies in the future for every possible buffer fill case. 1244 1245 // the receive processing delay indicates how much 1246 // extra time we use as slack 1247 m_data_buffer->setTickOffset(RECEIVE_PROCESSING_DELAY); 1248 1258 // offset is in reset() 1259 1249 1260 m_data_buffer->prepare(); 1250 1261 branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h
r396 r399 113 113 uint64_t getTimeAtPeriodUsecs(); 114 114 uint64_t getTimeAtPeriod(); 115 int getMinimalSyncDelay(); 115 116 116 117 void setVerboseLevel(int l); … … 202 203 uint64_t getTimeAtPeriodUsecs(); 203 204 uint64_t getTimeAtPeriod(); 205 int getMinimalSyncDelay(); 204 206 205 207 void setVerboseLevel(int l); branches/streaming-rework/src/libstreaming/StreamProcessor.cpp
r397 r399 54 54 , m_SyncSource(NULL) 55 55 , m_ticks_per_frame(0) 56 , m_last_cycle(0) 57 , m_sync_delay(0) 56 58 { 57 59 // create the timestamped buffer and register ourselves as its client … … 172 174 } 173 175 174 // /** 175 // * @brief Notify the StreamProcessor that frames were written 176 // * 177 // * This notifies the StreamProcessor of the fact that frames were written to the internal 178 // * buffer. This is for framecounter & timestamp bookkeeping. 179 // * 180 // * @param nbframes the number of frames that are written to the internal buffers 181 // * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample 182 // * present in the buffer. 183 // * @return true if successful 184 // */ 185 // bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { 186 // 187 // debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts); 188 // // m_data_buffer->incrementFrameCounter(nbframes, ts); 189 // return true; 190 // } 191 192 // /** 193 // * @brief Notify the StreamProcessor that frames were read 194 // * 195 // * This notifies the StreamProcessor of the fact that frames were read from the internal 196 // * buffer. This is for framecounter & timestamp bookkeeping. 197 // * 198 // * @param nbframes the number of frames that are read from the internal buffers 199 // * @return true if successful 200 // */ 201 // bool StreamProcessor::getFrames(unsigned int nbframes) { 202 // 203 // debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); 204 // // m_data_buffer->decrementFrameCounter(nbframes); 205 // return true; 206 // } 176 int StreamProcessor::getBufferFill() { 177 // return m_data_buffer->getFrameCounter(); 178 return m_data_buffer->getBufferFill(); 179 } 207 180 208 181 uint64_t StreamProcessor::getTimeNow() { branches/streaming-rework/src/libstreaming/StreamProcessor.h
r396 r399 188 188 189 189 uint64_t getTimeNow(); 190 191 192 /** 193 * Returns the sync delay. This is the time a syncsource 194 * delays a period signal, e.g. to cope with buffering. 195 * @return the sync delay 196 */ 197 int getSyncDelay() {return m_sync_delay;}; 198 /** 199 * sets the sync delay 200 * @param d sync delay 201 */ 202 void setSyncDelay(int d) {m_sync_delay=d;}; 203 204 /** 205 * Returns the minimal sync delay a SP needs 206 * @return minimal sync delay 207 */ 208 virtual int getMinimalSyncDelay() = 0; 190 209 191 210 bool setSyncSource(StreamProcessor *s); … … 194 213 int getLastCycle() {return m_last_cycle;}; 195 214 215 int getBufferFill(); 216 196 217 protected: 197 218 StreamProcessor *m_SyncSource; … … 200 221 201 222 int m_last_cycle; 223 int m_sync_delay; 202 224 203 225 }; branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp
r398 r399 312 312 313 313 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 314 315 debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n"); 316 317 int max_of_min_delay=0; 318 int min_delay=0; 319 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 320 it != m_ReceiveProcessors.end(); 321 ++it ) { 322 min_delay=(*it)->getMinimalSyncDelay(); 323 if(min_delay>max_of_min_delay) max_of_min_delay=min_delay; 324 } 325 326 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 327 it != m_TransmitProcessors.end(); 328 ++it ) { 329 min_delay=(*it)->getMinimalSyncDelay(); 330 if(min_delay>max_of_min_delay) max_of_min_delay=min_delay; 331 } 332 333 debugOutput( DEBUG_LEVEL_VERBOSE, " %d ticks\n", max_of_min_delay); 334 m_SyncSource->setSyncDelay(max_of_min_delay); 335 336 314 337 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); 315 316 338 // now we reset the frame counters 317 339 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 318 340 it != m_ReceiveProcessors.end(); 319 341 ++it ) { 320 321 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");322 323 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {324 (*it)->dumpInfo();325 }326 327 342 (*it)->reset(); 328 329 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");330 331 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {332 (*it)->dumpInfo();333 }334 335 343 } 336 344 … … 338 346 it != m_TransmitProcessors.end(); 339 347 ++it ) { 340 341 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n");342 343 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {344 (*it)->dumpInfo();345 }346 347 348 (*it)->reset(); 348 349 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n");350 351 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) {352 (*it)->dumpInfo();353 }354 349 } 355 350 … … 776 771 } 777 772 778 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", -time_till_next_period);773 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period); 779 774 780 775 // this is to notify the client of the delay … … 792 787 m_time_of_transfer); 793 788 789 #ifdef DEBUG 790 int rcv_bf=0, xmt_bf=0; 791 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 792 it != m_ReceiveProcessors.end(); 793 ++it ) { 794 rcv_bf = (*it)->getBufferFill(); 795 } 796 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 797 it != m_TransmitProcessors.end(); 798 ++it ) { 799 xmt_bf = (*it)->getBufferFill(); 800 } 801 debugOutput( DEBUG_LEVEL_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n", 802 m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf); 803 804 #endif 805 794 806 xrun_occurred=false; 795 807 branches/streaming-rework/src/libutil/TimestampedBuffer.cpp
r397 r399 285 285 if (freebob_ringbuffer_write(m_event_buffer,data,write_size) < write_size) 286 286 { 287 debugWarning("writeFrames buffer overrun\n");287 // debugWarning("writeFrames buffer overrun\n"); 288 288 return false; 289 289 } … … 311 311 if ((freebob_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) 312 312 { 313 debugWarning("readFrames buffer underrun\n");313 // debugWarning("readFrames buffer underrun\n"); 314 314 return false; 315 315 } branches/streaming-rework/tests/SytMonitor.cpp
r384 r399 34 34 35 35 36 IMPL_DEBUG_MODULE( SytMonitor, SytMonitor, DEBUG_LEVEL_ NORMAL);36 IMPL_DEBUG_MODULE( SytMonitor, SytMonitor, DEBUG_LEVEL_VERBOSE ); 37 37 38 38 … … 58 58 struct iec61883_packet *packet = (struct iec61883_packet *) data; 59 59 unsigned int syt_timestamp=ntohs(packet->syt); 60 unsigned int m_full_timestamp;61 60 62 bool wraparound_occurred=false; 63 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"%2u,%2u: CY=%4u, SYT=%08X (%3u secs + %4u cycles + %04u ticks)\n", 61 if (syt_timestamp == 0xFFFF) { 62 return RAW1394_ISO_OK; 63 } 64 65 uint32_t m_full_timestamp; 66 uint32_t m_full_timestamp_ticks; 67 68 debugOutput(DEBUG_LEVEL_VERBOSE,"%2u,%2u: CY=%4u, SYT=%08X (%3u secs + %4u cycles + %04u ticks)\n", 64 69 m_port,channel, 65 70 cycle,syt_timestamp, … … 72 77 // reconstruct the full cycle 73 78 unsigned int cc=m_handler->getCycleTimer(); 74 unsigned int cc_ticks=CYCLE_TIMER_TO_TICKS(cc);75 79 76 80 unsigned int cc_cycles=CYCLE_TIMER_GET_CYCLES(cc); … … 83 87 // was received 84 88 if (cycle>cc_cycles) cc_seconds--; 85 86 // reconstruct the top part of the timestamp using the current cycle number87 unsigned int now_cycle_masked=cycle & 0xF;88 unsigned int syt_cycle=CYCLE_TIMER_GET_CYCLES(syt_timestamp);89 89 90 // if this is true, wraparound has occurred, undo this wraparound91 if(syt_cycle<now_cycle_masked) syt_cycle += 0x10;90 m_full_timestamp_ticks=sytRecvToFullTicks((uint32_t)syt_timestamp, cycle, cc); 91 m_full_timestamp=TICKS_TO_CYCLE_TIMER(m_full_timestamp_ticks); 92 92 93 // this is the difference in cycles wrt the cycle the94 // timestamp was received95 unsigned int delta_cycles=syt_cycle-now_cycle_masked;96 97 // reconstruct the cycle part of the timestamp98 unsigned int new_cycles=cycle + delta_cycles;99 100 // if the cycles cause a wraparound of the cycle timer,101 // perform this wraparound102 if(new_cycles>7999) {103 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,104 "Detected wraparound: %d + %d = %d\n",105 cycle,delta_cycles,new_cycles);106 107 new_cycles-=8000; // wrap around108 wraparound_occurred=true;109 110 }111 112 m_full_timestamp = (new_cycles) << 12;113 114 // now add the offset part on top of that115 m_full_timestamp |= (syt_timestamp & 0xFFF);116 117 // and the seconds part118 // if the timestamp causes a wraparound of the cycle timer,119 // the timestamp lies in the next second120 // if it didn't, the timestamp lies in this second121 if (wraparound_occurred) {122 m_full_timestamp |= ((cc_seconds+1 << 25) & 0xFE000000);123 } else {124 m_full_timestamp |= ((cc_seconds << 25) & 0xFE000000);125 }126 127 93 struct cycle_info cif; 128 94 cif.cycle=cycle; … … 136 102 cif.pres_cycle = CYCLE_TIMER_GET_CYCLES(m_full_timestamp); 137 103 cif.pres_offset = CYCLE_TIMER_GET_OFFSET(m_full_timestamp); 138 cif.pres_ticks = CYCLE_TIMER_TO_TICKS(m_full_timestamp); 139 140 if (wraparound_occurred) { 141 debugOutput(DEBUG_LEVEL_VERY_VERBOSE," (P: %08X, R: %08X)\n", 142 m_full_timestamp, syt_timestamp); 143 } 104 cif.pres_ticks = m_full_timestamp_ticks; 144 105 145 106 if (cif.pres_offset != (syt_timestamp & 0xFFF)) { branches/streaming-rework/tests/SytMonitor.h
r392 r399 60 60 61 61 void dumpInfo(); 62 unsigned int getMaxPacketSize() {return 4096;}; 63 unsigned int getPacketsPerPeriod() {return 1;}; 62 64 63 65 bool readNextCycleInfo(struct cycle_info *cif);