Changeset 395
- Timestamp:
- 02/16/07 02:35:25 (17 years ago)
- Files:
-
- branches/streaming-rework/src/bebob/bebob_avdevice.cpp (modified) (2 diffs)
- branches/streaming-rework/src/debugmodule/debugmodule.cpp (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (modified) (23 diffs)
- branches/streaming-rework/src/libstreaming/IsoStream.cpp (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (modified) (4 diffs)
- branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (modified) (11 diffs)
- branches/streaming-rework/src/libutil/TimestampedBuffer.h (modified) (3 diffs)
- branches/streaming-rework/tests/test-timestampedbuffer.cpp (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/streaming-rework/src/bebob/bebob_avdevice.cpp
r384 r395 1085 1085 } 1086 1086 1087 #ifdef TEST_XMIT_ONLY 1088 int 1089 AvDevice::getStreamCount() { 1090 // return 2; // one receive, one transmit 1091 return 1; // one receive, one transmit 1092 } 1093 1094 FreebobStreaming::StreamProcessor * 1095 AvDevice::getStreamProcessorByIndex(int i) { 1096 switch (i) { 1097 case 0: 1098 // return m_receiveProcessor; 1099 // case 1: 1100 // if (m_snoopMode) { 1101 // return m_receiveProcessor2; 1102 // } else { 1103 return m_transmitProcessor; 1104 // } 1105 default: 1106 return NULL; 1107 } 1108 return 0; 1109 } 1110 1111 // FIXME: error checking 1112 int 1113 AvDevice::startStreamByIndex(int i) { 1114 int iso_channel=0; 1115 int plug=0; 1116 int hostplug=-1; 1117 1118 // if (m_snoopMode) { 1119 // 1120 // switch (i) { 1121 // case 0: 1122 // // snooping doesn't use CMP, but obtains the info of the channel 1123 // // from the target plug 1124 // 1125 // // TODO: get isochannel from plug 1126 // 1127 // // set the channel obtained by the connection management 1128 // m_receiveProcessor->setChannel(iso_channel); 1129 // break; 1130 // case 1: 1131 // // snooping doesn't use CMP, but obtains the info of the channel 1132 // // from the target plug 1133 // 1134 // // TODO: get isochannel from plug 1135 // 1136 // // set the channel obtained by the connection management 1137 // m_receiveProcessor2->setChannel(iso_channel); 1138 // 1139 // break; 1140 // default: 1141 // return 0; 1142 // } 1143 // } else { 1144 1145 switch (i) { 1146 case 0: 1147 // // do connection management: make connection 1148 // iso_channel = iec61883_cmp_connect( 1149 // m_p1394Service->getHandle(), 1150 // m_pConfigRom->getNodeId() | 0xffc0, 1151 // &plug, 1152 // raw1394_get_local_id (m_p1394Service->getHandle()), 1153 // &hostplug, 1154 // &m_receiveProcessorBandwidth); 1155 // 1156 // // set the channel obtained by the connection management 1157 // m_receiveProcessor->setChannel(iso_channel); 1158 // break; 1159 // case 1: 1160 // do connection management: make connection 1161 iso_channel = iec61883_cmp_connect( 1162 m_p1394Service->getHandle(), 1163 raw1394_get_local_id (m_p1394Service->getHandle()), 1164 &hostplug, 1165 m_pConfigRom->getNodeId() | 0xffc0, 1166 &plug, 1167 &m_transmitProcessorBandwidth); 1168 1169 // set the channel obtained by the connection management 1170 m_transmitProcessor->setChannel(iso_channel); 1171 break; 1172 default: 1173 return -1; 1174 } 1175 // } 1176 1177 if (iso_channel < 0) return -1; 1178 1179 return 0; 1180 1181 } 1182 1183 // FIXME: error checking 1184 int 1185 AvDevice::stopStreamByIndex(int i) { 1186 // do connection management: break connection 1187 1188 int plug=0; 1189 int hostplug=-1; 1190 // if (m_snoopMode) { 1191 // switch (i) { 1192 // case 0: 1193 // // do connection management: break connection 1194 // 1195 // break; 1196 // case 1: 1197 // // do connection management: break connection 1198 // 1199 // break; 1200 // default: 1201 // return 0; 1202 // } 1203 // } else { 1204 switch (i) { 1205 case 0: 1206 // // do connection management: break connection 1207 // iec61883_cmp_disconnect( 1208 // m_p1394Service->getHandle(), 1209 // m_pConfigRom->getNodeId() | 0xffc0, 1210 // plug, 1211 // raw1394_get_local_id (m_p1394Service->getHandle()), 1212 // hostplug, 1213 // m_receiveProcessor->getChannel(), 1214 // m_receiveProcessorBandwidth); 1215 // 1216 // break; 1217 // case 1: 1218 // do connection management: break connection 1219 iec61883_cmp_disconnect( 1220 m_p1394Service->getHandle(), 1221 raw1394_get_local_id (m_p1394Service->getHandle()), 1222 hostplug, 1223 m_pConfigRom->getNodeId() | 0xffc0, 1224 plug, 1225 m_transmitProcessor->getChannel(), 1226 m_transmitProcessorBandwidth); 1227 1228 break; 1229 default: 1230 return 0; 1231 } 1232 // } 1233 1234 return 0; 1235 } 1236 #else 1087 1237 int 1088 1238 AvDevice::getStreamCount() { … … 1232 1382 return 0; 1233 1383 } 1384 #endif 1234 1385 1235 1386 template <typename T> bool serializeVector( Glib::ustring path, branches/streaming-rework/src/debugmodule/debugmodule.cpp
r392 r395 99 99 va_list arg; 100 100 va_start( arg, format ); 101 102 // remove the path info from the filename 103 char *f=(char *)file; 104 char *fname; 105 while((f=strstr(f, "/"))) { 106 f++; // move away from delimiter 107 fname=f; 108 } 109 101 110 DebugModuleManager::instance()->print( "%s (%s)[%4d] %s: ", getPreSequence( level ), 102 f ile, line, function );111 fname, line, function ); 103 112 DebugModuleManager::instance()->va_print( format, arg ); 104 113 DebugModuleManager::instance()->print( "%s", getPostSequence( level ) ); branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp
r394 r395 36 36 #include <assert.h> 37 37 38 #define RECEIVE_DLL_INTEGRATION_COEFFICIENT 0.015 39 40 #define RECEIVE_PROCESSING_DELAY (10000U) 38 #define RECEIVE_PROCESSING_DELAY (0U) 41 39 42 40 // in ticks … … 172 170 // therefore sync_lag_cycles * TICKS_PER_CYCLE earlier than 173 171 // T1. 174 ts_head += sync_lag_cycles * TICKS_PER_CYCLE; 175 176 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 177 178 // calculate the buffer tail timestamp of our buffer 179 // this timestamp corresponds to the buffer head timestamp 180 // of the sync source plus the 'length' of our buffer. 181 // this makes the buffer head timestamp of our buffer 182 // equal to (at max) the buffer head timestamp of the 183 // sync source. 184 ts_head = addTicks(ts_head, (uint32_t)((float)m_ringbuffer_size_frames * ticks_per_frame)); 172 ts_head = addTicks(ts_head, sync_lag_cycles * TICKS_PER_CYCLE); 173 174 m_data_buffer->setBufferTailTimestamp(ts_head); 185 175 186 176 #ifdef DEBUG … … 189 179 } 190 180 #endif 191 192 m_data_buffer->setBufferTailTimestamp(ts_head); 193 194 debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, FC=%4d, %f\n", 195 ts_head, m_data_buffer->getFrameCounter(), ticks_per_frame); 181 debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, FC=%4d\n", 182 ts_head, m_data_buffer->getFrameCounter()); 196 183 } else { 197 184 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, … … 235 222 #ifdef DEBUG 236 223 if(!m_is_disabled) { 237 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "R: TS=%011llu, NOW=%011llu, CYN=%04d, CYT=%04d\n", 224 uint32_t timestamp_u=timestamp; 225 uint32_t syt = addTicks(timestamp_u, TRANSMIT_TRANSFER_DELAY); 226 227 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "T: TS=%011llu, NOW=%011llu, CYN=%04d, CYT=%04d\n", 238 228 timestamp, cycle_timer, now_cycles, cycle 239 229 ); … … 241 231 until_next 242 232 ); 243 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04d \n",244 now_cycles, cycle, cycle_diff 233 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04d, CY_SYT=%04d\n", 234 now_cycles, cycle, cycle_diff, TICKS_TO_CYCLES(syt) 245 235 ); 246 236 } … … 256 246 uint32_t test_ts=sytXmitToFullTicks(syt, cycle, now); 257 247 258 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "T %04d: SYT=%08X, CY=%02d OFF=%04d\n",248 debugOutput(DEBUG_LEVEL_VERBOSE, "T %04d: SYT=%08X, CY=%02d OFF=%04d\n", 259 249 cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) 260 250 ); 261 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "T %04d: NOW=%011lu, SEC=%03u CY=%02u OFF=%04u\n",251 debugOutput(DEBUG_LEVEL_VERBOSE, "T %04d: NOW=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 262 252 cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) 263 253 ); 264 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "T %04d: TSS=%011lu, SEC=%03u CY=%02u OFF=%04u\n",254 debugOutput(DEBUG_LEVEL_VERBOSE, "T %04d: TSS=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 265 255 cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) 266 256 ); 267 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "T %04d: TSO=%011lu, SEC=%03u CY=%02u OFF=%04u\n",257 debugOutput(DEBUG_LEVEL_VERBOSE, "T %04d: TSO=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 268 258 cycle, timestamp_u, TICKS_TO_SECS(timestamp_u), TICKS_TO_CYCLES(timestamp_u), TICKS_TO_OFFSET(timestamp_u) 269 259 ); … … 371 361 372 362 int64_t AmdtpTransmitStreamProcessor::getTimeUntilNextPeriodUsecs() { 373 debugFatal("IMPLEMENT ME!"); 374 return 0; 363 uint64_t time_at_period=getTimeAtPeriod(); 364 365 uint64_t cycle_timer=m_handler->getCycleTimerTicks(); 366 367 // calculate the time until the next period 368 int64_t until_next=substractTicks(time_at_period,cycle_timer); 369 370 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11lld, TPUS=%f\n", 371 time_at_period, cycle_timer, until_next, m_handler->getTicksPerUsec() 372 ); 373 374 // now convert to usecs 375 // don't use the mapping function because it only works 376 // for absolute times, not the relative time we are 377 // using here (which can also be negative). 378 return (int64_t)(((float)until_next) / m_handler->getTicksPerUsec()); 375 379 } 376 380 … … 383 387 384 388 uint64_t AmdtpTransmitStreamProcessor::getTimeAtPeriod() { 385 debugFatal("IMPLEMENT ME!"); 386 387 return 0; 389 uint64_t next_period_boundary=m_data_buffer->getTimestampFromTail((m_nb_buffers-1) * m_period); 390 391 #ifdef DEBUG 392 uint64_t ts,fc; 393 m_data_buffer->getBufferTailTimestamp(&ts,&fc); 394 395 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5u, TPF=%f\n", 396 next_period_boundary, ts, fc, m_ticks_per_frame 397 ); 398 #endif 399 400 return next_period_boundary; 388 401 } 389 402 … … 430 443 m_WakeupStat.setName("XMT WAKEUP"); 431 444 432 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing ...\n");445 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); 433 446 434 447 // prepare all non-device specific stuff … … 485 498 m_ringbuffer_size_frames=m_nb_buffers * m_period; 486 499 487 // add the receive processing delay488 // m_ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame);489 490 500 assert(m_data_buffer); 491 501 m_data_buffer->setBufferSize(m_ringbuffer_size_frames); … … 497 507 498 508 m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 509 510 // we have to make sure that the buffer HEAD timestamp 511 // lies in the future for every possible buffer fill case. 512 m_data_buffer->setTickOffset((int)(m_ringbuffer_size_frames*m_ticks_per_frame)); 499 513 500 514 m_data_buffer->prepare(); … … 656 670 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n", nbframes, ts); 657 671 658 // The timestamp passed to this function is the time659 // of the period transfer. This means that we have to660 // add our max buffer size to it to ensure causality661 // in all cases:662 // we have to make sure that the buffer HEAD timestamp663 // lies in the future for every possible buffer fill case.664 float ticks_per_frame=m_SyncSource->getTicksPerFrame();665 ts = addTicks(ts,(uint32_t)((float)m_ringbuffer_size_frames * ticks_per_frame));666 667 672 // transfer the data 668 673 m_data_buffer->blockProcessWriteFrames(nbframes, ts); … … 929 934 m_running,m_disabled,m_is_disabled); 930 935 936 // check our enable status 937 if (!m_disabled && m_is_disabled) { 938 // this means that we are trying to enable 939 if (cycle == m_cycle_to_enable_at) { 940 m_is_disabled=false; 941 debugOutput(DEBUG_LEVEL_VERBOSE,"Enabling StreamProcessor %p at %d (SYT=%04X)\n", 942 this, cycle, ntohs(packet->syt)); 943 // the previous timestamp is the one we need to start with 944 // because we're going to update the buffer again this loop 945 // using writeframes 946 m_data_buffer->setBufferTailTimestamp(m_last_timestamp2); 947 948 } else { 949 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 950 "will enable StreamProcessor %p at %u, now is %d\n", 951 this, m_cycle_to_enable_at, cycle); 952 } 953 } else if (m_disabled && !m_is_disabled) { 954 // trying to disable 955 debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle); 956 m_is_disabled=true; 957 } 958 931 959 if((packet->syt != 0xFFFF) 932 960 && (packet->fdf != 0xFF) … … 957 985 m_last_timestamp, m_handler->getWakeupInterval(),m_syt_interval,m_ticks_per_frame); 958 986 959 m_last_timestamp += (uint64_t)(((float)m_handler->getWakeupInterval())960 * ((float)m_syt_interval) * m_ticks_per_frame);961 debugOutput(DEBUG_LEVEL_VERY_VERBOSE," ==> %lluticks\n", m_last_timestamp);962 963 // the receive processing delay indicates how much964 // extra time we use as slack965 m_last_timestamp += RECEIVE_PROCESSING_DELAY;966 967 // wrap if nescessary968 m_last_timestamp=wrapAtMaxTicks(m_last_timestamp);969 970 //=> now estimate the device frame rate971 if (m_last_timestamp2 && m_last_timestamp) {972 // try and estimate the frame rate from the device973 974 // first get the measured difference between both975 // timestamps976 int64_t measured_difference;977 measured_difference=((int64_t)(m_last_timestamp))978 -((int64_t)(m_last_timestamp2));979 // correct for seconds wraparound980 if (m_last_timestamp<m_last_timestamp2) {981 measured_difference+=128L*TICKS_PER_SECOND;982 }983 984 // implement a 1st order DLL to estimate the framerate985 // this is the number of ticks between two samples986 float f=measured_difference;987 float err = f / (1.0*m_syt_interval) - m_ticks_per_frame;988 989 // integrate the error990 m_ticks_per_frame += RECEIVE_DLL_INTEGRATION_COEFFICIENT*err;991 992 }993 994 987 //=> signal that we're running (if we are) 995 988 if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) { … … 999 992 1000 993 //=> don't process the stream samples when it is not enabled. 1001 if (!m_disabled && m_is_disabled) {1002 // this means that we are trying to enable1003 if (cycle == m_cycle_to_enable_at) {1004 m_is_disabled=false;1005 debugOutput(DEBUG_LEVEL_VERBOSE,"enabling StreamProcessor %p at %d\n", this, cycle);1006 // the previous timestamp is the one we need to start with1007 // because we're going to update the buffer again this loop1008 // using writeframes1009 m_data_buffer->setBufferTailTimestamp(m_last_timestamp2);1010 1011 } else {1012 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,1013 "will enable StreamProcessor %p at %u, now is %d\n",1014 this, m_cycle_to_enable_at, cycle);1015 }1016 } else if (m_disabled && !m_is_disabled) {1017 // trying to disable1018 debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle);1019 m_is_disabled=true;1020 }1021 1022 994 if(m_is_disabled) { 1023 995 … … 1029 1001 // the next (possible) sample is not this one, but lies 1030 1002 // SYT_INTERVAL * rate later 1031 uint64_t ts=m_last_timestamp+(uint64_t)((float)m_syt_interval * m_ticks_per_frame); 1032 1033 // wrap if nescessary 1034 ts=wrapAtMaxTicks(ts); 1003 uint64_t ts=addTicks(m_last_timestamp, 1004 (uint64_t)((float)m_syt_interval * m_ticks_per_frame)); 1035 1005 1036 1006 // set the timestamp as if there will be a sample put into … … 1049 1019 uint32_t test_ts=sytRecvToFullTicks(syt, cycle, now); 1050 1020 1051 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "R %04d: SYT=%08X, CY=%02d OFF=%04d\n",1021 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: SYT=%08X, CY=%02d OFF=%04d\n", 1052 1022 cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) 1053 1023 ); 1054 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%02u OFF=%04u\n",1024 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 1055 1025 cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) 1056 1026 ); 1057 debugOutput(DEBUG_LEVEL_VER Y_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%02u OFF=%04u\n",1027 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 1058 1028 cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) 1059 1029 ); … … 1119 1089 1120 1090 uint64_t AmdtpReceiveStreamProcessor::getTimeAtPeriod() { 1121 1122 // every time a packet is received both the framecounter and the base 1123 // timestamp are updated. This means that at any instance of time, the 1124 // front of the buffer (latest sample) timestamp is known. 1125 // As we know the number of frames in the buffer, and we now the rate 1126 // in ticks/frame, we can calculate the back of buffer timestamp as: 1127 // back_of_buffer_time = front_time - nbframes * rate 1128 // the next period boundary time lies m_period frames later: 1129 // next_period_boundary = back_of_buffer_time + m_period * rate 1130 1131 // NOTE: we should account for the fact that the timestamp is not for 1132 // the latest sample, but for the latest sample minus syt_interval-1 1133 // because it is the timestamp for the first sample in the packet, 1134 // while the complete packet contains SYT_INTERVAL samples. 1135 // this makes the equation: 1136 // back_of_buffer_time = front_time - (nbframes - (syt_interval - 1)) * rate 1137 // next_period_boundary = back_of_buffer_time + m_period * rate 1138 1139 // NOTE: where do we add the processing delay? 1140 // if we add it here: 1141 // next_period_boundary += RECEIVE_PROCESSING_DELAY 1142 1143 // the complete equation now is: 1144 // next_period_boundary = front_time - (nbframes - (syt_interval - 1)) * rate 1145 // + m_period * rate + RECEIVE_PROCESSING_DELAY 1146 // since syt_interval is a constant value, we can equally well ignore it, as 1147 // if it were already included in RECEIVE_PROCESSING_DELAY 1148 // making the equation (simplified: 1149 // next_period_boundary = front_time + (-nbframes + m_period) * rate 1150 // + RECEIVE_PROCESSING_DELAY 1151 // currently this is in ticks 1152 1153 int64_t fc=m_data_buffer->getFrameCounter(); 1154 1155 int64_t next_period_boundary = m_last_timestamp; 1156 next_period_boundary += (int64_t)(((int64_t)m_period 1157 - fc) * m_ticks_per_frame); 1158 1159 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", 1160 next_period_boundary, m_last_timestamp, fc, m_ticks_per_frame 1091 uint64_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_period); 1092 1093 #ifdef DEBUG 1094 uint64_t ts,fc; 1095 m_data_buffer->getBufferTailTimestamp(&ts,&fc); 1096 1097 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5u, TPF=%f\n", 1098 next_period_boundary, ts, fc, m_ticks_per_frame 1161 1099 ); 1162 1163 // next_period_boundary too large 1164 // happens if the timestamp wraps around while there are a lot of 1165 // frames in the buffer. We should add the wraparound value of the ticks 1166 // counter 1167 1168 // next_period_boundary too small 1169 // happens when the last timestamp is near wrapping, and 1170 // m_framecounter is low. 1171 // this means: m_last_timestamp is near wrapping and have just had 1172 // a getPackets() from the client side. the projected next_period 1173 // boundary lies beyond the wrap value. 1174 1175 // the action is to wrap the value. 1176 next_period_boundary=wrapAtMinMaxTicks(next_period_boundary); 1100 #endif 1177 1101 1178 1102 return next_period_boundary; … … 1183 1107 1184 1108 StreamProcessor::dumpInfo(); 1185 1186 debugOutputShort( DEBUG_LEVEL_NORMAL, " Device framerate : %f\n", 24576000.0/m_ticks_per_frame);1187 1109 1188 1110 } … … 1203 1125 m_WakeupStat.reset(); 1204 1126 1205 // this needs to be reset to the nominal value 1206 // because xruns can cause the DLL value to shift a lot 1207 // making that we run into problems when trying to re-enable 1208 // streaming 1209 m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); 1127 // this makes that the buffer lags a little compared to reality 1128 // the result is that we get some extra time before period boundaries 1129 // are signaled. 1130 // The RECEIVE_PROCESSING_DELAY directly introduces some slack 1131 // the other term handles the fact that the linux1394 stack does some 1132 // buffering. This buffering causes the packets to be received at max 1133 // m_handler->getWakeupInterval() later than the time they were received. 1134 // hence their payload is available this amount of time later. However, the 1135 // period boundary is predicted based upon earlier samples, and therefore can 1136 // pass before these packets are processed. Adding this extra term makes that 1137 // the period boundary is signalled later 1138 m_data_buffer->setTickOffset(RECEIVE_PROCESSING_DELAY 1139 + (int)(m_handler->getWakeupInterval() * m_syt_interval * m_ticks_per_frame)); 1210 1140 1211 1141 // reset all non-device specific stuff … … 1224 1154 m_WakeupStat.setName("RCV WAKEUP"); 1225 1155 1156 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); 1157 1226 1158 // prepare all non-device specific stuff 1227 1159 // i.e. the iso stream and the associated ports … … 1231 1163 } 1232 1164 1233 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");1234 1165 switch (m_framerate) { 1235 1166 case 32000: … … 1280 1211 1281 1212 m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 1213 1214 // The timestamp passed to this function is the time 1215 // of the period transfer. This means that we have to 1216 // add our max buffer size to it to ensure causality 1217 // in all cases: 1218 // we have to make sure that the buffer HEAD timestamp 1219 // lies in the future for every possible buffer fill case. 1220 1221 // the receive processing delay indicates how much 1222 // extra time we use as slack 1223 m_data_buffer->setTickOffset(RECEIVE_PROCESSING_DELAY); 1282 1224 1283 1225 m_data_buffer->prepare(); branches/streaming-rework/src/libstreaming/IsoStream.cpp
r390 r395 84 84 85 85 bool IsoStream::setChannel(int c) { 86 debugOutput( DEBUG_LEVEL_VERBOSE, "setting channel to %d\n",c);86 debugOutput( DEBUG_LEVEL_VERBOSE, "setting channel for (%p) to %d\n",this, c); 87 87 88 88 m_channel=c; branches/streaming-rework/src/libstreaming/StreamProcessor.cpp
r392 r395 76 76 debugOutputShort( DEBUG_LEVEL_NORMAL, " Enabled : %s\n", m_disabled ? "No" : "Yes"); 77 77 debugOutputShort( DEBUG_LEVEL_NORMAL, " enable status : %s\n", m_is_disabled ? "No" : "Yes"); 78 79 debugOutputShort( DEBUG_LEVEL_NORMAL, " Device framerate : Sync: %f, Buffer %f\n", 80 24576000.0*m_SyncSource->m_data_buffer->getRate(), 81 24576000.0*m_data_buffer->getRate() 82 ); 78 83 79 84 m_data_buffer->dumpInfo(); branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp
r391 r395 905 905 it != m_ReceiveProcessors.end(); 906 906 ++it ) { 907 908 //#ifdef DEBUG909 #if 0910 {911 uint64_t ts_tail=0;912 uint64_t fc_tail=0;913 907 914 uint64_t ts_head=0;915 uint64_t fc_head=0;916 917 int cnt=0;918 919 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);920 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);921 922 while((fc_head != fc_tail) && (cnt++ < 10)) {923 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);924 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);925 }926 927 debugOutput(DEBUG_LEVEL_VERBOSE,"R => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",928 ts_head, fc_head, ts_tail, fc_tail, cnt);929 }930 #endif931 932 908 if(!(*it)->getFrames(m_period)) { 933 909 debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)", … … 935 911 return false; // buffer underrun 936 912 } 937 938 //#ifdef DEBUG 939 #if 0 940 { 941 uint64_t ts_tail=0; 942 uint64_t fc_tail=0; 943 944 uint64_t ts_head=0; 945 uint64_t fc_head=0; 946 947 int cnt=0; 948 949 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); 950 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); 951 952 while((fc_head != fc_tail) && (cnt++ < 10)) { 953 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); 954 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); 955 } 956 957 debugOutput(DEBUG_LEVEL_VERBOSE,"R > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n", 958 ts_head, fc_head, ts_tail, fc_tail, cnt); 959 } 960 #endif 961 913 962 914 } 963 915 } else { … … 965 917 it != m_TransmitProcessors.end(); 966 918 ++it ) { 967 968 //#ifdef DEBUG969 #if 0970 {971 uint64_t ts_tail=0;972 uint64_t fc_tail=0;973 974 uint64_t ts_head=0;975 uint64_t fc_head=0;976 977 int cnt=0;978 979 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);980 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);981 982 while((fc_head != fc_tail) && (cnt++ < 10)) {983 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head);984 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail);985 }986 987 debugOutput(DEBUG_LEVEL_VERBOSE,"T => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n",988 ts_head, fc_head, ts_tail, fc_tail, cnt);989 }990 #endif991 919 992 920 if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) { … … 995 923 return false; // buffer overrun 996 924 } 997 998 //#ifdef DEBUG 999 #if 0 1000 { 1001 uint64_t ts_tail=0; 1002 uint64_t fc_tail=0; 1003 1004 uint64_t ts_head=0; 1005 uint64_t fc_head=0; 1006 1007 int cnt=0; 1008 1009 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); 1010 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); 1011 1012 while((fc_head != fc_tail) && (cnt++ < 10)) { 1013 (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); 1014 (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); 1015 } 1016 1017 debugOutput(DEBUG_LEVEL_VERBOSE,"T > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n", 1018 ts_head, fc_head, ts_tail, fc_tail, cnt); 1019 } 1020 #endif 925 1021 926 } 1022 927 } branches/streaming-rework/src/libutil/TimestampedBuffer.cpp
r393 r395 39 39 TimestampedBuffer::TimestampedBuffer(TimestampedBufferClient *c) 40 40 : m_event_buffer(NULL), m_cluster_buffer(NULL), 41 m_event_size(0), m_events_per_frame(0), m_buffer_size(0), 41 m_event_size(0), m_events_per_frame(0), m_buffer_size(0), 42 42 m_bytes_per_frame(0), m_bytes_per_buffer(0), 43 43 m_wrap_at(0xFFFFFFFFFFFFFFFFLLU), 44 m_Client(c), m_framecounter(0), m_buffer_tail_timestamp(0), 44 m_Client(c), m_framecounter(0), 45 m_tick_offset(0), 46 m_buffer_tail_timestamp(0), 45 47 m_buffer_next_tail_timestamp(0), 46 48 m_dll_e2(0.0), m_dll_b(0.877), m_dll_c(0.384), … … 146 148 m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size; 147 149 150 return true; 151 } 152 153 /** 154 * Sets the buffer offset in ticks. 155 * 156 * A positive value means that the buffer is 'delayed' for nticks ticks. 157 * 158 * @note These offsets are only used when reading timestamps. Any function 159 * that returns a timestamp will incorporate this offset. 160 * @param nframes the number of ticks (positive = delay buffer) 161 * @return true if successful 162 */ 163 bool TimestampedBuffer::setTickOffset(int nticks) { 164 debugOutput(DEBUG_LEVEL_VERBOSE,"Setting ticks offset to %d\n",nticks); 165 m_tick_offset=nticks; 148 166 return true; 149 167 } … … 519 537 * 520 538 * This is thread safe. 539 * 540 * @note considers offsets 521 541 * 522 542 * @param new_timestamp 523 543 */ 524 544 void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) { 545 546 // add the offsets 547 int64_t diff=m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; 548 if (diff < 0) diff += m_wrap_at; 549 550 double rate; 551 552 if (diff) { 553 rate=(double)diff / (double)m_update_period; 554 } else { 555 rate=m_nominal_rate; 556 } 557 558 int64_t ts=new_timestamp; 559 ts += m_tick_offset; 560 561 if (ts >= (int64_t)m_wrap_at) { 562 ts -= m_wrap_at; 563 } else if (ts < 0) { 564 ts += m_wrap_at; 565 } 525 566 526 567 #ifdef DEBUG 527 if (new_timestamp >= 128L*TICKS_PER_SECOND) {568 if (new_timestamp >= m_wrap_at) { 528 569 debugWarning("timestamp not wrapped: %llu\n",new_timestamp); 570 } 571 if ((ts >= (int64_t)m_wrap_at) || (ts < 0 )) { 572 debugWarning("ts not wrapped correctly: %lld\n",ts); 529 573 } 530 574 #endif … … 532 576 pthread_mutex_lock(&m_framecounter_lock); 533 577 534 m_buffer_tail_timestamp = new_timestamp;578 m_buffer_tail_timestamp = ts; 535 579 536 580 m_dll_e2=m_update_period * m_nominal_rate; … … 539 583 pthread_mutex_unlock(&m_framecounter_lock); 540 584 541 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu , NTS=%llu, DLL2=%f\n",542 this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2);585 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu => %11lld, NTS=%llu, DLL2=%f, RATE=%f\n", 586 this, new_timestamp, ts, m_buffer_next_tail_timestamp, m_dll_e2, rate); 543 587 544 588 } … … 555 599 */ 556 600 void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { 557 int64_t diff=m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; 558 if (diff < 0) diff += m_wrap_at; 559 560 double rate=(double)diff / (double)m_update_period; 561 562 int64_t timestamp; 563 564 pthread_mutex_lock(&m_framecounter_lock); 601 // NOTE: this is still ok with threads, because we use *fc to compute 602 // the timestamp 565 603 *fc = m_framecounter; 566 567 // ts(x) = m_buffer_tail_timestamp - 568 // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*(x) 569 570 // buffer head is the frame that is framecounter away from the tail 571 timestamp=(int64_t)m_buffer_tail_timestamp - (int64_t)((m_framecounter) * rate); 572 573 pthread_mutex_unlock(&m_framecounter_lock); 574 575 #ifdef DEBUG 576 if(timestamp || m_buffer_tail_timestamp || *fc) { 577 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): HTS = %011lld, TTS=%011llu, FC=%05u\n", 578 this, timestamp, m_buffer_tail_timestamp, *fc, rate); 579 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): rate = %f\n", this, rate); 580 } 581 #endif 582 583 if(timestamp >= (int64_t)m_wrap_at) { 584 timestamp -= m_wrap_at; 585 } else if(timestamp < 0) { 586 timestamp += m_wrap_at; 587 } 588 589 *ts=timestamp; 590 591 #ifdef DEBUG 592 if(timestamp || m_buffer_tail_timestamp || *fc) { 593 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): HTS = %011lld, FC=%05u\n", 594 this, *ts, *fc); 595 } 596 #endif 597 604 *ts=getTimestampFromTail(*fc); 598 605 } 599 606 … … 616 623 617 624 /** 625 * @brief Get timestamp for a specific position from the buffer tail 626 * 627 * Returns the timestamp for a position that is nframes earlier than the 628 * buffer tail 629 * 630 * @param nframes number of frames 631 * @return timestamp value 632 */ 633 uint64_t TimestampedBuffer::getTimestampFromTail(int nframes) 634 { 635 // ts(x) = m_buffer_tail_timestamp - 636 // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*(x) 637 638 int64_t diff=m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; 639 if (diff < 0) diff += m_wrap_at; 640 641 double rate=(double)diff / (double)m_update_period; 642 643 int64_t timestamp; 644 645 pthread_mutex_lock(&m_framecounter_lock); 646 647 timestamp=(int64_t)m_buffer_tail_timestamp - (int64_t)((nframes) * rate); 648 649 pthread_mutex_unlock(&m_framecounter_lock); 650 651 if(timestamp >= (int64_t)m_wrap_at) { 652 timestamp -= m_wrap_at; 653 } else if(timestamp < 0) { 654 timestamp += m_wrap_at; 655 } 656 657 return (uint64_t)timestamp; 658 } 659 660 /** 661 * @brief Get timestamp for a specific position from the buffer head 662 * 663 * Returns the timestamp for a position that is nframes later than the 664 * buffer head 665 * 666 * @param nframes number of frames 667 * @return timestamp value 668 */ 669 uint64_t TimestampedBuffer::getTimestampFromHead(int nframes) 670 { 671 return getTimestampFromTail(m_framecounter-nframes); 672 } 673 674 /** 618 675 * Resets the frame counter, in a atomic way. This 619 676 * is thread safe. … … 639 696 * Increments the frame counter in a thread safe way. 640 697 * Also updates the timestamp. 641 * 698 * 699 * @note the offsets defined by setTicksOffset and setFrameOffset 700 * are added here. 701 * 642 702 * @param nbframes the number of frames to add 643 703 * @param new_timestamp the new timestamp 644 704 */ 645 705 void TimestampedBuffer::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { 646 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", 647 this, new_timestamp); 706 707 // add the offsets 708 int64_t diff=m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; 709 if (diff < 0) diff += m_wrap_at; 710 711 double rate=(double)diff / (double)m_update_period; 712 713 int64_t ts=new_timestamp; 714 ts += (int64_t)m_tick_offset; 715 716 if (ts >= (int64_t)m_wrap_at) { 717 ts -= m_wrap_at; 718 } else if (ts < 0) { 719 ts += m_wrap_at; 720 } 721 722 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu => %11lld\n", 723 this, new_timestamp, ts); 648 724 649 725 #ifdef DEBUG 650 if (new_timestamp >= 128L*TICKS_PER_SECOND) {726 if (new_timestamp >= m_wrap_at) { 651 727 debugWarning("timestamp not wrapped: %llu\n",new_timestamp); 728 } 729 if ((ts >= (int64_t)m_wrap_at) || (ts < 0 )) { 730 debugWarning("ts not wrapped correctly: %lld\n",ts); 652 731 } 653 732 #endif 654 655 pthread_mutex_lock(&m_framecounter_lock); 656 m_framecounter += nbframes; 657 733 658 734 // update the DLL 659 int64_t diff = (int64_t)new_timestamp-(int64_t)m_buffer_next_tail_timestamp; 660 735 diff = ts-(int64_t)m_buffer_next_tail_timestamp; 736 737 #ifdef DEBUG 738 if ((diff > 1000) || (diff < -1000)) { 739 debugWarning("(%p) difference rather large: %lld, %011lld, %011lld\n", 740 this, diff, ts, m_buffer_next_tail_timestamp); 741 } 742 #endif 743 661 744 // idea to implement it for nbframes values that differ from m_update_period: 662 745 // diff = diff * nbframes/m_update_period … … 674 757 diff += m_wrap_at; 675 758 } 676 759 677 760 double err=diff; 678 761 … … 682 765 m_framecounter, m_buffer_tail_timestamp, m_buffer_next_tail_timestamp); 683 766 767 pthread_mutex_lock(&m_framecounter_lock); 768 m_framecounter += nbframes; 769 684 770 m_buffer_tail_timestamp=m_buffer_next_tail_timestamp; 685 m_buffer_next_tail_timestamp += (uint64_t)(m_dll_b * err + m_dll_e2); 771 m_buffer_next_tail_timestamp += (int64_t)(m_dll_b * err + m_dll_e2); 772 773 m_dll_e2 += m_dll_c*err; 686 774 687 775 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "U: FC=%10u, TS=%011llu, NTS=%011llu\n", … … 689 777 690 778 if (m_buffer_next_tail_timestamp >= m_wrap_at) { 779 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Unwrapping next tail timestamp: %011llu", 780 m_buffer_next_tail_timestamp); 781 691 782 m_buffer_next_tail_timestamp -= m_wrap_at; 692 783 693 debugOutput (DEBUG_LEVEL_VERY_VERBOSE, "Unwrapping next tail timestamp: %11llu\n",784 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, " => %011llu\n", 694 785 m_buffer_next_tail_timestamp); 695 } 696 697 m_dll_e2 += m_dll_c*err; 698 699 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "A: TS=%011llu, NTS=%011llu, DLLe2=%f\n", 700 m_buffer_tail_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); 786 787 } 788 789 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "A: TS=%011llu, NTS=%011llu, DLLe2=%f, RATE=%f\n", 790 m_buffer_tail_timestamp, m_buffer_next_tail_timestamp, m_dll_e2, rate); 701 791 702 792 pthread_mutex_unlock(&m_framecounter_lock); 703 793 704 794 if(m_buffer_tail_timestamp>=m_wrap_at) { 705 debugError("Wrapping failed for m_buffer_tail_timestamp!\n"); 795 debugError("Wrapping failed for m_buffer_tail_timestamp! %011llu\n",m_buffer_tail_timestamp); 796 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " IN=%011lld, TS=%011llu, NTS=%011llu\n", 797 ts, m_buffer_tail_timestamp, m_buffer_next_tail_timestamp); 798 706 799 } 707 800 if(m_buffer_next_tail_timestamp>=m_wrap_at) { 708 debugError("Wrapping failed for m_buffer_next_tail_timestamp!\n"); 801 debugError("Wrapping failed for m_buffer_next_tail_timestamp! %011llu\n",m_buffer_next_tail_timestamp); 802 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " IN=%011lld, TS=%011llu, NTS=%011llu\n", 803 ts, m_buffer_tail_timestamp, m_buffer_next_tail_timestamp); 709 804 } 710 805 branches/streaming-rework/src/libutil/TimestampedBuffer.h
r392 r395 112 112 113 113 void setBufferTailTimestamp(uint64_t new_timestamp); 114 114 115 uint64_t getTimestampFromTail(int nframes); 116 uint64_t getTimestampFromHead(int nframes); 117 118 // buffer offset stuff 119 /// return the tick offset value 120 signed int getTickOffset() {return m_tick_offset;}; 121 122 bool setFrameOffset(int nframes); 123 bool setTickOffset(int nframes); 124 115 125 // dll stuff 116 126 bool setNominalRate(float r); … … 149 159 signed int m_framecounter; 150 160 161 // the offset that define the timing of the buffer 162 signed int m_tick_offset; 163 151 164 // the buffer tail timestamp gives the timestamp of the last frame 152 165 // that was put into the buffer … … 154 167 uint64_t m_buffer_next_tail_timestamp; 155 168 156 // the buffer head timestamp gives the timestamp of the first frame157 // that was put into the buffer158 // uint64_t m_buffer_head_timestamp;159 169 // this mutex protects the access to the framecounter 160 170 // and the buffer head timestamp. branches/streaming-rework/tests/test-timestampedbuffer.cpp
r393 r395 289 289 t->setWrapValue(arguments.wrap_at); 290 290 291 t->setTickOffset(10000); 292 291 293 t->prepare(); 292 294