Changeset 390
- Timestamp:
- 02/06/07 23:59:56 (17 years ago)
- Files:
-
- branches/streaming-rework/src/debugmodule/debugmodule.h (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (modified) (24 diffs)
- branches/streaming-rework/src/libstreaming/IsoHandler.cpp (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/IsoHandler.h (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp (modified) (8 diffs)
- branches/streaming-rework/src/libstreaming/IsoHandlerManager.h (modified) (4 diffs)
- branches/streaming-rework/src/libstreaming/IsoStream.cpp (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/IsoStream.h (modified) (1 diff)
- branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (modified) (3 diffs)
- branches/streaming-rework/src/libstreaming/StreamProcessor.h (modified) (5 diffs)
- branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (modified) (11 diffs)
- branches/streaming-rework/src/libstreaming/StreamProcessorManager.h (modified) (5 diffs)
- branches/streaming-rework/tests/test-sytmonitor.cpp (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/streaming-rework/src/debugmodule/debugmodule.h
r384 r390 31 31 32 32 /* MB_NEXT() relies on the fact that MB_BUFFERS is a power of two */ 33 #define MB_BUFFERS 409633 #define MB_BUFFERS 8192 34 34 #define MB_NEXT(index) ((index+1) & (MB_BUFFERS-1)) 35 35 #define MB_BUFFERSIZE 256 /* message length limit */ branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp
r386 r390 38 38 #define RECEIVE_DLL_INTEGRATION_COEFFICIENT 0.015 39 39 40 #define RECEIVE_PROCESSING_DELAY (10000 )40 #define RECEIVE_PROCESSING_DELAY (10000U) 41 41 42 42 // in ticks … … 95 95 unsigned int nevents=0; 96 96 97 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d\n",cycle); 97 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d, (running=%d, enabled=%d,%d)\n", 98 cycle, m_running, m_disabled, m_is_disabled); 98 99 99 100 #ifdef DEBUG … … 174 175 // the current time, the stream is considered not 175 176 // to be 'running' 176 if(cycle_diff < 0 || cycle == -1) { 177 m_running=false; 178 } else { 179 m_running=true; 177 if (!m_running && cycle_diff >= 0 && cycle != -1) { 178 debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); 179 m_running=true; 180 180 } 181 181 … … 196 196 int64_t until_next=timestamp-(int64_t)cycle_timer; 197 197 198 #ifdef DEBUG 198 199 int64_t utn2=until_next; // debug!! 199 200 #endif 201 200 202 // we send a packet some cycles in advance, to avoid the 201 203 // following situation: … … 216 218 217 219 #ifdef DEBUG 218 if(!m_ disabled) {220 if(!m_is_disabled) { 219 221 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TS=%11llu, CTR=%11llu, FC=%5d\n", 220 222 timestamp, cycle_timer, fc … … 223 225 until_next, utn2 224 226 ); 225 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04 lld\n",227 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04d\n", 226 228 now_cycles, cycle, cycle_diff 227 229 ); … … 244 246 245 247 #ifdef DEBUG 246 if(!m_ disabled) {248 if(!m_is_disabled) { 247 249 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " > TS=%11llu, CTR=%11llu, FC=%5d\n", 248 250 timestamp, cycle_timer, fc … … 254 256 #endif 255 257 258 if (!m_disabled && m_is_disabled) { 259 // this means that we are trying to enable 260 if ((unsigned int)cycle == m_cycle_to_enable_at) { 261 m_is_disabled=false; 262 debugOutput(DEBUG_LEVEL_VERBOSE,"Enabling StreamProcessor %p at %u\n", this, cycle); 263 264 // initialize the buffer head & tail 265 uint64_t ts; 266 uint64_t fc; 267 268 debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); 269 270 m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe 271 272 // recalculate the buffer head timestamp 273 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 274 275 // set buffer head timestamp 276 // this makes that the next sample to be sent out 277 // has the same timestamp as the last one received 278 // plus one frame 279 ts += (uint64_t)ticks_per_frame; 280 if (ts >= TICKS_PER_SECOND * 128L) { 281 ts -= TICKS_PER_SECOND * 128L; 282 } 283 284 setBufferHeadTimestamp(ts); 285 int64_t timestamp = ts; 286 287 // since we have frames_in_buffer frames in the buffer, 288 // we know that the buffer tail lies 289 // frames_in_buffer * rate 290 // later 291 int frames_in_buffer=getFrameCounter(); 292 timestamp += (int64_t)((float)frames_in_buffer * ticks_per_frame); 293 294 // this happens when the last timestamp is near wrapping, and 295 // m_framecounter is low. 296 // this means: m_last_timestamp is near wrapping and have just had 297 // a getPackets() from the client side. the projected next_period 298 // boundary lies beyond the wrap value. 299 // the action is to wrap the value. 300 if (timestamp >= TICKS_PER_SECOND * 128L) { 301 timestamp -= TICKS_PER_SECOND * 128L; 302 } 303 304 StreamProcessor::setBufferTailTimestamp(timestamp); 305 306 debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, TSTMP=%10llu, FC=%4d, %f\n", 307 ts, timestamp, frames_in_buffer, ticks_per_frame); 308 309 } else { 310 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); 311 } 312 } else if (m_disabled && !m_is_disabled) { 313 // trying to disable 314 debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle); 315 m_is_disabled=true; 316 } 317 256 318 // don't process the stream when it is not enabled, not running 257 319 // or when the next sample is not due yet. … … 260 322 // that means that we'll send NODATA packets. 261 323 // we don't add payload because DICE devices don't like that. 262 if((until_next>0) || m_ disabled || !m_running) {324 if((until_next>0) || m_is_disabled || !m_running) { 263 325 // no-data packets have syt=0xFFFF 264 326 // and have the usual amount of events as dummy data (?) … … 278 340 *sy = 0; 279 341 280 if(m_disabled) {281 // indicate that we are now in a disabled state.282 m_is_disabled=true;283 } else {284 // indicate that we are now in an enabled state.285 m_is_disabled=false;286 }287 288 342 return RAW1394_ISO_DEFER; 289 343 } 290 344 291 // indicate that we are now in an enabled state.292 m_is_disabled=false;293 294 345 // construct the packet 295 346 nevents = m_syt_interval; … … 299 350 *sy = 0; 300 351 301 enum raw1394_iso_disposition retval;302 303 352 unsigned int read_size=nevents*sizeof(quadlet_t)*m_dimension; 304 353 … … 307 356 { 308 357 /* there is no more data in the ringbuffer */ 309 310 debugWarning("Transmit buffer underrun (cycle %d, FC=%d, PC=%d)\n", 311 cycle, getFrameCounter(), m_handler->getPacketCount()); 358 // convert the timestamp to SYT format 359 uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; 360 361 // check if it wrapped 362 if (ts >= TICKS_PER_SECOND * 128L) { 363 ts -= TICKS_PER_SECOND * 128L; 364 } 365 366 debugWarning("Transmit buffer underrun (now %d, queue %d, target %d)\n", 367 now_cycles, cycle, TICKS_TO_CYCLES(ts)); 312 368 313 369 nevents=0; … … 319 375 m_xruns++; 320 376 377 // disable the processing, will be re-enabled when 378 // the xrun is handled 379 m_disabled=true; 380 m_is_disabled=true; 381 321 382 // compose a no-data packet, we should always 322 383 // send a valid packet … … 330 391 //*length = 2*sizeof(quadlet_t); 331 392 332 ret val=RAW1394_ISO_DEFER;393 return RAW1394_ISO_DEFER; 333 394 } else { 334 395 *length = read_size + 8; … … 353 414 packet->syt = ntohs(timestamp_SYT); 354 415 355 retval=RAW1394_ISO_OK; 356 } 357 358 // calculate the new buffer head timestamp. this is 359 // the previous buffer head timestamp plus 360 // the number of frames sent * ticks_per_frame 361 timestamp += (int64_t)((float)nevents * ticks_per_frame ); 362 363 // check if it wrapped 364 if (timestamp >= TICKS_PER_SECOND * 128L) { 365 timestamp -= TICKS_PER_SECOND * 128L; 366 } 367 368 // update the frame counter such that it reflects the new value 369 // also update the buffer head timestamp 370 // done in the SP base class 371 if (!StreamProcessor::getFrames(nevents, timestamp)) { 372 debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nevents, timestamp); 373 retval=RAW1394_ISO_ERROR; 374 } 375 376 return retval; 416 // calculate the new buffer head timestamp. this is 417 // the previous buffer head timestamp plus 418 // the number of frames sent * ticks_per_frame 419 timestamp += (int64_t)((float)nevents * ticks_per_frame ); 420 421 // check if it wrapped 422 if (timestamp >= TICKS_PER_SECOND * 128L) { 423 timestamp -= TICKS_PER_SECOND * 128L; 424 } 425 426 // update the frame counter such that it reflects the new value 427 // also update the buffer head timestamp 428 // done in the SP base class 429 if (!StreamProcessor::getFrames(nevents, timestamp)) { 430 debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nevents, timestamp); 431 return RAW1394_ISO_ERROR; 432 } 433 434 return RAW1394_ISO_OK; 435 } 436 437 // we shouldn't get here 438 return RAW1394_ISO_ERROR; 377 439 378 440 } … … 411 473 412 474 // update the frame counter such that it reflects the buffer content, 413 // and also update the buffer tail timestamp475 // the buffer tail timestamp is initialized when the SP is enabled 414 476 // done in the SP base class 415 477 if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { 416 debugError("Could not do StreamProcessor::putFrames(%d, % llu)\n",417 m_ringbuffer_size_frames , ts);478 debugError("Could not do StreamProcessor::putFrames(%d, %0)\n", 479 m_ringbuffer_size_frames); 418 480 return false; 419 481 } 420 482 421 483 return true; 422 423 484 } 424 485 … … 652 713 653 714 bool AmdtpTransmitStreamProcessor::prepareForEnable() { 654 uint64_t ts; 655 uint64_t fc; 656 715 657 716 debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); 658 659 m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe 660 661 // recalculate the buffer head timestamp 662 float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 663 664 // set buffer head timestamp 665 // this makes that the next sample to be sent out 666 // has the same timestamp as the last one received 667 // plus one frame 668 ts += (uint64_t)ticks_per_frame; 669 setBufferHeadTimestamp(ts); 670 int64_t timestamp = ts; 671 672 // since we have a full buffer, we know that the buffer tail lies 673 // m_ringbuffer_size_frames * rate earlier 674 timestamp += (int64_t)((float)m_ringbuffer_size_frames * ticks_per_frame); 675 676 // this happens when the last timestamp is near wrapping, and 677 // m_framecounter is low. 678 // this means: m_last_timestamp is near wrapping and have just had 679 // a getPackets() from the client side. the projected next_period 680 // boundary lies beyond the wrap value. 681 // the action is to wrap the value. 682 if (timestamp >= TICKS_PER_SECOND * 128L) { 683 timestamp -= TICKS_PER_SECOND * 128L; 684 } 685 686 StreamProcessor::setBufferTailTimestamp(timestamp); 687 688 debugOutput(DEBUG_LEVEL_VERBOSE,"TS=%10lld, TSTMP=%10llu, %f\n", 689 ts, timestamp, ticks_per_frame); 690 717 691 718 if (!StreamProcessor::prepareForEnable()) { 692 719 debugError("StreamProcessor::prepareForEnable failed\n"); … … 1099 1126 } 1100 1127 #endif 1101 1128 1129 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4ucy + %04uticks) (running=%d, disabled=%d,%d)\n", 1130 channel, cycle,ntohs(packet->syt), 1131 CYCLE_TIMER_GET_CYCLES(ntohs(packet->syt)), CYCLE_TIMER_GET_OFFSET(ntohs(packet->syt)), 1132 m_running,m_disabled,m_is_disabled); 1133 1102 1134 if((packet->fmt == 0x10) && (packet->fdf != 0xFF) && (packet->syt != 0xFFFF) && (packet->dbs>0) && (length>=2*sizeof(quadlet_t))) { 1103 1135 unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; 1104 1136 1105 1137 //=> store the previous timestamp 1106 1138 m_last_timestamp2=m_last_timestamp; 1107 1139 1108 1140 //=> convert the SYT to ticks 1109 1141 unsigned int syt_timestamp=ntohs(packet->syt); 1110 1111 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%3u secs + %4u cycles + %04u ticks)\n", 1112 channel, cycle,syt_timestamp, CYCLE_TIMER_GET_SECS(syt_timestamp), 1113 CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp)); 1142 1143 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4u cycles + %04u ticks), FC=%04d, %d\n", 1144 channel, cycle,syt_timestamp, 1145 CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp), 1146 getFrameCounter(), m_is_disabled); 1114 1147 1115 1148 // reconstruct the full cycle … … 1163 1196 m_last_timestamp += CYCLE_TIMER_GET_OFFSET(syt_timestamp); 1164 1197 m_last_timestamp += cc_seconds * TICKS_PER_SECOND; 1198 1199 // we have to keep in mind that there are also 1200 // some packets buffered by the ISO layer 1201 // at most x=m_handler->getNbBuffers() 1202 // these contain at most x*syt_interval 1203 // frames, meaning that we might receive 1204 // this packet x*syt_interval*ticks_per_frame 1205 // later than expected (the real receive time) 1206 m_last_timestamp += (uint64_t)(((float)m_handler->getNbBuffers()) 1207 * m_syt_interval * m_ticks_per_frame); 1165 1208 1166 1209 // the receive processing delay indicates how much … … 1217 1260 1218 1261 //=> signal that we're running (if we are) 1219 if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) m_running=true; 1262 if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) { 1263 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive StreamProcessor %p started running at %d\n", this, cycle); 1264 m_running=true; 1265 } 1220 1266 1221 1267 //=> don't process the stream samples when it is not enabled. 1222 if(m_disabled) { 1268 if (!m_disabled && m_is_disabled) { 1269 // this means that we are trying to enable 1270 if (cycle == m_cycle_to_enable_at) { 1271 m_is_disabled=false; 1272 debugOutput(DEBUG_LEVEL_VERBOSE,"enabling StreamProcessor %p at %d\n", this, cycle); 1273 } else { 1274 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); 1275 } 1276 } else if (m_disabled && !m_is_disabled) { 1277 // trying to disable 1278 debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle); 1279 m_is_disabled=true; 1280 } 1281 1282 if(m_is_disabled) { 1223 1283 1224 1284 // we keep track of the timestamp here … … 1238 1298 StreamProcessor::setBufferTimestamps(ts,ts); 1239 1299 1240 // indicate that we are now in a disabled state.1241 m_is_disabled=true;1242 1243 1300 return RAW1394_ISO_DEFER; 1244 1301 } 1245 1246 // indicate that we are now in an enabled state.1247 m_is_disabled=false;1248 1302 1249 1303 //=> process the packet … … 1257 1311 1258 1312 m_xruns++; 1313 1314 // disable the processing, will be re-enabled when 1315 // the xrun is handled 1316 m_disabled=true; 1317 m_is_disabled=true; 1259 1318 1260 1319 retval=RAW1394_ISO_DEFER; … … 1380 1439 next_period_boundary += (int64_t)(((int64_t)m_period 1381 1440 - fc) * m_ticks_per_frame); 1382 // next_period_boundary += RECEIVE_PROCESSING_DELAY;1383 1441 1384 1442 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", … … 1437 1495 m_WakeupStat.reset(); 1438 1496 1439 // m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); 1497 // this needs to be reset to the nominal value 1498 // because xruns can cause the DLL value to shift a lot 1499 // making that we run into problems when trying to re-enable 1500 // streaming 1501 m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); 1440 1502 1441 1503 // reset all non-device specific stuff branches/streaming-rework/src/libstreaming/IsoHandler.cpp
r385 r390 194 194 err=raw1394_read_cycle_timer(m_handle_util, &ctr); 195 195 if(err) { 196 debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); 196 debugError("raw1394_read_cycle_timer failed.\n"); 197 debugError(" Error: %s\n", strerror(err)); 198 debugError(" Your system doesn't seem to support the raw1394_read_cycle_timer call\n"); 199 return false; 197 200 } 198 201 new_timer=ctr.cycle_timer; branches/streaming-rework/src/libstreaming/IsoHandler.h
r384 r390 78 78 // no setter functions, because those would require a re-init 79 79 unsigned int getMaxPacketSize() { return m_max_packet_size;}; 80 unsigned int get Buffersize() { return m_buf_packets;};80 unsigned int getNbBuffers() { return m_buf_packets;}; 81 81 int getWakeupInterval() { return m_irq_interval;}; 82 82 branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp
r386 r390 32 32 #include <assert.h> 33 33 34 #include "../libutil/PosixThread.h" 35 36 34 37 #define MINIMUM_INTERRUPTS_PER_PERIOD 4U 35 38 #define PACKETS_PER_INTERRUPT 4U … … 42 45 IsoHandlerManager::IsoHandlerManager() : 43 46 m_State(E_Created), 44 m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0) 45 { 46 47 } 48 47 m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0), 48 m_realtime(false), m_priority(0) 49 { 50 51 } 52 53 IsoHandlerManager::IsoHandlerManager(bool run_rt, unsigned int rt_prio) : 54 m_State(E_Created), 55 m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0), 56 m_realtime(run_rt), m_priority(rt_prio) 57 { 58 59 } 49 60 50 61 IsoHandlerManager::~IsoHandlerManager() … … 53 64 } 54 65 66 bool IsoHandlerManager::init() 67 { 68 // the tread that performs the actual packet transfer 69 // needs high priority 70 unsigned int prio=m_priority+6; 71 72 if (prio>98) prio=98; 73 74 m_isoManagerThread=new FreebobUtil::PosixThread( 75 this, 76 m_realtime, prio, 77 PTHREAD_CANCEL_DEFERRED); 78 79 if(!m_isoManagerThread) { 80 debugFatal("Could not create iso manager thread\n"); 81 return false; 82 } 83 84 // propagate the debug level 85 // m_isoManagerThread->setVerboseLevel(getDebugLevel()); 86 87 return true; 88 } 89 55 90 bool IsoHandlerManager::Init() 56 91 { 57 58 59 92 debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 93 94 return true; 60 95 } 61 96 … … 288 323 unsigned int packets_per_period=stream->getPacketsPerPeriod(); 289 324 290 #if 0325 #if 1 291 326 // hardware interrupts occur when one DMA block is full, and the size of one DMA 292 327 // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq is … … 377 412 unsigned int packets_per_period=stream->getPacketsPerPeriod(); 378 413 379 #if 0414 #if 1 380 415 // hardware interrupts occur when one DMA block is full, and the size of one DMA 381 416 // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq … … 431 466 // buffers get transfered, meaning that we should have at least some 432 467 // margin here 433 //int buffers=irq_interval * 2;468 int buffers=irq_interval * 2; 434 469 435 470 // half a period. the xmit handler will take care of this 436 int buffers=packets_per_period/2;471 // int buffers=packets_per_period/4; 437 472 438 473 // NOTE: this is dangerous: what if there is not enough prefill? … … 612 647 } 613 648 649 debugOutput( DEBUG_LEVEL_VERBOSE, "Starting ISO iterator thread...\n"); 650 651 // note: libraw1394 doesn't like it if you poll() and/or iterate() before 652 // starting the streams. 653 // start the iso runner thread 654 m_isoManagerThread->Start(); 655 614 656 if (retval) { 615 657 m_State=E_Running; … … 632 674 bool retval=true; 633 675 676 debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping ISO iterator thread...\n"); 677 m_isoManagerThread->Stop(); 678 634 679 for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); 635 680 it != m_IsoHandlers.end(); 636 681 ++it ) 637 682 { 638 debugOutput( DEBUG_LEVEL_VERBOSE, " stopping handler (%p)\n",*it);683 debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handler (%p)\n",*it); 639 684 if(!(*it)->stop()){ 640 685 debugOutput( DEBUG_LEVEL_VERBOSE, " could not stop handler (%p)\n",*it); branches/streaming-rework/src/libstreaming/IsoHandlerManager.h
r384 r390 40 40 #define USLEEP_AFTER_UPDATE 100 41 41 #define MAX_UPDATE_TRIES 10 42 namespace FreebobUtil { 43 class PosixThread; 44 } 42 45 43 46 namespace FreebobStreaming … … 73 76 74 77 IsoHandlerManager(); 78 IsoHandlerManager(bool run_rt, unsigned int rt_prio); 75 79 virtual ~IsoHandlerManager(); 76 80 … … 92 96 93 97 bool prepare(); ///< prepare the ISO manager and all streams 98 99 bool init(); 94 100 95 101 void disablePolling(IsoStream *); ///< disables polling on a stream … … 144 150 bool rebuildFdMap(); 145 151 152 // threading 153 bool m_realtime; 154 unsigned int m_priority; 155 FreebobUtil::PosixThread *m_isoManagerThread; 156 157 146 158 // debug stuff 147 159 DECLARE_DEBUG_MODULE; branches/streaming-rework/src/libstreaming/IsoStream.cpp
r386 r390 119 119 } 120 120 121 121 122 } branches/streaming-rework/src/libstreaming/IsoStream.h
r383 r390 104 104 105 105 IsoHandler *m_handler; 106 106 107 107 DECLARE_DEBUG_MODULE; 108 108 branches/streaming-rework/src/libstreaming/StreamProcessor.cpp
r386 r390 31 31 #include "StreamProcessor.h" 32 32 #include "StreamProcessorManager.h" 33 #include "cycletimer.h" 33 34 34 35 #include <assert.h> … … 50 51 , m_disabled(true) 51 52 , m_is_disabled(true) 53 , m_cycle_to_enable_at(0) 52 54 , m_framecounter(0) 53 55 , m_SyncSource(NULL) … … 211 213 } 212 214 215 uint64_t StreamProcessor::getTimeNow() { 216 return m_handler->getCycleTimerTicks(); 217 } 218 219 213 220 bool StreamProcessor::isRunning() { 214 221 return m_running; 215 222 } 216 223 217 bool StreamProcessor::enable() { 218 int cnt=0; 224 bool StreamProcessor::enable(uint64_t time_to_enable_at) { 225 // FIXME: time_to_enable_at will be in 'time' not cycles 226 m_cycle_to_enable_at=time_to_enable_at; 219 227 220 228 if(!m_running) { 221 229 debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n"); 222 230 } 223 231 232 #ifdef DEBUG 233 uint64_t now_cycles=TICKS_TO_CYCLES(m_handler->getCycleTimerTicks()); 234 const int64_t max=(int64_t)(TICKS_PER_SECOND/2); 235 236 int64_t diff=m_cycle_to_enable_at-now_cycles; 237 238 if (diff > max) { 239 diff-=TICKS_PER_SECOND; 240 } else if (diff < -max) { 241 diff+=TICKS_PER_SECOND; 242 } 243 244 if (diff<0) { 245 debugWarning("Request to enable streamprocessor %d cycles ago.\n",diff); 246 } 247 #endif 248 224 249 m_disabled=false; 225 250 226 // now wait until it is effectively enabled227 // time-out at 100ms228 while(m_is_disabled && cnt++ < 1000) {229 usleep(100);230 }231 232 // check if the operation timed out233 if(cnt==1000) {234 debugWarning("Timeout when enabling StreamProcessor (%p)\n",this);235 return false;236 }237 238 251 return true; 239 252 } 240 253 241 254 bool StreamProcessor::disable() { 242 int cnt=0;243 255 244 256 m_disabled=true; 245 246 // now wait until it is effectively disabled 247 // time-out at 248 while(!m_is_disabled && cnt++ < 1000) { 249 usleep(100); 250 } 251 252 // check if the operation timed out (100ms) 253 if(cnt==1000) { 254 debugWarning("Timeout when disabling StreamProcessor (%p)\n",this); 255 return false; 256 } 257 257 258 258 return true; 259 259 branches/streaming-rework/src/libstreaming/StreamProcessor.h
r386 r390 81 81 82 82 bool isRunning(); ///< returns true if there is some stream data processed 83 bool enable( ); ///< enable the stream processing83 bool enable(uint64_t time_to_enable_at); ///< enable the stream processing 84 84 bool disable(); ///< disable the stream processing 85 85 bool isEnabled() {return !m_is_disabled;}; … … 122 122 bool m_disabled; 123 123 bool m_is_disabled; 124 unsigned int m_cycle_to_enable_at; 124 125 125 126 StreamStatistics m_PacketStat; … … 185 186 virtual uint64_t getTimeAtPeriod() = 0; 186 187 188 uint64_t getTimeNow(); 189 187 190 void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); 188 191 void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); … … 194 197 bool setSyncSource(StreamProcessor *s); 195 198 float getTicksPerFrame() {return m_ticks_per_frame;}; 199 200 unsigned int getLastCycle() {return m_last_cycle;}; 196 201 197 202 private: … … 211 216 212 217 float m_ticks_per_frame; 218 219 unsigned int m_last_cycle; 213 220 214 221 private: branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp
r385 r390 32 32 #include <errno.h> 33 33 #include <assert.h> 34 35 #include "../libutil/PosixThread.h" 34 36 35 37 #include "libstreaming/cycletimer.h" … … 165 167 // the tread that runs the StreamProcessor 166 168 // checking the period boundaries 169 int prio=m_thread_priority+5; 170 if (prio>98) prio=98; 171 167 172 m_streamingThread=new FreebobUtil::PosixThread(this, 168 m_thread_realtime, m_thread_priority+5,173 m_thread_realtime, prio, 169 174 PTHREAD_CANCEL_DEFERRED); 170 175 … … 174 179 } 175 180 176 m_isoManager=new IsoHandlerManager( );181 m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority); 177 182 178 183 if(!m_isoManager) { … … 181 186 } 182 187 188 // propagate the debug level 183 189 m_isoManager->setVerboseLevel(getDebugLevel()); 184 190 185 // the tread that keeps the handler's cycle timers up to date 186 // and performs the actual packet transfer 187 // needs high priority 188 m_isoManagerThread=new FreebobUtil::PosixThread( 189 m_isoManager, 190 m_thread_realtime, m_thread_priority+6, 191 PTHREAD_CANCEL_DEFERRED); 192 193 if(!m_isoManagerThread) { 194 debugFatal("Could not create iso manager thread\n"); 191 if(!m_isoManager->init()) { 192 debugFatal("Could not initialize IsoHandlerManager\n"); 195 193 return false; 196 194 } 197 195 196 m_xrun_happened=false; 197 198 198 return true; 199 199 } … … 204 204 205 205 // no xrun has occurred (yet) 206 m_xrun_happened=false;207 208 if(sem_init(&m_period_semaphore, 0, 0)) {209 debugFatal( "Cannot init period transfer semaphore\n");210 debugFatal( " Error: %s\n",strerror(errno));211 return false;212 }213 206 214 207 return true; … … 295 288 } 296 289 290 bool StreamProcessorManager::syncStartAll() { 291 292 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n"); 293 // we have to wait until all streamprocessors indicate that they are running 294 // i.e. that there is actually some data stream flowing 295 int wait_cycles=2000; // two seconds 296 bool notRunning=true; 297 while (notRunning && wait_cycles) { 298 wait_cycles--; 299 notRunning=false; 300 301 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 302 it != m_ReceiveProcessors.end(); 303 ++it ) { 304 if(!(*it)->isRunning()) notRunning=true; 305 } 306 307 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 308 it != m_TransmitProcessors.end(); 309 ++it ) { 310 if(!(*it)->isRunning()) notRunning=true; 311 } 312 usleep(1000); 313 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning); 314 } 315 316 if(!wait_cycles) { // timout has occurred 317 debugFatal("One or more streams are not starting up (timeout):\n"); 318 319 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 320 it != m_ReceiveProcessors.end(); 321 ++it ) { 322 if(!(*it)->isRunning()) { 323 debugFatal(" receive stream %p not running\n",*it); 324 } else { 325 debugFatal(" receive stream %p running\n",*it); 326 } 327 } 328 329 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 330 it != m_TransmitProcessors.end(); 331 ++it ) { 332 if(!(*it)->isRunning()) { 333 debugFatal(" transmit stream %p not running\n",*it); 334 } else { 335 debugFatal(" transmit stream %p running\n",*it); 336 } 337 } 338 339 return false; 340 } 341 342 // we want to make sure that everything is running well, 343 // so wait for a while 344 usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL); 345 346 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 347 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); 348 349 // now we reset the frame counters 350 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 351 it != m_ReceiveProcessors.end(); 352 ++it ) { 353 354 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 355 356 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 357 (*it)->dumpInfo(); 358 } 359 360 (*it)->reset(); 361 362 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 363 364 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 365 (*it)->dumpInfo(); 366 } 367 368 } 369 370 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 371 it != m_TransmitProcessors.end(); 372 ++it ) { 373 374 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 375 376 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 377 (*it)->dumpInfo(); 378 } 379 380 (*it)->reset(); 381 382 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 383 384 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 385 (*it)->dumpInfo(); 386 } 387 } 388 389 debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 390 391 uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks 392 393 // FIXME: this should not be in cycles, but in 'time' 394 unsigned int enable_at=TICKS_TO_CYCLES(now)+300; 395 396 debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); 397 if (!m_SyncSource->prepareForEnable()) { 398 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 399 return false; 400 } 401 402 m_SyncSource->enable(enable_at); 403 404 debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); 405 if (!enableStreamProcessors(enable_at)) { 406 debugFatal("Could not enable StreamProcessors...\n"); 407 return false; 408 } 409 410 return true; 411 } 412 297 413 bool StreamProcessorManager::start() { 298 414 debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n"); … … 348 464 debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n"); 349 465 350 // note: libraw1394 doesn't like it if you poll() and/or iterate() before351 // starting the streams.352 // start the runner thread353 // FIXME: maybe this should go into the isomanager itself.354 m_isoManagerThread->Start();355 356 466 // start the runner thread 357 467 // FIXME: not used anymore (for updatecycletimers ATM, but that's not good) 358 468 m_streamingThread->Start(); 359 469 360 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to start running...\n"); 361 // we have to wait until all streamprocessors indicate that they are running 362 // i.e. that there is actually some data stream flowing 363 int wait_cycles=2000; // two seconds 364 bool notRunning=true; 365 while (notRunning && wait_cycles) { 366 wait_cycles--; 367 notRunning=false; 368 369 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 370 it != m_ReceiveProcessors.end(); 371 ++it ) { 372 if(!(*it)->isRunning()) notRunning=true; 373 } 374 375 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 376 it != m_TransmitProcessors.end(); 377 ++it ) { 378 if(!(*it)->isRunning()) notRunning=true; 379 } 380 usleep(1000); 381 } 382 383 if(!wait_cycles) { // timout has occurred 384 debugFatal("One or more streams are not starting up (timeout):\n"); 385 386 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 387 it != m_ReceiveProcessors.end(); 388 ++it ) { 389 if(!(*it)->isRunning()) { 390 debugFatal(" receive stream %p not running\n",*it); 391 } else { 392 debugFatal(" receive stream %p running\n",*it); 393 } 394 } 395 396 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 397 it != m_TransmitProcessors.end(); 398 ++it ) { 399 if(!(*it)->isRunning()) { 400 debugFatal(" transmit stream %p not running\n",*it); 401 } else { 402 debugFatal(" transmit stream %p running\n",*it); 403 } 404 } 405 return false; 406 } 407 408 // we want to make sure that everything is running well, 409 // so wait for a while 410 usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL); 411 412 debugOutput( DEBUG_LEVEL_VERBOSE, "StreamProcessors running...\n"); 413 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting counters...\n"); 414 415 // now we reset the frame counters 416 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 417 it != m_ReceiveProcessors.end(); 418 ++it ) { 419 420 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 421 422 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 423 (*it)->dumpInfo(); 424 } 425 426 (*it)->reset(); 427 428 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 429 430 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 431 (*it)->dumpInfo(); 432 } 433 434 } 435 436 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 437 it != m_TransmitProcessors.end(); 438 ++it ) { 439 440 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 441 442 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 443 (*it)->dumpInfo(); 444 } 445 446 (*it)->reset(); 447 448 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 449 450 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 451 (*it)->dumpInfo(); 452 } 453 } 454 455 debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 456 457 debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); 458 if (!m_SyncSource->prepareForEnable()) { 459 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 460 return false; 461 } 462 463 m_SyncSource->enable(); 464 465 debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); 466 if (!enableStreamProcessors()) { 467 debugFatal("Could not enable StreamProcessors...\n"); 470 // start all SP's synchonized 471 if (!syncStartAll()) { 472 debugFatal("Could not syncStartAll...\n"); 468 473 return false; 469 474 } … … 511 516 512 517 m_streamingThread->Stop(); 513 m_isoManagerThread->Stop();514 518 515 519 debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n"); … … 551 555 * @return true if successful, false otherwise 552 556 */ 553 bool StreamProcessorManager::enableStreamProcessors() { 554 // and we enable the streamprocessors 555 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 556 it != m_ReceiveProcessors.end(); 557 ++it ) { 558 (*it)->prepareForEnable(); 559 (*it)->enable(); 560 } 561 562 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 563 it != m_TransmitProcessors.end(); 564 ++it ) { 565 (*it)->prepareForEnable(); 566 (*it)->enable(); 567 } 568 return true; 557 bool StreamProcessorManager::enableStreamProcessors(unsigned int time_to_enable_at) { 558 // we prepare the streamprocessors for enable 559 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 560 it != m_ReceiveProcessors.end(); 561 ++it ) { 562 (*it)->prepareForEnable(); 563 } 564 565 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 566 it != m_TransmitProcessors.end(); 567 ++it ) { 568 (*it)->prepareForEnable(); 569 } 570 571 // then we enable the streamprocessors 572 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 573 it != m_ReceiveProcessors.end(); 574 ++it ) { 575 (*it)->enable(time_to_enable_at); 576 } 577 578 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 579 it != m_TransmitProcessors.end(); 580 ++it ) { 581 (*it)->enable(time_to_enable_at); 582 } 583 584 // now we wait for the SP's to get enabled 585 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n"); 586 // we have to wait until all streamprocessors indicate that they are running 587 // i.e. that there is actually some data stream flowing 588 int wait_cycles=2000; // two seconds 589 bool notEnabled=true; 590 while (notEnabled && wait_cycles) { 591 wait_cycles--; 592 notEnabled=false; 593 594 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 595 it != m_ReceiveProcessors.end(); 596 ++it ) { 597 if(!(*it)->isEnabled()) notEnabled=true; 598 } 599 600 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 601 it != m_TransmitProcessors.end(); 602 ++it ) { 603 if(!(*it)->isEnabled()) notEnabled=true; 604 } 605 usleep(1000); // one cycle 606 } 607 608 if(!wait_cycles) { // timout has occurred 609 debugFatal("One or more streams couldn't be enabled (timeout):\n"); 610 611 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 612 it != m_ReceiveProcessors.end(); 613 ++it ) { 614 if(!(*it)->isEnabled()) { 615 debugFatal(" receive stream %p not enabled\n",*it); 616 } else { 617 debugFatal(" receive stream %p enabled\n",*it); 618 } 619 } 620 621 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 622 it != m_TransmitProcessors.end(); 623 ++it ) { 624 if(!(*it)->isEnabled()) { 625 debugFatal(" transmit stream %p not enabled\n",*it); 626 } else { 627 debugFatal(" transmit stream %p enabled\n",*it); 628 } 629 } 630 return false; 631 } 632 633 debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n"); 634 635 return true; 569 636 } 570 637 … … 574 641 */ 575 642 bool StreamProcessorManager::disableStreamProcessors() { 576 // and we disable the streamprocessors 577 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 578 it != m_ReceiveProcessors.end(); 579 ++it ) { 580 (*it)->prepareForDisable(); 581 (*it)->disable(); 582 } 583 584 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 585 it != m_TransmitProcessors.end(); 586 ++it ) { 587 (*it)->prepareForDisable(); 588 (*it)->disable(); 589 } 590 return true; 643 // we prepare the streamprocessors for disable 644 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 645 it != m_ReceiveProcessors.end(); 646 ++it ) { 647 (*it)->prepareForDisable(); 648 } 649 650 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 651 it != m_TransmitProcessors.end(); 652 ++it ) { 653 (*it)->prepareForDisable(); 654 } 655 656 // then we disable the streamprocessors 657 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 658 it != m_ReceiveProcessors.end(); 659 ++it ) { 660 (*it)->disable(); 661 } 662 663 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 664 it != m_TransmitProcessors.end(); 665 ++it ) { 666 (*it)->disable(); 667 } 668 669 // now we wait for the SP's to get disabled 670 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n"); 671 // we have to wait until all streamprocessors indicate that they are running 672 // i.e. that there is actually some data stream flowing 673 int wait_cycles=2000; // two seconds 674 bool enabled=true; 675 while (enabled && wait_cycles) { 676 wait_cycles--; 677 enabled=false; 678 679 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 680 it != m_ReceiveProcessors.end(); 681 ++it ) { 682 if((*it)->isEnabled()) enabled=true; 683 } 684 685 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 686 it != m_TransmitProcessors.end(); 687 ++it ) { 688 if((*it)->isEnabled()) enabled=true; 689 } 690 usleep(1000); // one cycle 691 } 692 693 if(!wait_cycles) { // timout has occurred 694 debugFatal("One or more streams couldn't be disabled (timeout):\n"); 695 696 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 697 it != m_ReceiveProcessors.end(); 698 ++it ) { 699 if(!(*it)->isEnabled()) { 700 debugFatal(" receive stream %p not enabled\n",*it); 701 } else { 702 debugFatal(" receive stream %p enabled\n",*it); 703 } 704 } 705 706 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 707 it != m_TransmitProcessors.end(); 708 ++it ) { 709 if(!(*it)->isEnabled()) { 710 debugFatal(" transmit stream %p not enabled\n",*it); 711 } else { 712 debugFatal(" transmit stream %p enabled\n",*it); 713 } 714 } 715 return false; 716 } 717 718 debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n"); 719 720 return true; 591 721 } 592 722 … … 617 747 } 618 748 619 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting Processors...\n"); 620 621 // now we reset the streamprocessors 622 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 623 it != m_ReceiveProcessors.end(); 624 ++it ) { 625 626 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 627 (*it)->dumpInfo(); 628 } 629 630 (*it)->reset(); 631 632 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 633 (*it)->dumpInfo(); 634 } 635 } 636 637 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 638 it != m_TransmitProcessors.end(); 639 ++it ) { 640 641 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 642 (*it)->dumpInfo(); 643 } 644 645 (*it)->reset(); 646 647 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 648 (*it)->dumpInfo(); 649 } 650 } 651 652 debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 653 654 debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); 655 if (!m_SyncSource->prepareForEnable()) { 656 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 657 return false; 658 } 659 660 m_SyncSource->enable(); 661 662 debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); 663 if (!enableStreamProcessors()) { 664 debugFatal("Could not enable StreamProcessors...\n"); 749 debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n"); 750 // start all SP's synchonized 751 if (!syncStartAll()) { 752 debugFatal("Could not syncStartAll...\n"); 665 753 return false; 666 754 } branches/streaming-rework/src/libstreaming/StreamProcessorManager.h
r384 r390 31 31 #include "../debugmodule/debugmodule.h" 32 32 #include "../libutil/Thread.h" 33 #include "../libutil/PosixThread.h"34 33 #include <semaphore.h> 35 34 #include "Port.h" … … 64 63 bool start(); 65 64 bool stop(); 66 65 66 bool syncStartAll(); 67 67 68 68 // this is the setup API … … 70 70 bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor 71 71 72 bool enableStreamProcessors( ); /// enable registered StreamProcessors72 bool enableStreamProcessors(unsigned int time_to_enable_at); /// enable registered StreamProcessors 73 73 bool disableStreamProcessors(); /// disable registered StreamProcessors 74 74 … … 137 137 StreamProcessorVector m_ReceiveProcessors; 138 138 StreamProcessorVector m_TransmitProcessors; 139 140 141 139 142 140 unsigned int m_nb_buffers; … … 147 145 148 146 FreebobUtil::PosixThread *m_streamingThread; 149 FreebobUtil::PosixThread *m_isoManagerThread;150 147 151 148 unsigned int m_nbperiods; branches/streaming-rework/tests/test-sytmonitor.cpp
r384 r390 174 174 SytMonitor *monitors[128]; 175 175 int64_t stream_offset_ticks[128]; 176 176 177 177 struct arguments arguments; 178 178 … … 191 191 } 192 192 193 memset(&stream_offset_ticks,0,sizeof(unsigned int) * 128); 194 195 196 run=1; 197 198 run_realtime=arguments.realtime; 199 realtime_prio=arguments.rtprio; 200 201 signal (SIGINT, sighandler); 202 signal (SIGPIPE, sighandler); 203 204 debugOutput(DEBUG_LEVEL_NORMAL, "Freebob SYT monitor\n"); 205 206 m_isoManager=new IsoHandlerManager(); 207 208 if(!m_isoManager) { 209 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create IsoHandlerManager\n"); 210 goto finish; 211 } 212 213 m_isoManager->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 214 215 // the thread to execute the manager 216 m_isoManagerThread=new PosixThread( 217 m_isoManager, 218 run_realtime, realtime_prio, 219 PTHREAD_CANCEL_DEFERRED); 220 221 if(!m_isoManagerThread) { 222 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create iso manager thread\n"); 223 goto finish; 224 } 193 memset(&stream_offset_ticks, 0, sizeof(int64_t) * 128); 194 195 run=1; 196 197 run_realtime=arguments.realtime; 198 realtime_prio=arguments.rtprio; 199 200 signal (SIGINT, sighandler); 201 signal (SIGPIPE, sighandler); 202 203 debugOutput(DEBUG_LEVEL_NORMAL, "Freebob SYT monitor\n"); 204 205 m_isoManager=new IsoHandlerManager(); 206 207 if(!m_isoManager) { 208 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create IsoHandlerManager\n"); 209 goto finish; 210 } 211 212 m_isoManager->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 213 214 // the thread to execute the manager 215 m_isoManagerThread=new PosixThread( 216 m_isoManager, 217 run_realtime, realtime_prio, 218 PTHREAD_CANCEL_DEFERRED); 219 220 if(!m_isoManagerThread) { 221 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create iso manager thread\n"); 222 goto finish; 223 } 225 224 226 225 // register monitors … … 354 353 // average out the offset 355 354 int64_t err=(((uint64_t)master_cif.pres_ticks) - ((uint64_t)cif.pres_ticks)); 355 356 debugOutput(DEBUG_LEVEL_NORMAL,"Diff for %d at cycle %04d: %6lld (MTS: %11llu | STS: %11llu\n", 357 i,cif.cycle,err, master_cif.pres_ticks, cif.pres_ticks); 356 358 357 359 err = err - stream_offset_ticks[i];