- Timestamp:
- 11/22/07 06:43:39 (15 years ago)
- Files:
-
- branches/ppalmers-streaming/src/ffado_streaming.cpp (modified) (1 diff)
- branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp (modified) (1 diff)
- branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp (modified) (7 diffs)
- branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h (modified) (3 diffs)
- branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (modified) (11 diffs)
- branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h (modified) (1 diff)
- branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.cpp (modified) (2 diffs)
- branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.h (modified) (2 diffs)
- branches/ppalmers-streaming/src/libstreaming/generic/Port.cpp (modified) (2 diffs)
- branches/ppalmers-streaming/src/libstreaming/generic/PortManager.cpp (modified) (1 diff)
- branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (modified) (6 diffs)
- branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (modified) (10 diffs)
- branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (modified) (29 diffs)
- branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (modified) (3 diffs)
- branches/ppalmers-streaming/src/libstreaming/util/IsoHandler.cpp (modified) (1 diff)
- branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp (modified) (1 diff)
- branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (modified) (4 diffs)
- branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h (modified) (1 diff)
- branches/ppalmers-streaming/tests/test-cycletimer.cpp (modified) (1 diff)
- branches/ppalmers-streaming/tests/test-sytmonitor.cpp (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/ppalmers-streaming/src/ffado_streaming.cpp
r715 r719 340 340 341 341 int ffado_streaming_transfer_playback_buffers(ffado_device_t *dev) { 342 return dev->processorManager->transfer(StreamProcessor::ePT_ Receive);342 return dev->processorManager->transfer(StreamProcessor::ePT_Transmit); 343 343 } 344 344 branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp
r715 r719 592 592 int 593 593 AvDevice::getStreamCount() { 594 return m_receiveProcessors.size() + m_transmitProcessors.size(); 594 //return m_receiveProcessors.size() + m_transmitProcessors.size(); 595 return 1; 595 596 } 596 597 branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp
r715 r719 45 45 : StreamProcessor(ePT_Receive , port) 46 46 , m_dimension(dimension) 47 , m_last_timestamp(0)48 , m_last_timestamp2(0)49 , m_dropped(0)50 47 {} 51 48 52 bool AmdtpReceiveStreamProcessor::init() { 53 54 // call the parent init 55 // this has to be done before allocating the buffers, 56 // because this sets the buffersizes from the processormanager 57 if(!StreamProcessor::init()) { 58 debugFatal("Could not do base class init (%d)\n",this); 59 return false; 60 } 61 return true; 62 } 63 64 enum raw1394_iso_disposition 65 AmdtpReceiveStreamProcessor::putPacket(unsigned char *data, unsigned int length, 66 unsigned char channel, unsigned char tag, unsigned char sy, 67 unsigned int cycle, unsigned int dropped) { 68 69 enum raw1394_iso_disposition retval=RAW1394_ISO_OK; 70 71 int dropped_cycles=diffCycles(cycle, m_last_cycle) - 1; 72 if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 73 else m_dropped += dropped_cycles; 74 if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); 75 76 m_last_cycle=cycle; 77 78 struct iec61883_packet *packet = (struct iec61883_packet *) data; 79 assert(packet); 80 81 #ifdef DEBUG 82 if(dropped>0) { 83 debugWarning("(%p) Dropped %d packets on cycle %d\n", this, dropped, cycle); 84 } 85 86 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4ucy + %04uticks) (running=%d)\n", 87 channel, cycle, ntohs(packet->syt), 88 CYCLE_TIMER_GET_CYCLES(ntohs(packet->syt)), CYCLE_TIMER_GET_OFFSET(ntohs(packet->syt)), 89 m_running); 90 91 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 92 "RCV: CH = %d, FDF = %X. SYT = %6d, DBS = %3d, DBC = %3d, FMT = %3d, LEN = %4d\n", 93 channel, packet->fdf, 94 packet->syt, 95 packet->dbs, 96 packet->dbc, 97 packet->fmt, 98 length); 99 100 #endif 101 102 // check if this is a valid packet 103 if((packet->syt != 0xFFFF) 104 && (packet->fdf != 0xFF) 105 && (packet->fmt == 0x10) 106 && (packet->dbs>0) 107 && (length>=2*sizeof(quadlet_t))) { 108 109 unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; 110 111 //=> store the previous timestamp 112 m_last_timestamp2=m_last_timestamp; 113 114 uint64_t nowX = m_handler->getCycleTimer(); 115 //=> convert the SYT to a full timestamp in ticks 116 m_last_timestamp=sytRecvToFullTicks((uint32_t)ntohs(packet->syt), 117 cycle, nowX); 118 119 int64_t diffx = diffTicks(m_last_timestamp, m_last_timestamp2); 120 if (abs(diffx) > m_syt_interval * m_data_buffer->getRate() * 1.1) { 121 uint32_t now=m_handler->getCycleTimer(); 122 uint32_t syt = (uint32_t)ntohs(packet->syt); 123 uint32_t now_ticks=CYCLE_TIMER_TO_TICKS(now); 124 125 debugOutput(DEBUG_LEVEL_VERBOSE, "diff=%06lld TS=%011llu TS2=%011llu\n", 126 diffx, m_last_timestamp, m_last_timestamp2); 127 debugOutput(DEBUG_LEVEL_VERBOSE, "[1] cy=%04d dropped=%05llu syt=%04llX NOW=%08llX => TS=%011llu\n", 128 m_last_good_cycle, m_last_dropped, m_last_syt, m_last_now, m_last_timestamp2); 129 debugOutput(DEBUG_LEVEL_VERBOSE, "[2] cy=%04d dropped=%05d syt=%04X NOW=%08llX => TS=%011llu\n", 130 cycle, dropped_cycles, ntohs(packet->syt), nowX, m_last_timestamp); 131 132 uint32_t test_ts=sytRecvToFullTicks(syt, cycle, now); 133 134 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: SYT=%08X, CY=%04d OFF=%04d\n", 135 cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) 136 ); 137 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%04u OFF=%04u\n", 138 cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) 139 ); 140 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%04u OFF=%04u\n", 141 cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) 142 ); 143 144 int64_t diff_ts = diffTicks(now_ticks, test_ts); 145 debugOutput(DEBUG_LEVEL_VERBOSE, "DIFF : TCK=%011lld, SEC=%03llu CY=%04llu OFF=%04llu\n", 146 diff_ts, 147 TICKS_TO_SECS((uint64_t)diff_ts), 148 TICKS_TO_CYCLES((uint64_t)diff_ts), 149 TICKS_TO_OFFSET((uint64_t)diff_ts) 150 ); 151 } 152 m_last_syt = ntohs(packet->syt); 153 m_last_now = nowX; 154 m_last_good_cycle = cycle; 155 m_last_dropped = dropped_cycles; 156 157 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", 158 cycle, m_last_timestamp); 159 160 // we have to keep in mind that there are also 161 // some packets buffered by the ISO layer, 162 // at most x=m_handler->getWakeupInterval() 163 // these contain at most x*syt_interval 164 // frames, meaning that we might receive 165 // this packet x*syt_interval*ticks_per_frame 166 // later than expected (the real receive time) 167 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", 168 m_last_timestamp, m_handler->getWakeupInterval(),m_syt_interval,getTicksPerFrame()); 169 170 //=> signal that we're running (if we are) 171 if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) { 172 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive StreamProcessor %p started running at %d\n", this, cycle); 173 m_running=true; 174 m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 175 // we don't want this first sample to be written 176 return RAW1394_ISO_OK; 177 } 178 179 // if we are not running yet, there is nothing more to do 180 if(!m_running) { 181 return RAW1394_ISO_OK; 182 } 183 #ifdef DEBUG_OFF 184 if((cycle % 1000) == 0) { 185 uint32_t now=m_handler->getCycleTimer(); 186 uint32_t syt = (uint32_t)ntohs(packet->syt); 187 uint32_t now_ticks=CYCLE_TIMER_TO_TICKS(now); 188 189 uint32_t test_ts=sytRecvToFullTicks(syt, cycle, now); 190 191 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: SYT=%08X, CY=%02d OFF=%04d\n", 192 cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) 193 ); 194 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 195 cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) 196 ); 197 debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 198 cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) 199 ); 200 } 201 #endif 202 203 #ifdef DEBUG 204 // keep track of the lag 205 uint32_t now=m_handler->getCycleTimer(); 206 int32_t diff = diffCycles( cycle, ((int)CYCLE_TIMER_GET_CYCLES(now)) ); 207 m_PacketStat.mark(diff); 208 #endif 209 210 //=> process the packet 211 // add the data payload to the ringbuffer 212 213 if(dropped_cycles) { 214 debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this); 215 m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 216 // we don't want this first sample to be written 217 return RAW1394_ISO_OK; 218 } 219 220 if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { 221 retval=RAW1394_ISO_OK; 222 223 // process all ports that should be handled on a per-packet base 224 // this is MIDI for AMDTP (due to the need of DBC) 225 if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { 226 debugWarning("Problem decoding Packet Ports\n"); 227 retval=RAW1394_ISO_DEFER; 228 } 229 230 } else { 231 232 // debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n", 233 // cycle, m_data_buffer->getFrameCounter(), m_handler->getPacketCount()); 234 235 m_xruns++; 236 237 retval=RAW1394_ISO_DEFER; 238 } 239 } 240 241 return retval; 242 } 243 244 void AmdtpReceiveStreamProcessor::dumpInfo() { 245 StreamProcessor::dumpInfo(); 246 } 247 248 bool AmdtpReceiveStreamProcessor::reset() { 249 250 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); 251 252 m_PeriodStat.reset(); 253 m_PacketStat.reset(); 254 m_WakeupStat.reset(); 255 256 m_data_buffer->setTickOffset(0); 257 258 // reset all non-device specific stuff 259 // i.e. the iso stream and the associated ports 260 if(!StreamProcessor::reset()) { 261 debugFatal("Could not do base class reset\n"); 262 return false; 263 } 264 return true; 265 } 266 267 bool AmdtpReceiveStreamProcessor::prepare() { 49 50 unsigned int 51 AmdtpReceiveStreamProcessor::getPacketsPerPeriod() 52 { 53 return (m_manager->getPeriodSize())/m_syt_interval; 54 } 55 56 bool AmdtpReceiveStreamProcessor::prepareChild() { 268 57 269 58 m_PeriodStat.setName("RCV PERIOD"); … … 273 62 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); 274 63 275 // prepare all non-device specific stuff276 // i.e. the iso stream and the associated ports277 if(!StreamProcessor::prepare()) {278 debugFatal("Could not prepare base class\n");279 return false;280 }281 282 64 switch (m_manager->getNominalRate()) { 283 case 32000: 284 m_syt_interval = 8; 285 break; 286 case 44100: 287 m_syt_interval = 8; 288 break; 289 default: 290 case 48000: 291 m_syt_interval = 8; 292 break; 293 case 88200: 294 m_syt_interval = 16; 295 break; 296 case 96000: 297 m_syt_interval = 16; 298 break; 299 case 176400: 300 m_syt_interval = 32; 301 break; 302 case 192000: 303 m_syt_interval = 32; 304 break; 305 } 306 307 // prepare the framerate estimate 308 float ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); 309 m_ticks_per_frame=ticks_per_frame; 310 311 debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n",ticks_per_frame); 312 313 // initialize internal buffer 314 unsigned int ringbuffer_size_frames=m_manager->getNbBuffers() * m_manager->getPeriodSize(); 315 316 assert(m_data_buffer); 317 m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); 318 m_data_buffer->setEventSize(sizeof(quadlet_t)); 319 m_data_buffer->setEventsPerFrame(m_dimension); 320 321 // the buffer is written every syt_interval 322 m_data_buffer->setUpdatePeriod(m_syt_interval); 323 m_data_buffer->setNominalRate(ticks_per_frame); 324 325 m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 326 327 m_data_buffer->prepare(); 328 329 // set the parameters of ports we can: 330 // we want the audio ports to be period buffered, 331 // and the midi ports to be packet buffered 332 for ( PortVectorIterator it = m_Ports.begin(); 333 it != m_Ports.end(); 334 ++it ) 335 { 336 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); 337 if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { 338 debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); 65 case 32000: 66 case 44100: 67 case 48000: 68 m_syt_interval = 8; 69 break; 70 case 88200: 71 case 96000: 72 m_syt_interval = 16; 73 break; 74 case 176400: 75 case 192000: 76 m_syt_interval = 32; 77 break; 78 default: 79 debugError("Unsupported rate: %d\n", m_manager->getNominalRate()); 339 80 return false; 340 }341 342 switch ((*it)->getPortType()) {343 case Port::E_Audio:344 if(!(*it)->setSignalType(Port::E_PeriodSignalled)) {345 debugFatal("Could not set signal type to PeriodSignalling");346 return false;347 }348 // buffertype and datatype are dependant on the API349 debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n");350 // buffertype and datatype are dependant on the API351 if(!(*it)->setBufferType(Port::E_PointerBuffer)) {352 debugFatal("Could not set buffer type");353 return false;354 }355 if(!(*it)->useExternalBuffer(true)) {356 debugFatal("Could not set external buffer usage");357 return false;358 }359 if(!(*it)->setDataType(Port::E_Float)) {360 debugFatal("Could not set data type");361 return false;362 }363 break;364 case Port::E_Midi:365 if(!(*it)->setSignalType(Port::E_PacketSignalled)) {366 debugFatal("Could not set signal type to PacketSignalling");367 return false;368 }369 // buffertype and datatype are dependant on the API370 // buffertype and datatype are dependant on the API371 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n");372 // buffertype and datatype are dependant on the API373 if(!(*it)->setBufferType(Port::E_RingBuffer)) {374 debugFatal("Could not set buffer type");375 return false;376 }377 if(!(*it)->setDataType(Port::E_MidiEvent)) {378 debugFatal("Could not set data type");379 return false;380 }381 break;382 default:383 debugWarning("Unsupported port type specified\n");384 break;385 }386 }387 388 // the API specific settings of the ports should already be set,389 // as this is called from the processorManager->prepare()390 // so we can init the ports391 if(!initPorts()) {392 debugFatal("Could not initialize ports!\n");393 return false;394 }395 396 if(!preparePorts()) {397 debugFatal("Could not initialize ports!\n");398 return false;399 81 } 400 82 401 83 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); 402 84 debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, DBS: %d, SYT: %d\n", 403 m_manager->getNominalRate(), m_dimension,m_syt_interval);85 m_manager->getNominalRate(), m_dimension, m_syt_interval); 404 86 debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n", 405 87 m_manager->getPeriodSize(), m_manager->getNbBuffers()); … … 408 90 409 91 return true; 410 411 } 412 413 bool AmdtpReceiveStreamProcessor::prepareForStart() { 414 disable(); 415 return true; 416 } 417 418 bool AmdtpReceiveStreamProcessor::prepareForStop() { 419 disable(); 420 return true; 421 } 422 423 unsigned int 424 AmdtpReceiveStreamProcessor::getPacketsPerPeriod() 425 { 426 return (m_manager->getPeriodSize())/m_syt_interval; 427 } 428 429 bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 430 m_PeriodStat.mark(m_data_buffer->getBufferFill()); 431 432 #ifdef DEBUG 433 uint64_t ts_head; 434 signed int fc; 435 int32_t lag_ticks; 436 float lag_frames; 437 438 // in order to sync up multiple received streams, we should 439 // use the ts parameter. It specifies the time of the block's 440 // first sample. 441 442 ffado_timestamp_t ts_head_tmp; 443 m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); 444 ts_head=(uint64_t)ts_head_tmp; 445 lag_ticks=diffTicks(ts, ts_head); 446 float rate=m_data_buffer->getRate(); 447 448 assert(rate!=0.0); 449 450 lag_frames=(((float)lag_ticks)/rate); 451 452 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 453 this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 454 455 if (lag_frames>=1.0) { 456 // the stream lags 457 debugWarning( "stream (%p): lags with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 458 this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 459 } else if (lag_frames<=-1.0) { 460 // the stream leads 461 debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 462 this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 463 } 464 #endif 465 // ask the buffer to process nbframes of frames 466 // using it's registered client's processReadBlock(), 467 // which should be ours 468 m_data_buffer->blockProcessReadFrames(nbframes); 469 470 return true; 471 } 472 473 bool AmdtpReceiveStreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { 474 m_PeriodStat.mark(m_data_buffer->getBufferFill()); 475 int frames_to_ditch=(int)(nbframes); 476 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", 477 this, frames_to_ditch, ts); 478 char dummy[m_data_buffer->getBytesPerFrame()]; // one frame of garbage 479 480 while (frames_to_ditch--) { 481 m_data_buffer->readFrames(1, dummy); 482 } 483 return true; 484 } 485 486 /** 487 * \brief write received events to the stream ringbuffers. 92 } 93 94 95 /** 96 * Processes packet header to extract timestamps and so on 97 * @param data 98 * @param length 99 * @param channel 100 * @param tag 101 * @param sy 102 * @param cycle 103 * @param dropped 104 * @return true if this is a valid packet, false if not 105 */ 106 bool 107 AmdtpReceiveStreamProcessor::processPacketHeader(unsigned char *data, unsigned int length, 108 unsigned char channel, unsigned char tag, unsigned char sy, 109 unsigned int cycle, unsigned int dropped) 110 { 111 struct iec61883_packet *packet = (struct iec61883_packet *) data; 112 assert(packet); 113 bool retval = (packet->syt != 0xFFFF) && 114 (packet->fdf != 0xFF) && 115 (packet->fmt == 0x10) && 116 (packet->dbs > 0) && 117 (length >= 2*sizeof(quadlet_t)); 118 if(retval) { 119 uint64_t now = m_handler->getCycleTimer(); 120 //=> convert the SYT to a full timestamp in ticks 121 m_last_timestamp = sytRecvToFullTicks((uint32_t)ntohs(packet->syt), 122 cycle, now); 123 } 124 return retval; 125 } 126 127 /** 128 * extract the data from the packet 129 * @pre the IEC61883 packet is valid according to isValidPacket 130 * @param data 131 * @param length 132 * @param channel 133 * @param tag 134 * @param sy 135 * @param cycle 136 * @param dropped 137 * @return true if successful, false if xrun 138 */ 139 bool 140 AmdtpReceiveStreamProcessor::processPacketData(unsigned char *data, unsigned int length, 141 unsigned char channel, unsigned char tag, unsigned char sy, 142 unsigned int cycle, unsigned int dropped_cycles) { 143 struct iec61883_packet *packet = (struct iec61883_packet *) data; 144 assert(packet); 145 146 unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; 147 148 // we have to keep in mind that there are also 149 // some packets buffered by the ISO layer, 150 // at most x=m_handler->getWakeupInterval() 151 // these contain at most x*syt_interval 152 // frames, meaning that we might receive 153 // this packet x*syt_interval*ticks_per_frame 154 // later than expected (the real receive time) 155 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", 156 m_last_timestamp, m_handler->getWakeupInterval(), m_syt_interval, getTicksPerFrame()); 157 158 if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { 159 // process all ports that should be handled on a per-packet base 160 // this is MIDI for AMDTP (due to the need of DBC) 161 if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { 162 debugWarning("Problem decoding Packet Ports\n"); 163 } 164 return true; 165 } else { 166 return false; 167 } 168 } 169 170 /*********************************************** 171 * Encoding/Decoding API * 172 ***********************************************/ 173 /** 174 * @brief write received events to the stream ringbuffers. 488 175 */ 489 176 bool AmdtpReceiveStreamProcessor::processReadBlock(char *data, … … 498 185 ++it ) 499 186 { 500 501 187 if((*it)->isDisabled()) {continue;}; 502 188 … … 523 209 } 524 210 return no_problem; 525 211 } 212 213 /** 214 * @brief write silence events to the stream ringbuffers. 215 */ 216 bool AmdtpReceiveStreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset) 217 { 218 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "(%p)->proviceSilenceBlock(%u, %u)\n", this, nevents, offset); 219 220 bool no_problem=true; 221 222 for ( PortVectorIterator it = m_PeriodPorts.begin(); 223 it != m_PeriodPorts.end(); 224 ++it ) 225 { 226 if((*it)->isDisabled()) {continue;}; 227 //FIXME: make this into a static_cast when not DEBUG? 228 AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it); 229 assert(pinfo); // this should not fail!! 230 231 switch(pinfo->getFormat()) { 232 case AmdtpPortInfo::E_MBLA: 233 if(provideSilenceToPort(static_cast<AmdtpAudioPort *>(*it), offset, nevents)) { 234 debugWarning("Could not put silence into to port %s",(*it)->getName().c_str()); 235 no_problem=false; 236 } 237 break; 238 case AmdtpPortInfo::E_SPDIF: // still unimplemented 239 break; 240 /* for this processor, midi is a packet based port 241 case AmdtpPortInfo::E_Midi: 242 break;*/ 243 default: // ignore 244 break; 245 } 246 } 247 return no_problem; 526 248 } 527 249 … … 583 305 } 584 306 585 int AmdtpReceiveStreamProcessor::decodeMBLAEventsToPort(AmdtpAudioPort *p, quadlet_t *data, 307 int 308 AmdtpReceiveStreamProcessor::decodeMBLAEventsToPort( 309 AmdtpAudioPort *p, quadlet_t *data, 586 310 unsigned int offset, unsigned int nevents) 587 311 { 588 312 unsigned int j=0; 589 590 // printf("****************\n");591 // hexDumpQuadlets(data,m_dimension*4);592 // printf("****************\n");593 594 313 quadlet_t *target_event; 595 314 … … 640 359 } 641 360 361 int 362 AmdtpReceiveStreamProcessor::provideSilenceToPort( 363 AmdtpAudioPort *p, unsigned int offset, unsigned int nevents) 364 { 365 unsigned int j=0; 366 switch(p->getDataType()) { 367 default: 368 case Port::E_Int24: 369 { 370 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress()); 371 assert(nevents + offset <= p->getBufferSize()); 372 buffer+=offset; 373 374 for(j = 0; j < nevents; j += 1) { // decode max nsamples 375 *(buffer)=0; 376 buffer++; 377 } 378 } 379 break; 380 case Port::E_Float: 381 { 382 float *buffer=(float *)(p->getBufferAddress()); 383 assert(nevents + offset <= p->getBufferSize()); 384 buffer+=offset; 385 386 for(j = 0; j < nevents; j += 1) { // decode max nsamples 387 *buffer = 0.0; 388 buffer++; 389 } 390 } 391 break; 392 } 393 return 0; 394 } 395 642 396 } // end of namespace Streaming branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h
r715 r719 72 72 * Create a AMDTP receive StreamProcessor 73 73 * @param port 1394 port 74 * @param framerate frame rate75 74 * @param dimension number of substreams in the ISO stream 76 75 * (midi-muxed is only one stream) … … 79 78 virtual ~AmdtpReceiveStreamProcessor() {}; 80 79 81 enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, 80 bool processPacketHeader(unsigned char *data, unsigned int length, 81 unsigned char channel, unsigned char tag, unsigned char sy, 82 unsigned int cycle, unsigned int dropped); 83 bool processPacketData(unsigned char *data, unsigned int length, 82 84 unsigned char channel, unsigned char tag, unsigned char sy, 83 85 unsigned int cycle, unsigned int dropped); 84 86 87 virtual bool prepareChild(); 85 88 86 bool init(); 87 bool reset();88 bool prepare();89 90 bool prepareForStop();91 bool prepareForStart();92 93 bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client94 bool getFramesDry(unsigned int nbframes, int64_t ts);89 public: 90 virtual unsigned int getEventSize() 91 {return 4;}; 92 virtual unsigned int getMaxPacketSize() 93 {return 4 * (2 + m_syt_interval * m_dimension);}; 94 virtual unsigned int getEventsPerFrame() 95 { return m_dimension; }; 96 virtual unsigned int getUpdatePeriod() 97 {return m_syt_interval;}; 95 98 96 99 // We have 1 period of samples = m_period … … 101 104 // however, if we only count the number of used packets 102 105 // it is m_period / m_syt_interval 103 unsigned int getPacketsPerPeriod();106 virtual unsigned int getPacketsPerPeriod(); 104 107 105 unsigned int getMaxPacketSize() {return 4 * (2 + m_syt_interval * m_dimension);};106 107 void dumpInfo();108 108 protected: 109 109 110 110 bool processReadBlock(char *data, unsigned int nevents, unsigned int offset); 111 bool provideSilenceBlock(unsigned int nevents, unsigned int offset); 111 112 112 113 bool decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); 113 114 114 115 int decodeMBLAEventsToPort(AmdtpAudioPort *, quadlet_t *data, unsigned int offset, unsigned int nevents); 115 void updatePreparedState();116 int provideSilenceToPort(AmdtpAudioPort *p, unsigned int offset, unsigned int nevents); 116 117 117 118 int m_dimension; 118 119 unsigned int m_syt_interval; 119 120 120 uint64_t m_dropped; /// FIXME:debug121 uint64_t m_last_dropped; /// FIXME:debug122 121 uint64_t m_last_syt; /// FIXME:debug 123 122 uint64_t m_last_now; /// FIXME:debug 124 int m_last_good_cycle; /// FIXME:debug125 uint64_t m_last_timestamp; /// last timestamp (in ticks)126 uint64_t m_last_timestamp2; /// last timestamp (in ticks)127 uint64_t m_last_timestamp_at_period_ticks;128 123 }; 129 124 branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp
r715 r719 49 49 {} 50 50 51 /**52 * @return53 */54 bool AmdtpTransmitStreamProcessor::init() {55 56 debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing (%p)...\n", this);57 // call the parent init58 // this has to be done before allocating the buffers,59 // because this sets the buffersizes from the processormanager60 if(!StreamProcessor::init()) {61 debugFatal("Could not do base class init (%p)\n",this);62 return false;63 }64 return true;65 }66 67 51 enum raw1394_iso_disposition 68 52 AmdtpTransmitStreamProcessor::getPacket(unsigned char *data, unsigned int *length, … … 73 57 if (cycle<0) { 74 58 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", 75 cycle, m_running);59 cycle, isRunning()); 76 60 *tag = 0; 77 61 *sy = 0; … … 81 65 82 66 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", 83 cycle, m_running);67 cycle, isRunning()); 84 68 85 69 if (addCycles(m_last_cycle, 1) != cycle) { … … 99 83 /* Our node ID can change after a bus reset, so it is best to fetch 100 84 * our node ID for each packet. */ 101 packet->sid = getNodeId() & 0x3f;85 packet->sid = m_handler->getLocalNodeId() & 0x3f; 102 86 103 87 packet->dbs = m_dimension; … … 126 110 127 111 #ifdef DEBUG 128 if( m_running&& (cycle_diff < 0)) {112 if(isRunning() && (cycle_diff < 0)) { 129 113 debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", 130 114 cycle, now_cycles); … … 139 123 // to be 'running' 140 124 // NOTE: this works only at startup 141 if (! m_running&& cycle_diff >= 0 && cycle >= 0) {125 if (!isRunning() && cycle_diff >= 0 && cycle >= 0) { 142 126 debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); 143 m_running=true;144 127 } 145 128 … … 169 152 const int max_cycles_to_transmit_early = 5; 170 153 171 if( ! m_running|| !m_data_buffer->isEnabled() ) {154 if( !isRunning() || !m_data_buffer->isEnabled() ) { 172 155 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, 173 156 "Not running (%d) or buffer not enabled (enabled=%d)\n", 174 m_running, m_data_buffer->isEnabled());157 isRunning(), m_data_buffer->isEnabled()); 175 158 176 159 // not running or not enabled … … 357 340 } 358 341 342 unsigned int 343 AmdtpTransmitStreamProcessor::getEventsPerFrame() 344 { 345 return m_dimension; 346 } 347 348 unsigned int 349 AmdtpTransmitStreamProcessor::getUpdatePeriod() 350 { 351 return m_syt_interval; 352 } 353 354 359 355 unsigned int AmdtpTransmitStreamProcessor::fillDataPacketHeader( 360 356 struct iec61883_packet *packet, unsigned int* length, … … 417 413 m_data_buffer->setTickOffset(0); 418 414 419 // reset all non-device specific stuff420 // i.e. the iso stream and the associated ports421 if(!StreamProcessor::reset()) {422 debugFatal("Could not do base class reset\n");423 return false;424 }415 // // reset all non-device specific stuff 416 // // i.e. the iso stream and the associated ports 417 // if(!StreamProcessor::reset()) { 418 // debugFatal("Could not do base class reset\n"); 419 // return false; 420 // } 425 421 426 422 // we should prefill the event buffer … … 433 429 } 434 430 435 bool AmdtpTransmitStreamProcessor::prepare () {431 bool AmdtpTransmitStreamProcessor::prepareChild() { 436 432 m_PeriodStat.setName("XMT PERIOD"); 437 433 m_PacketStat.setName("XMT PACKET"); … … 620 616 bool AmdtpTransmitStreamProcessor::prepareForEnable(uint64_t time_to_enable_at) { 621 617 622 if (!StreamProcessor::prepareForEnable(time_to_enable_at)) {623 debugError("StreamProcessor::prepareForEnable failed\n");624 return false;625 }618 // if (!StreamProcessor::prepareForEnable(time_to_enable_at)) { 619 // debugError("StreamProcessor::prepareForEnable failed\n"); 620 // return false; 621 // } 626 622 627 623 return true; branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h
r715 r719 85 85 int cycle, unsigned int dropped, unsigned int max_length); 86 86 87 bool init(); 87 virtual unsigned int getEventsPerFrame(); 88 virtual unsigned int getEventSize() {return 4;}; 89 virtual unsigned int getUpdatePeriod(); 90 88 91 bool reset(); 89 bool prepare ();92 bool prepareChild(); 90 93 91 94 bool prepareForStop(); branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.cpp
r714 r719 55 55 *sy = 0; 56 56 57 58 57 return RAW1394_ISO_OK; 59 }60 61 int IsoStream::getNodeId() {62 if (m_handler) {63 return m_handler->getLocalNodeId();64 }65 return -1;66 }67 68 69 void IsoStream::dumpInfo()70 {71 72 debugOutputShort( DEBUG_LEVEL_NORMAL, " Address : %p\n",this);73 debugOutputShort( DEBUG_LEVEL_NORMAL, " Stream type : %s\n",74 (this->getStreamType()==eST_Receive ? "Receive" : "Transmit"));75 debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel : %d, %d\n",76 m_port, m_channel);77 78 58 } 79 59 80 60 bool IsoStream::setChannel(int c) { 81 61 debugOutput( DEBUG_LEVEL_VERBOSE, "setting channel for (%p) to %d\n",this, c); 82 83 62 m_channel=c; 84 63 return true; 85 }86 87 88 bool IsoStream::reset() {89 debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");90 return true;91 }92 93 bool IsoStream::prepare() {94 debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");95 return true;96 }97 98 bool IsoStream::init() {99 debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");100 return true;101 102 64 } 103 65 … … 109 71 void IsoStream::clearHandler() { 110 72 debugOutput( DEBUG_LEVEL_VERBOSE, "clearing handler of isostream %p\n", this); 73 m_handler=0; 74 } 111 75 112 m_handler=0; 76 void IsoStream::dumpInfo() 77 { 78 debugOutputShort( DEBUG_LEVEL_NORMAL, " Address : %p\n",this); 79 debugOutputShort( DEBUG_LEVEL_NORMAL, " Stream type : %s\n", 80 (this->getStreamType()==eST_Receive ? "Receive" : "Transmit")); 81 debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel : %d, %d\n", 82 m_port, m_channel); 83 } 113 84 114 85 } 115 116 117 }branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.h
r714 r719 74 74 virtual unsigned int getMaxPacketSize() {return 1024;}; //FIXME: arbitrary 75 75 76 virtual bool init();77 78 76 virtual enum raw1394_iso_disposition 79 77 putPacket(unsigned char *data, unsigned int length, … … 86 84 87 85 void dumpInfo(); 88 89 int getNodeId();90 91 virtual bool reset();92 virtual bool prepare();93 86 94 87 protected: branches/ppalmers-streaming/src/libstreaming/generic/Port.cpp
r705 r719 111 111 }; 112 112 113 bool Port::setName(std::string name) { 114 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting name to %s for port %s\n",name.c_str(),m_Name.c_str()); 115 116 if (m_State != E_Created) { 117 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 118 return false; 119 } 120 m_Name=name; 121 return true; 122 } 123 124 bool Port::setBufferSize(unsigned int newsize) { 125 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffersize to %d for port %s\n",newsize,m_Name.c_str()); 126 if (m_State != E_Created) { 127 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 128 return false; 129 } 130 m_buffersize=newsize; 131 return true; 132 } 133 134 unsigned int Port::getEventSize() { 135 switch (m_DataType) { 136 case E_Float: 137 return sizeof(float); 138 case E_Int24: // 24 bit 2's complement, packed in a 32bit integer (LSB's) 139 return sizeof(uint32_t); 140 case E_MidiEvent: 141 return sizeof(uint32_t); 142 default: 143 return 0; 144 } 145 } 146 147 bool Port::setDataType(enum E_DataType d) { 148 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting datatype to %d for port %s\n",(int) d,m_Name.c_str()); 149 if (m_State != E_Created) { 150 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 151 return false; 152 } 153 154 // do some sanity checks 155 bool type_is_ok=false; 156 switch (m_PortType) { 157 case E_Audio: 158 if(d == E_Int24) type_is_ok=true; 159 if(d == E_Float) type_is_ok=true; 160 break; 161 case E_Midi: 162 if(d == E_MidiEvent) type_is_ok=true; 163 break; 164 case E_Control: 165 if(d == E_Default) type_is_ok=true; 166 break; 167 default: 168 break; 169 } 170 171 if(!type_is_ok) { 172 debugFatal("Datatype not supported by this type of port!\n"); 173 return false; 174 } 175 176 m_DataType=d; 177 return true; 178 } 179 180 bool Port::setSignalType(enum E_SignalType s) { 181 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting signaltype to %d for port %s\n",(int)s,m_Name.c_str()); 182 if (m_State != E_Created) { 183 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 184 return false; 185 } 186 187 // do some sanity checks 188 bool type_is_ok=false; 189 switch (m_PortType) { 190 case E_Audio: 191 if(s == E_PeriodSignalled) type_is_ok=true; 192 break; 193 case E_Midi: 194 if(s == E_PacketSignalled) type_is_ok=true; 195 break; 196 case E_Control: 197 if(s == E_PeriodSignalled) type_is_ok=true; 198 break; 199 default: 200 break; 201 } 202 if(!type_is_ok) { 203 debugFatal("Signalling type not supported by this type of port!\n"); 204 return false; 205 } 206 m_SignalType=s; 207 return true; 208 } 209 210 bool Port::setBufferType(enum E_BufferType b) { 211 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffer type to %d for port %s\n",(int)b,m_Name.c_str()); 212 if (m_State != E_Created) { 213 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 214 return false; 215 } 216 // do some sanity checks 217 bool type_is_ok=false; 218 switch (m_PortType) { 219 case E_Audio: 220 if(b == E_PointerBuffer) type_is_ok=true; 221 break; 222 case E_Midi: 223 if(b == E_RingBuffer) type_is_ok=true; 224 break; 225 case E_Control: 226 break; 227 default: 228 break; 229 } 230 if(!type_is_ok) { 231 debugFatal("Buffer type not supported by this type of port!\n"); 232 return false; 233 } 234 m_BufferType=b; 235 return true; 236 } 237 238 bool Port::useExternalBuffer(bool b) { 239 // If called on an initialised stream but the request isn't for a change silently 240 // allow it (relied on by C API as used by jack backend driver) 241 if (m_State==E_Initialized && m_use_external_buffer==b) 242 return true; 243 244 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting external buffer use to %d for port %s\n",(int)b,m_Name.c_str()); 245 246 if (m_State != E_Created) { 247 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 248 return false; 249 } 250 m_use_external_buffer=b; 251 return true; 252 } 253 254 // buffer handling api's for pointer buffers 255 /** 256 * Get the buffer address (being the external or the internal one). 257 * 258 * @param buff 259 */ 260 void *Port::getBufferAddress() { 261 assert(m_BufferType==E_PointerBuffer); 262 return m_buffer; 263 }; 264 265 /** 266 * Set the external buffer address. 267 * only call this when you have specified that you will use 268 * an external buffer before doing the init() 269 * 270 * @param buff 271 */ 272 void Port::setExternalBufferAddress(void *buff) { 273 assert(m_BufferType==E_PointerBuffer); 274 assert(m_use_external_buffer); // don't call this with an internal buffer! 275 m_buffer=buff; 276 }; 277 278 // buffer handling api's for ringbuffers 279 bool Port::writeEvent(void *event) { 280 281 #ifdef DEBUG 282 if (m_State != E_Initialized) { 283 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 284 return false; 285 } 286 287 if(m_BufferType!=E_RingBuffer) { 288 debugError("operation not allowed on non E_RingBuffer ports\n"); 289 show(); 290 return false; 291 } 292 assert(m_ringbuffer); 293 #endif 294 295 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Writing event %08X with size %d to port %s\n",*((quadlet_t *)event),m_eventsize, m_Name.c_str()); 296 297 return (ffado_ringbuffer_write(m_ringbuffer, (char *)event, m_eventsize)==m_eventsize); 298 } 299 300 bool Port::readEvent(void *event) { 301 302 #ifdef DEBUG 303 if (m_State != E_Initialized) { 304 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 305 return false; 306 } 307 308 if(m_BufferType!=E_RingBuffer) { 309 debugError("operation not allowed on non E_RingBuffer ports\n"); 310 show(); 311 return false; 312 } 313 assert(m_ringbuffer); 314 #endif 315 316 317 unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, m_eventsize); 318 319 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading event %X with size %d from port %s\n",*((quadlet_t *)event),m_eventsize,m_Name.c_str()); 320 321 322 return (read==m_eventsize); 323 } 324 325 int Port::writeEvents(void *event, unsigned int nevents) { 326 327 #ifdef DEBUG 328 if (m_State != E_Initialized) { 329 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 330 return false; 331 } 332 333 if(m_BufferType!=E_RingBuffer) { 334 debugError("operation not allowed on non E_RingBuffer ports\n"); 335 show(); 336 return false; 337 } 338 assert(m_ringbuffer); 339 #endif 340 341 342 unsigned int bytes2write=m_eventsize*nevents; 343 344 unsigned int written=ffado_ringbuffer_write(m_ringbuffer, (char *)event,bytes2write)/m_eventsize; 345 346 #ifdef DEBUG 347 if(written) { 348 unsigned int i=0; 349 quadlet_t * tmp=(quadlet_t *)event; 350 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Written %d events (",written); 351 for (i=0;i<written;i++) { 352 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 353 } 354 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") to port %s\n",m_Name.c_str()); 355 } 356 #endif 357 358 return written; 359 360 } 361 362 int Port::readEvents(void *event, unsigned int nevents) { 363 364 #ifdef DEBUG 365 if (m_State != E_Initialized) { 366 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 367 return false; 368 } 369 if(m_BufferType!=E_RingBuffer) { 370 debugError("operation not allowed on non E_RingBuffer ports\n"); 371 show(); 372 return false; 373 } 374 assert(m_ringbuffer); 375 #endif 376 377 378 unsigned int bytes2read=m_eventsize*nevents; 379 380 unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, bytes2read)/m_eventsize; 381 382 #ifdef DEBUG 383 if(read) { 384 unsigned int i=0; 385 quadlet_t * tmp=(quadlet_t *)event; 386 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Read %d events (",read); 387 for (i=0;i<read;i++) { 388 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 389 } 390 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") from port %s\n",m_Name.c_str()); 391 } 392 #endif 393 394 return read; 395 } 396 397 /* rate control */ 398 bool Port::canRead() { 399 bool byte_present_in_buffer; 400 401 bool retval=false; 402 403 assert(m_ringbuffer); 404 405 byte_present_in_buffer=(ffado_ringbuffer_read_space(m_ringbuffer) >= m_eventsize); 406 407 if(byte_present_in_buffer) { 408 409 if(!m_do_ratecontrol) { 410 return true; 411 } 412 413 if(m_rate_counter <= 0) { 414 // update the counter 415 if(m_average_ratecontrol) { 416 m_rate_counter += m_event_interval; 417 assert(m_rate_counter<m_event_interval); 418 } else { 419 m_rate_counter = m_event_interval; 420 } 421 422 retval=true; 423 } else { 424 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Rate limit (%s)! rate_counter=%d \n",m_Name.c_str(),m_rate_counter); 425 426 } 427 } 428 429 430 m_rate_counter -= m_slot_interval; 431 432 // we have to limit the decrement of the ratecounter somehow. 433 // m_rate_counter_minimum is initialized when enabling ratecontrol 434 if(m_rate_counter < m_rate_counter_minimum) { 435 m_rate_counter = m_rate_counter_minimum; 436 } 437 438 return retval; 439 } 440 441 bool Port::useRateControl(bool use, unsigned int slot_interval, 442 unsigned int event_interval, bool average) { 443 444 if (use) { 445 debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling rate control for port %s...\n",m_Name.c_str()); 446 if(slot_interval>event_interval) { 447 debugWarning("Rate control not needed!\n",m_Name.c_str()); 448 m_do_ratecontrol=false; 449 return false; 450 } 451 if(slot_interval==0) { 452 debugFatal("Cannot have slot interval == 0!\n"); 453 m_do_ratecontrol=false; 454 return false; 455 } 456 if(event_interval==0) { 457 debugFatal("Cannot have event interval == 0!\n"); 458 m_do_ratecontrol=false; 459 return false; 460 } 461 m_do_ratecontrol=use; 462 m_event_interval=event_interval; 463 m_slot_interval=slot_interval; 464 m_rate_counter=0; 465 466 // NOTE: pretty arbitrary, but in average mode this limits the peak stream rate 467 m_rate_counter_minimum=-(2*event_interval); 468 469 m_average_ratecontrol=average; 470 471 } else { 472 debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling rate control for port %s...\n",m_Name.c_str()); 473 m_do_ratecontrol=use; 474 } 475 return true; 476 } 477 478 /// Enable the port. (this can be called anytime) 479 void 480 Port::enable() { 481 debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); 482 m_disabled=false; 483 }; 484 485 /// Disable the port. (this can be called anytime) 486 void 487 Port::disable() { 488 debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); 489 m_disabled=false; 490 }; 491 492 493 /* Private functions */ 494 495 bool Port::allocateInternalBuffer() { 496 int event_size=getEventSize(); 497 498 debugOutput(DEBUG_LEVEL_VERBOSE, 499 "Allocating internal buffer of %d events with size %d (%s)\n", 500 m_buffersize, event_size, m_Name.c_str()); 501 502 if(m_buffer) { 503 debugWarning("already has an internal buffer attached, re-allocating\n"); 504 freeInternalBuffer(); 505 } 506 507 m_buffer=calloc(m_buffersize,event_size); 508 if (!m_buffer) { 509 debugFatal("could not allocate internal buffer\n"); 510 m_buffersize=0; 511 return false; 512 } 513 514 return true; 515 } 516 517 void Port::freeInternalBuffer() { 518 debugOutput(DEBUG_LEVEL_VERBOSE, 519 "Freeing internal buffer (%s)\n",m_Name.c_str()); 520 521 if(m_buffer) { 522 free(m_buffer); 523 m_buffer=0; 524 } 525 } 526 527 bool Port::allocateInternalRingBuffer() { 528 int event_size=getEventSize(); 529 530 debugOutput(DEBUG_LEVEL_VERBOSE, 531 "Allocating internal buffer of %d events with size %d (%s)\n", 532 m_buffersize, event_size, m_Name.c_str()); 533 534 if(m_ringbuffer) { 535 debugWarning("already has an internal ringbuffer attached, re-allocating\n"); 536 freeInternalRingBuffer(); 537 } 538 539 m_ringbuffer=ffado_ringbuffer_create(m_buffersize * event_size); 540 if (!m_ringbuffer) { 541 debugFatal("could not allocate internal ringbuffer\n"); 542 m_buffersize=0; 543 return false; 544 } 545 546 return true; 547 } 548 549 void Port::freeInternalRingBuffer() { 550 debugOutput(DEBUG_LEVEL_VERBOSE, 551 "Freeing internal ringbuffer (%s)\n",m_Name.c_str()); 552 553 if(m_ringbuffer) { 554 ffado_ringbuffer_free(m_ringbuffer); 555 m_ringbuffer=0; 556 } 557 } 558 113 559 void Port::show() { 114 560 debugOutput(DEBUG_LEVEL_VERBOSE,"Name : %s\n", m_Name.c_str()); … … 129 575 } 130 576 131 bool Port::setName(std::string name) { 132 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting name to %s for port %s\n",name.c_str(),m_Name.c_str()); 133 134 if (m_State != E_Created) { 135 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 136 return false; 137 } 138 139 m_Name=name; 140 141 return true; 142 } 143 144 bool Port::setBufferSize(unsigned int newsize) { 145 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffersize to %d for port %s\n",newsize,m_Name.c_str()); 146 if (m_State != E_Created) { 147 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 148 return false; 149 } 150 151 m_buffersize=newsize; 152 return true; 153 154 } 155 156 unsigned int Port::getEventSize() { 157 switch (m_DataType) { 158 case E_Float: 159 return sizeof(float); 160 case E_Int24: // 24 bit 2's complement, packed in a 32bit integer (LSB's) 161 return sizeof(uint32_t); 162 case E_MidiEvent: 163 return sizeof(uint32_t); 164 default: 165 return 0; 166 } 167 } 168 169 bool Port::setDataType(enum E_DataType d) { 170 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting datatype to %d for port %s\n",(int) d,m_Name.c_str()); 171 if (m_State != E_Created) { 172 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 173 return false; 174 } 175 176 // do some sanity checks 177 bool type_is_ok=false; 178 switch (m_PortType) { 179 case E_Audio: 180 if(d == E_Int24) type_is_ok=true; 181 if(d == E_Float) type_is_ok=true; 182 break; 183 case E_Midi: 184 if(d == E_MidiEvent) type_is_ok=true; 185 break; 186 case E_Control: 187 if(d == E_Default) type_is_ok=true; 188 break; 189 default: 190 break; 191 } 192 193 if(!type_is_ok) { 194 debugFatal("Datatype not supported by this type of port!\n"); 195 return false; 196 } 197 198 m_DataType=d; 199 return true; 200 } 201 202 bool Port::setSignalType(enum E_SignalType s) { 203 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting signaltype to %d for port %s\n",(int)s,m_Name.c_str()); 204 if (m_State != E_Created) { 205 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 206 return false; 207 } 208 209 // do some sanity checks 210 bool type_is_ok=false; 211 switch (m_PortType) { 212 case E_Audio: 213 if(s == E_PeriodSignalled) type_is_ok=true; 214 break; 215 case E_Midi: 216 if(s == E_PacketSignalled) type_is_ok=true; 217 break; 218 case E_Control: 219 if(s == E_PeriodSignalled) type_is_ok=true; 220 break; 221 default: 222 break; 223 } 224 225 if(!type_is_ok) { 226 debugFatal("Signalling type not supported by this type of port!\n"); 227 return false; 228 } 229 230 m_SignalType=s; 231 return true; 232 233 } 234 235 bool Port::setBufferType(enum E_BufferType b) { 236 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffer type to %d for port %s\n",(int)b,m_Name.c_str()); 237 if (m_State != E_Created) { 238 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 239 return false; 240 } 241 242 // do some sanity checks 243 bool type_is_ok=false; 244 switch (m_PortType) { 245 case E_Audio: 246 if(b == E_PointerBuffer) type_is_ok=true; 247 break; 248 case E_Midi: 249 if(b == E_RingBuffer) type_is_ok=true; 250 break; 251 case E_Control: 252 break; 253 default: 254 break; 255 } 256 257 if(!type_is_ok) { 258 debugFatal("Buffer type not supported by this type of port!\n"); 259 return false; 260 } 261 262 m_BufferType=b; 263 return true; 264 265 } 266 267 bool Port::useExternalBuffer(bool b) { 268 269 // If called on an initialised stream but the request isn't for a change silently 270 // allow it (relied on by C API as used by jack backend driver) 271 if (m_State==E_Initialized && m_use_external_buffer==b) 272 return true; 273 274 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting external buffer use to %d for port %s\n",(int)b,m_Name.c_str()); 275 276 if (m_State != E_Created) { 277 debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 278 return false; 279 } 280 281 m_use_external_buffer=b; 282 return true; 283 } 284 285 // buffer handling api's for pointer buffers 286 /** 287 * Get the buffer address (being the external or the internal one). 288 * 289 * @param buff 290 */ 291 void *Port::getBufferAddress() { 292 assert(m_BufferType==E_PointerBuffer); 293 return m_buffer; 294 }; 295 296 /** 297 * Set the external buffer address. 298 * only call this when you have specified that you will use 299 * an external buffer before doing the init() 300 * 301 * @param buff 302 */ 303 void Port::setExternalBufferAddress(void *buff) { 304 assert(m_BufferType==E_PointerBuffer); 305 assert(m_use_external_buffer); // don't call this with an internal buffer! 306 m_buffer=buff; 307 }; 308 309 // buffer handling api's for ringbuffers 310 bool Port::writeEvent(void *event) { 311 312 #ifdef DEBUG 313 if (m_State != E_Initialized) { 314 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 315 return false; 316 } 317 318 if(m_BufferType!=E_RingBuffer) { 319 debugError("operation not allowed on non E_RingBuffer ports\n"); 320 show(); 321 return false; 322 } 323 assert(m_ringbuffer); 324 #endif 325 326 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Writing event %08X with size %d to port %s\n",*((quadlet_t *)event),m_eventsize, m_Name.c_str()); 327 328 return (ffado_ringbuffer_write(m_ringbuffer, (char *)event, m_eventsize)==m_eventsize); 329 } 330 331 bool Port::readEvent(void *event) { 332 333 #ifdef DEBUG 334 if (m_State != E_Initialized) { 335 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 336 return false; 337 } 338 339 if(m_BufferType!=E_RingBuffer) { 340 debugError("operation not allowed on non E_RingBuffer ports\n"); 341 show(); 342 return false; 343 } 344 assert(m_ringbuffer); 345 #endif 346 347 348 unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, m_eventsize); 349 350 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading event %X with size %d from port %s\n",*((quadlet_t *)event),m_eventsize,m_Name.c_str()); 351 352 353 return (read==m_eventsize); 354 } 355 356 int Port::writeEvents(void *event, unsigned int nevents) { 357 358 #ifdef DEBUG 359 if (m_State != E_Initialized) { 360 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 361 return false; 362 } 363 364 if(m_BufferType!=E_RingBuffer) { 365 debugError("operation not allowed on non E_RingBuffer ports\n"); 366 show(); 367 return false; 368 } 369 assert(m_ringbuffer); 370 #endif 371 372 373 unsigned int bytes2write=m_eventsize*nevents; 374 375 unsigned int written=ffado_ringbuffer_write(m_ringbuffer, (char *)event,bytes2write)/m_eventsize; 376 377 #ifdef DEBUG 378 if(written) { 379 unsigned int i=0; 380 quadlet_t * tmp=(quadlet_t *)event; 381 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Written %d events (",written); 382 for (i=0;i<written;i++) { 383 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 384 } 385 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") to port %s\n",m_Name.c_str()); 386 } 387 #endif 388 389 return written; 390 391 } 392 393 int Port::readEvents(void *event, unsigned int nevents) { 394 395 #ifdef DEBUG 396 if (m_State != E_Initialized) { 397 debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 398 return false; 399 } 400 if(m_BufferType!=E_RingBuffer) { 401 debugError("operation not allowed on non E_RingBuffer ports\n"); 402 show(); 403 return false; 404 } 405 assert(m_ringbuffer); 406 #endif 407 408 409 unsigned int bytes2read=m_eventsize*nevents; 410 411 unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, bytes2read)/m_eventsize; 412 413 #ifdef DEBUG 414 if(read) { 415 unsigned int i=0; 416 quadlet_t * tmp=(quadlet_t *)event; 417 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Read %d events (",read); 418 for (i=0;i<read;i++) { 419 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 420 } 421 debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") from port %s\n",m_Name.c_str()); 422 } 423 #endif 424 425 return read; 426 } 427 428 /* rate control */ 429 bool Port::canRead() { 430 bool byte_present_in_buffer; 431 432 bool retval=false; 433 434 assert(m_ringbuffer); 435 436 byte_present_in_buffer=(ffado_ringbuffer_read_space(m_ringbuffer) >= m_eventsize); 437 438 if(byte_present_in_buffer) { 439 440 if(!m_do_ratecontrol) { 441 return true; 442 } 443 444 if(m_rate_counter <= 0) { 445 // update the counter 446 if(m_average_ratecontrol) { 447 m_rate_counter += m_event_interval; 448 assert(m_rate_counter<m_event_interval); 449 } else { 450 m_rate_counter = m_event_interval; 451 } 452 453 retval=true; 454 } else { 455 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Rate limit (%s)! rate_counter=%d \n",m_Name.c_str(),m_rate_counter); 456 457 } 458 } 459 460 461 m_rate_counter -= m_slot_interval; 462 463 // we have to limit the decrement of the ratecounter somehow. 464 // m_rate_counter_minimum is initialized when enabling ratecontrol 465 if(m_rate_counter < m_rate_counter_minimum) { 466 m_rate_counter = m_rate_counter_minimum; 467 } 468 469 return retval; 470 } 471 472 bool Port::useRateControl(bool use, unsigned int slot_interval, 473 unsigned int event_interval, bool average) { 474 475 if (use) { 476 debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling rate control for port %s...\n",m_Name.c_str()); 477 if(slot_interval>event_interval) { 478 debugWarning("Rate control not needed!\n",m_Name.c_str()); 479 m_do_ratecontrol=false; 480 return false; 481 } 482 if(slot_interval==0) { 483 debugFatal("Cannot have slot interval == 0!\n"); 484 m_do_ratecontrol=false; 485 return false; 486 } 487 if(event_interval==0) { 488 debugFatal("Cannot have event interval == 0!\n"); 489 m_do_ratecontrol=false; 490 return false; 491 } 492 m_do_ratecontrol=use; 493 m_event_interval=event_interval; 494 m_slot_interval=slot_interval; 495 m_rate_counter=0; 496 497 // NOTE: pretty arbitrary, but in average mode this limits the peak stream rate 498 m_rate_counter_minimum=-(2*event_interval); 499 500 m_average_ratecontrol=average; 501 502 } else { 503 debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling rate control for port %s...\n",m_Name.c_str()); 504 m_do_ratecontrol=use; 505 } 506 return true; 507 } 508 509 /// Enable the port. (this can be called anytime) 510 void 511 Port::enable() { 512 debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); 513 m_disabled=false; 514 }; 515 516 /// Disable the port. (this can be called anytime) 517 void 518 Port::disable() { 519 debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); 520 m_disabled=false; 521 }; 522 523 524 /* Private functions */ 525 526 bool Port::allocateInternalBuffer() { 527 int event_size=getEventSize(); 528 529 debugOutput(DEBUG_LEVEL_VERBOSE, 530 "Allocating internal buffer of %d events with size %d (%s)\n", 531 m_buffersize, event_size, m_Name.c_str()); 532 533 if(m_buffer) { 534 debugWarning("already has an internal buffer attached, re-allocating\n"); 535 freeInternalBuffer(); 536 } 537 538 m_buffer=calloc(m_buffersize,event_size); 539 if (!m_buffer) { 540 debugFatal("could not allocate internal buffer\n"); 541 m_buffersize=0; 542 return false; 543 } 544 545 return true; 546 } 547 548 void Port::freeInternalBuffer() { 549 debugOutput(DEBUG_LEVEL_VERBOSE, 550 "Freeing internal buffer (%s)\n",m_Name.c_str()); 551 552 if(m_buffer) { 553 free(m_buffer); 554 m_buffer=0; 555 } 556 } 557 558 bool Port::allocateInternalRingBuffer() { 559 int event_size=getEventSize(); 560 561 debugOutput(DEBUG_LEVEL_VERBOSE, 562 "Allocating internal buffer of %d events with size %d (%s)\n", 563 m_buffersize, event_size, m_Name.c_str()); 564 565 if(m_ringbuffer) { 566 debugWarning("already has an internal ringbuffer attached, re-allocating\n"); 567 freeInternalRingBuffer(); 568 } 569 570 m_ringbuffer=ffado_ringbuffer_create(m_buffersize * event_size); 571 if (!m_ringbuffer) { 572 debugFatal("could not allocate internal ringbuffer\n"); 573 m_buffersize=0; 574 return false; 575 } 576 577 return true; 578 } 579 580 void Port::freeInternalRingBuffer() { 581 debugOutput(DEBUG_LEVEL_VERBOSE, 582 "Freeing internal ringbuffer (%s)\n",m_Name.c_str()); 583 584 if(m_ringbuffer) { 585 ffado_ringbuffer_free(m_ringbuffer); 586 m_ringbuffer=0; 587 } 588 } 589 590 } 577 } branches/ppalmers-streaming/src/libstreaming/generic/PortManager.cpp
r705 r719 251 251 } 252 252 } 253 254 255 256 253 return true; 257 254 } branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp
r715 r719 39 39 , m_processor_type ( type ) 40 40 , m_state( ePS_Created ) 41 , m_next_state( ePS_Invalid ) 42 , m_cycle_to_switch_state( 0 ) 41 43 , m_xruns( 0 ) 42 , m_manager(NULL) 43 , m_running(false) 44 , m_disabled(true) 45 , m_is_disabled(true) 46 , m_cycle_to_enable_at(0) 47 , m_ticks_per_frame(0) 48 , m_last_cycle(0) 49 , m_sync_delay(0) 44 , m_manager( NULL ) 45 , m_ticks_per_frame( 0 ) 46 , m_last_cycle( 0 ) 47 , m_sync_delay( 0 ) 48 , m_last_timestamp(0) 49 , m_last_timestamp2(0) 50 , m_dropped(0) 50 51 { 51 52 // create the timestamped buffer and register ourselves as its client 52 53 m_data_buffer=new Util::TimestampedBuffer(this); 53 54 54 } 55 55 … … 58 58 } 59 59 60 void61 StreamProcessor::setState(enum eProcessorState s) {62 #ifdef DEBUG63 // check the state transistion64 debugOutput( DEBUG_LEVEL_VERBOSE, "State transition from %s to %s",65 ePSToString(m_state), ePSToString(s) );66 #endif67 m_state = s;68 }69 70 void StreamProcessor::dumpInfo()71 {72 debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n");73 debugOutputShort( DEBUG_LEVEL_NORMAL, " Iso stream info:\n");74 75 IsoStream::dumpInfo();76 debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor info:\n");77 if (m_handler)78 debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011u\n",m_handler->getCycleTimerTicks());79 debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns);80 debugOutputShort( DEBUG_LEVEL_NORMAL, " Running : %d\n", m_running);81 debugOutputShort( DEBUG_LEVEL_NORMAL, " Enabled : %s\n", m_disabled ? "No" : "Yes");82 debugOutputShort( DEBUG_LEVEL_NORMAL, " enable status : %s\n", m_is_disabled ? "No" : "Yes");83 84 debugOutputShort( DEBUG_LEVEL_NORMAL, " Nominal framerate : %u\n", m_manager->getNominalRate());85 debugOutputShort( DEBUG_LEVEL_NORMAL, " Device framerate : Sync: %f, Buffer %f\n",86 24576000.0/getSyncSource().m_data_buffer->getRate(),87 24576000.0/m_data_buffer->getRate()88 );89 90 m_data_buffer->dumpInfo();91 92 m_PeriodStat.dumpInfo();93 m_PacketStat.dumpInfo();94 // m_WakeupStat.dumpInfo();95 }96 97 bool StreamProcessor::init()98 {99 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");100 m_data_buffer->init();101 return IsoStream::init();102 }103 104 /**105 * Resets the frame counter, the xrun counter, the ports and the iso stream.106 * @return true if reset succeeded107 */108 bool StreamProcessor::reset() {109 110 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n");111 112 // reset the event buffer, discard all content113 if (!m_data_buffer->reset()) {114 debugFatal("Could not reset data buffer\n");115 return false;116 }117 118 resetXrunCounter();119 120 // loop over the ports to reset them121 if (!PortManager::resetPorts()) {122 debugFatal("Could not reset ports\n");123 return false;124 }125 126 // reset the iso stream127 if (!IsoStream::reset()) {128 debugFatal("Could not reset isostream\n");129 return false;130 }131 return true;132 133 }134 135 bool StreamProcessor::prepareForEnable(uint64_t time_to_enable_at) {136 debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this);137 debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks());138 debugOutput(DEBUG_LEVEL_VERBOSE," Enable at : %011u\n",time_to_enable_at);139 m_data_buffer->dumpInfo();140 return true;141 }142 143 bool StreamProcessor::prepareForDisable() {144 debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this);145 debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks());146 m_data_buffer->dumpInfo();147 return true;148 }149 150 bool StreamProcessor::prepare() {151 152 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");153 if(!m_manager) {154 debugFatal("Not attached to a manager!\n");155 return -1;156 }157 158 // init the ports159 // loop over the ports to reset them160 PortManager::preparePorts();161 162 // reset the iso stream163 IsoStream::prepare();164 165 return true;166 167 }168 169 StreamProcessor&170 StreamProcessor::getSyncSource()171 {172 return m_manager->getSyncSource();173 };174 175 int StreamProcessor::getBufferFill() {176 // return m_data_buffer->getFrameCounter();177 return m_data_buffer->getBufferFill();178 }179 180 60 uint64_t StreamProcessor::getTimeNow() { 181 61 return m_handler->getCycleTimerTicks(); 182 62 } 183 184 63 185 64 int StreamProcessor::getMaxFrameLatency() { … … 191 70 } 192 71 193 bool StreamProcessor::isRunning() { 194 return m_running; 195 } 196 197 bool StreamProcessor::enable(uint64_t time_to_enable_at) { 198 // FIXME: time_to_enable_at will be in 'time' not cycles 199 m_cycle_to_enable_at=time_to_enable_at; 200 201 if(!m_running) { 202 debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n"); 203 } 204 205 #ifdef DEBUG 206 uint64_t now_cycles=CYCLE_TIMER_GET_CYCLES(m_handler->getCycleTimer()); 207 const int64_t max=(int64_t)(CYCLES_PER_SECOND/2); 208 209 int64_t diff=(int64_t)m_cycle_to_enable_at-(int64_t)now_cycles; 210 211 if (diff > max) { 212 diff-=TICKS_PER_SECOND; 213 } else if (diff < -max) { 214 diff+=TICKS_PER_SECOND; 215 } 216 217 if (diff<0) { 218 debugWarning("Request to enable streamprocessor %lld cycles ago (now=%llu, cy=%llu).\n", 219 diff,now_cycles,time_to_enable_at); 220 } 221 #endif 222 m_data_buffer->enable(); 223 224 m_disabled=false; 225 return true; 226 } 227 228 bool StreamProcessor::disable() { 229 m_data_buffer->disable(); 230 m_disabled=true; 231 return true; 232 } 233 234 float 235 StreamProcessor::getTicksPerFrame() { 236 if (m_data_buffer) { 237 float rate=m_data_buffer->getRate(); 238 if (fabsf(m_ticks_per_frame - rate)>(m_ticks_per_frame*0.1)) { 239 debugWarning("TimestampedBuffer rate (%10.5f) more that 10%% off nominal (%10.5f)\n",rate,m_ticks_per_frame); 240 return m_ticks_per_frame; 241 } 242 // return m_ticks_per_frame; 243 if (rate<0.0) debugError("rate < 0! (%f)\n",rate); 244 245 return rate; 246 } else { 247 return 0.0; 248 } 249 } 250 251 int64_t StreamProcessor::getTimeUntilNextPeriodSignalUsecs() { 72 /*********************************************** 73 * Buffer management and manipulation * 74 ***********************************************/ 75 int StreamProcessor::getBufferFill() { 76 return m_data_buffer->getBufferFill(); 77 } 78 79 bool 80 StreamProcessor::dropFrames(unsigned int nbframes) 81 { 82 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes); 83 return m_data_buffer->dropFrames(nbframes); 84 } 85 86 int64_t 87 StreamProcessor::getTimeUntilNextPeriodSignalUsecs() 88 { 252 89 uint64_t time_at_period=getTimeAtPeriod(); 253 90 … … 260 97 // pass before these packets are processed. Adding this extra term makes that 261 98 // the period boundary is signalled later 262 time_at_period = addTicks(time_at_period, getSyncSource().getSyncDelay());99 time_at_period = addTicks(time_at_period, m_manager->getSyncSource().getSyncDelay()); 263 100 264 101 uint64_t cycle_timer=m_handler->getCycleTimerTicks(); … … 278 115 } 279 116 280 uint64_t StreamProcessor::getTimeAtPeriodUsecs() { 117 uint64_t 118 StreamProcessor::getTimeAtPeriodUsecs() 119 { 281 120 return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC); 282 121 } 283 122 284 bool StreamProcessor::dropFrames(unsigned int nbframes) {285 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes);286 return m_data_buffer->dropFrames(nbframes);287 }288 289 /**290 * Resets the xrun counter, in a atomic way. This291 * is thread safe.292 */293 void StreamProcessor::resetXrunCounter() {294 ZERO_ATOMIC((SInt32 *)&m_xruns);295 }296 297 void StreamProcessor::setVerboseLevel(int l) {298 setDebugLevel(l);299 IsoStream::setVerboseLevel(l);300 PortManager::setVerboseLevel(l);301 m_data_buffer->setVerboseLevel(l);302 }303 304 123 uint64_t 305 StreamProcessor::getTimeAtPeriod() { 124 StreamProcessor::getTimeAtPeriod() 125 { 306 126 if (getType() == ePT_Receive) { 307 127 ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_manager->getPeriodSize()); … … 333 153 } 334 154 155 float 156 StreamProcessor::getTicksPerFrame() 157 { 158 assert(m_data_buffer != NULL); 159 return m_data_buffer->getRate(); 160 } 161 335 162 bool 336 StreamProcessor::canClientTransferFrames(unsigned int nbframes) { 163 StreamProcessor::canClientTransferFrames(unsigned int nbframes) 164 { 165 bool can_transfer; 166 unsigned int fc = m_data_buffer->getFrameCounter(); 337 167 if (getType() == ePT_Receive) { 338 return m_data_buffer->getFrameCounter()>= (int) nbframes;168 can_transfer = fc >= (int) nbframes; 339 169 } else { 340 bool can_transfer;341 170 // there has to be enough space to put the frames in 342 can_transfer = m_data_buffer->getBufferSize() - m_data_buffer->getFrameCounter()> nbframes;171 can_transfer = m_data_buffer->getBufferSize() - fc > nbframes; 343 172 // or the buffer is transparent 344 173 can_transfer |= m_data_buffer->isTransparent(); 345 return can_transfer; 346 } 347 } 348 174 } 175 176 #ifdef DEBUG 177 if (!can_transfer) { 178 debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n", 179 this, ePTToString(getType()), fc, nbframes); 180 } 181 #endif 182 183 return can_transfer; 184 } 185 186 /*********************************************** 187 * I/O API * 188 ***********************************************/ 189 190 // Packet transfer API 191 enum raw1394_iso_disposition 192 StreamProcessor::putPacket(unsigned char *data, unsigned int length, 193 unsigned char channel, unsigned char tag, unsigned char sy, 194 unsigned int cycle, unsigned int dropped) { 195 196 int dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; 197 if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 198 else m_dropped += dropped_cycles; 199 if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); 200 m_last_cycle = cycle; 201 202 // bypass based upon state 203 if (m_state == ePS_Invalid) { 204 debugError("Should not have state %s\n", ePSToString(m_state) ); 205 return RAW1394_ISO_ERROR; 206 } 207 if (m_state == ePS_Created) { 208 return RAW1394_ISO_DEFER; 209 } 210 211 // normal processing 212 enum raw1394_iso_disposition retval = RAW1394_ISO_OK; 213 214 // store the previous timestamp 215 m_last_timestamp2 = m_last_timestamp; 216 217 // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles) 218 // it happens on the first 'good' cycle for the wait condition 219 // or on the first received cycle that is received afterwards (might be a problem) 220 221 // check whether we are waiting for a stream to be disabled 222 if(m_state == ePS_WaitingForStreamDisable) { 223 // we then check whether we have to switch on this cycle 224 if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 225 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n"); 226 m_next_state = ePS_DryRunning; 227 if (!updateState()) { // we are allowed to change the state directly 228 debugError("Could not update state!\n"); 229 return RAW1394_ISO_ERROR; 230 } 231 } else { 232 // not time to disable yet 233 } 234 // the received data can be discarded while waiting for the stream 235 // to be disabled 236 return RAW1394_ISO_OK; 237 } 238 239 // check whether we are waiting for a stream to be enabled 240 else if(m_state == ePS_WaitingForStreamEnable) { 241 // we then check whether we have to switch on this cycle 242 if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 243 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n"); 244 m_next_state = ePS_Running; 245 if (!updateState()) { // we are allowed to change the state directly 246 debugError("Could not update state!\n"); 247 return RAW1394_ISO_ERROR; 248 } 249 } else { 250 // not time to enable yet 251 } 252 // we are dryRunning hence data should be processed in any case 253 } 254 255 // check the packet header 256 if (processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles)) { 257 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", 258 cycle, m_last_timestamp); 259 // update some accounting 260 m_last_good_cycle = cycle; 261 m_last_dropped = dropped_cycles; 262 263 // check whether we are waiting for a stream to startup 264 // this requires that the packet is good 265 if(m_state == ePS_WaitingForStream) { 266 // since we have a packet with an OK header, 267 // we can indicate that the stream started up 268 269 // we then check whether we have to switch on this cycle 270 if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 271 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n"); 272 // hence go to the dryRunning state 273 m_next_state = ePS_DryRunning; 274 if (!updateState()) { // we are allowed to change the state directly 275 debugError("Could not update state!\n"); 276 return RAW1394_ISO_ERROR; 277 } 278 } else { 279 // not time (yet) to switch state 280 } 281 // in both cases we don't want to process the data 282 return RAW1394_ISO_OK; 283 } 284 285 // check whether a state change has been requested 286 // note that only the wait state changes are synchronized with the cycles 287 else if(m_state != m_next_state) { 288 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", 289 ePSToString(m_state), ePSToString(m_next_state)); 290 // execute the requested change 291 if (!updateState()) { // we are allowed to change the state directly 292 debugError("Could not update state!\n"); 293 return RAW1394_ISO_ERROR; 294 } 295 } 296 297 // handle dropped cycles 298 if(dropped_cycles) { 299 // they represent a discontinuity in the timestamps, and hence are 300 // to be dealt with 301 debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this); 302 m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 303 // we don't want this sample to be written 304 return RAW1394_ISO_OK; 305 } 306 307 // for all states that reach this we are allowed to 308 // do protocol specific data reception 309 bool ok = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles); 310 311 // if an xrun occured, switch to the dryRunning state and 312 // allow for the xrun to be picked up 313 if (!ok) { 314 debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to xrun\n"); 315 m_next_state = ePS_DryRunning; 316 // execute the requested change 317 if (!updateState()) { // we are allowed to change the state directly 318 debugError("Could not update state!\n"); 319 return RAW1394_ISO_ERROR; 320 } 321 return RAW1394_ISO_DEFER; 322 } 323 } else { 324 // apparently we don't have to do anything when the packets are not valid 325 } 326 return retval; 327 } 328 329 // Frame Transfer API 330 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 331 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); 332 assert( getType() == ePT_Receive ); 333 if(isDryRunning()) return getFramesDry(nbframes, ts); 334 else return getFramesWet(nbframes, ts); 335 } 336 337 bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) { 338 // FIXME: this should be done somewhere else 339 #ifdef DEBUG 340 uint64_t ts_head; 341 signed int fc; 342 int32_t lag_ticks; 343 float lag_frames; 344 345 // in order to sync up multiple received streams, we should 346 // use the ts parameter. It specifies the time of the block's 347 // first sample. 348 349 ffado_timestamp_t ts_head_tmp; 350 m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); 351 ts_head=(uint64_t)ts_head_tmp; 352 lag_ticks=diffTicks(ts, ts_head); 353 float rate=m_data_buffer->getRate(); 354 355 assert(rate!=0.0); 356 357 lag_frames=(((float)lag_ticks)/rate); 358 359 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 360 this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 361 362 if (lag_frames>=1.0) { 363 // the stream lags 364 debugWarning( "stream (%p): lags with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 365 this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 366 } else if (lag_frames<=-1.0) { 367 // the stream leads 368 debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 369 this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 370 } 371 #endif 372 // ask the buffer to process nbframes of frames 373 // using it's registered client's processReadBlock(), 374 // which should be ours 375 m_data_buffer->blockProcessReadFrames(nbframes); 376 return true; 377 } 378 379 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { 380 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", 381 this, nbframes, ts); 382 383 // dry run on this side means that we put silence in all enabled ports 384 // since there is do data put into the ringbuffer in the dry-running state 385 return provideSilenceBlock(nbframes, 0); 386 } 387 388 389 /*********************************************** 390 * State related API * 391 ***********************************************/ 392 bool StreamProcessor::init() 393 { 394 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); 395 396 // initialization can be done without requesting it 397 // from the packet loop 398 m_next_state = ePS_Created; 399 return true; 400 } 401 402 bool StreamProcessor::prepare() 403 { 404 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "prepare...\n"); 405 if(!m_manager) { 406 debugFatal("Not attached to a manager!\n"); 407 return false; 408 } 409 410 if (!prepareChild()) { 411 debugFatal("Could not prepare child\n"); 412 return false; 413 } 414 415 // initialization can be done without requesting it 416 // from the packet loop 417 m_next_state = ePS_Stopped; 418 return updateState(); 419 } 420 421 bool StreamProcessor::stop() 422 { 423 uint64_t time_to_stop_at = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 424 int cnt; 425 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stop...\n"); 426 switch (m_state) { 427 case ePS_Stopped: return true; 428 case ePS_DryRunning: 429 return stopDryRunning(-1); 430 case ePS_Running: 431 return stopRunning(-1) && 432 stopDryRunning(-1); 433 default: 434 debugError("Bad state: %s\n", ePSToString(m_state)); 435 return false; 436 } 437 } 438 439 bool StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant) 440 { 441 // first set the time, since in the packet loop we first check m_state == m_next_state before 442 // using the time 443 m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant); 444 m_next_state = state; 445 return true; 446 } 447 448 bool StreamProcessor::scheduleAndWaitForStateTransition(enum eProcessorState state, 449 uint64_t time_instant, 450 enum eProcessorState wait_state) 451 { 452 int cnt=200; // 2 seconds, i.e. 2 cycles 453 if(!scheduleStateTransition(state, time_instant)) { 454 debugError("Could not schedule state transistion to %s\n", ePSToString(state)); 455 return false; 456 } 457 while (m_state != wait_state && cnt) { 458 usleep(10000); 459 cnt++; 460 } 461 if(cnt==0) { 462 debugError("Timeout entering Stopped state\n"); 463 return false; 464 } 465 debugOutput(DEBUG_LEVEL_VERBOSE, " entered state %s\n", ePSToString(wait_state)); 466 return true; 467 } 468 469 bool StreamProcessor::startDryRunning(int64_t t) { 470 uint64_t tx; 471 if (t < 0) { 472 tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 473 } else { 474 tx = t; 475 } 476 debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startDryRunning for (%p)\n",this); 477 debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); 478 debugOutput(DEBUG_LEVEL_VERBOSE," Start at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 479 if (m_state == ePS_Stopped) { 480 return scheduleAndWaitForStateTransition(ePS_WaitingForStream, tx, ePS_DryRunning); 481 } else if (m_state == ePS_Running) { 482 return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); 483 } else { 484 debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state)); 485 return false; 486 } 487 } 488 489 bool StreamProcessor::startRunning(int64_t t) { 490 uint64_t tx; 491 if (t < 0) { 492 tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 493 } else { 494 tx = t; 495 } 496 debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startRunning for (%p)\n",this); 497 debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); 498 debugOutput(DEBUG_LEVEL_VERBOSE," Start at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 499 return scheduleAndWaitForStateTransition(ePS_WaitingForStreamEnable, tx, ePS_Running); 500 } 501 502 bool StreamProcessor::stopDryRunning(int64_t t) { 503 uint64_t tx; 504 if (t < 0) { 505 tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 506 } else { 507 tx = t; 508 } 509 debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopDryRunning for (%p)\n",this); 510 debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); 511 debugOutput(DEBUG_LEVEL_VERBOSE," Stop at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 512 return scheduleAndWaitForStateTransition(ePS_Stopped, tx, ePS_Stopped); 513 } 514 515 bool StreamProcessor::stopRunning(int64_t t) { 516 uint64_t tx; 517 if (t < 0) { 518 tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 519 } else { 520 tx = t; 521 } 522 debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopRunning for (%p)\n",this); 523 debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); 524 debugOutput(DEBUG_LEVEL_VERBOSE," Stop at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 525 return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); 526 } 527 528 // internal state API 529 530 /** 531 * @brief Enter the ePS_Stopped state 532 * @return true if successful, false if not 533 * 534 * @pre none 535 * 536 * @post the buffer and the isostream are ready for use. 537 * @post all dynamic structures have been allocated successfully 538 * @post the buffer is transparent and empty, and all parameters are set 539 * to the correct initial/nominal values. 540 * 541 */ 542 bool 543 StreamProcessor::doStop() 544 { 545 float ticks_per_frame; 546 unsigned int ringbuffer_size_frames; 547 548 debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 549 bool result = true; 550 551 switch(m_state) { 552 case ePS_Created: 553 assert(m_data_buffer); 554 // object just created 555 result = m_data_buffer->init(); 556 557 // prepare the framerate estimate 558 ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); 559 m_ticks_per_frame = ticks_per_frame; 560 debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame); 561 562 // initialize internal buffer 563 ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); 564 result &= m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); 565 566 result &= m_data_buffer->setEventSize( getEventSize() ); 567 result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() ); 568 result &= m_data_buffer->setUpdatePeriod( getUpdatePeriod() ); 569 570 result &= m_data_buffer->setNominalRate(ticks_per_frame); 571 result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 572 result &= m_data_buffer->prepare(); // FIXME: the name 573 574 // set the parameters of ports we can: 575 // we want the audio ports to be period buffered, 576 // and the midi ports to be packet buffered 577 for ( PortVectorIterator it = m_Ports.begin(); 578 it != m_Ports.end(); 579 ++it ) 580 { 581 debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); 582 if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { 583 debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); 584 return false; 585 } 586 switch ((*it)->getPortType()) { 587 case Port::E_Audio: 588 if(!(*it)->setSignalType(Port::E_PeriodSignalled)) { 589 debugFatal("Could not set signal type to PeriodSignalling"); 590 return false; 591 } 592 // buffertype and datatype are dependant on the API 593 debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n"); 594 // buffertype and datatype are dependant on the API 595 if(!(*it)->setBufferType(Port::E_PointerBuffer)) { 596 debugFatal("Could not set buffer type"); 597 return false; 598 } 599 if(!(*it)->useExternalBuffer(true)) { 600 debugFatal("Could not set external buffer usage"); 601 return false; 602 } 603 if(!(*it)->setDataType(Port::E_Float)) { 604 debugFatal("Could not set data type"); 605 return false; 606 } 607 break; 608 case Port::E_Midi: 609 if(!(*it)->setSignalType(Port::E_PacketSignalled)) { 610 debugFatal("Could not set signal type to PacketSignalling"); 611 return false; 612 } 613 // buffertype and datatype are dependant on the API 614 // buffertype and datatype are dependant on the API 615 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); 616 // buffertype and datatype are dependant on the API 617 if(!(*it)->setBufferType(Port::E_RingBuffer)) { 618 debugFatal("Could not set buffer type"); 619 return false; 620 } 621 if(!(*it)->setDataType(Port::E_MidiEvent)) { 622 debugFatal("Could not set data type"); 623 return false; 624 } 625 break; 626 default: 627 debugWarning("Unsupported port type specified\n"); 628 break; 629 } 630 } 631 // the API specific settings of the ports should already be set, 632 // as this is called from the processorManager->prepare() 633 // so we can init the ports 634 result &= PortManager::initPorts(); 635 636 break; 637 case ePS_DryRunning: 638 // what to do here? 639 break; 640 default: 641 debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 642 return false; 643 } 644 645 result &= m_data_buffer->reset(); // FIXME: don't like the reset() name 646 647 // make the buffer transparent 648 m_data_buffer->setTransparent(true); 649 650 // reset all ports 651 result &= PortManager::preparePorts(); 652 653 m_state = ePS_Stopped; 654 return result; 655 } 656 657 /** 658 * @brief Enter the ePS_WaitingForStream state 659 * @return true if successful, false if not 660 * 661 * @pre all dynamic data structures are allocated successfully 662 * 663 * @post 664 * 665 */ 666 bool 667 StreamProcessor::doWaitForRunningStream() 668 { 669 debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 670 switch(m_state) { 671 case ePS_Stopped: 672 // we have to start waiting for an incoming stream 673 // this basically means nothing, the state change will 674 // be picked up by the packet iterator 675 break; 676 default: 677 debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 678 return false; 679 } 680 m_state = ePS_WaitingForStream; 681 return true; 682 } 683 684 /** 685 * @brief Enter the ePS_DryRunning state 686 * @return true if successful, false if not 687 * 688 * @pre 689 * 690 * @post 691 * 692 */ 693 bool 694 StreamProcessor::doDryRunning() 695 { 696 bool result = true; 697 debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 698 switch(m_state) { 699 case ePS_WaitingForStream: 700 // a running stream has been detected 701 debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle); 702 if (getType() == ePT_Receive) { 703 m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 704 } else { 705 // FIXME 706 debugError("Implement\n"); 707 } 708 break; 709 case ePS_WaitingForStreamDisable: 710 result &= m_data_buffer->reset(); // FIXME: don't like the reset() name 711 m_data_buffer->setTransparent(true); 712 break; 713 default: 714 debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 715 return false; 716 } 717 m_state = ePS_DryRunning; 718 return result; 719 } 720 721 /** 722 * @brief Enter the ePS_WaitingForStreamEnable state 723 * @return true if successful, false if not 724 * 725 * @pre 726 * 727 * @post 728 * 729 */ 730 bool 731 StreamProcessor::doWaitForStreamEnable() 732 { 733 debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 734 switch(m_state) { 735 case ePS_DryRunning: 736 // we have to start waiting for an incoming stream 737 // this basically means nothing, the state change will 738 // be picked up by the packet iterator 739 break; 740 default: 741 debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 742 return false; 743 } 744 m_state = ePS_WaitingForStreamEnable; 745 return true; 746 } 747 748 /** 749 * @brief Enter the ePS_Running state 750 * @return true if successful, false if not 751 * 752 * @pre 753 * 754 * @post 755 * 756 */ 757 bool 758 StreamProcessor::doRunning() 759 { 760 bool result = true; 761 debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 762 switch(m_state) { 763 case ePS_WaitingForStreamEnable: 764 // a running stream has been detected 765 debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n", 766 this, m_last_cycle); 767 if (getType() == ePT_Receive) { 768 m_data_buffer->setTransparent(false); 769 } else { 770 // FIXME 771 debugError("Implement\n"); 772 } 773 break; 774 default: 775 debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 776 return false; 777 } 778 m_state = ePS_Running; 779 return result; 780 } 781 782 /** 783 * @brief Enter the ePS_WaitingForStreamDisable state 784 * @return true if successful, false if not 785 * 786 * @pre 787 * 788 * @post 789 * 790 */ 791 bool 792 StreamProcessor::doWaitForStreamDisable() 793 { 794 debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 795 switch(m_state) { 796 case ePS_Running: 797 // the thread will do the transition 798 break; 799 default: 800 debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 801 return false; 802 } 803 m_state = ePS_WaitingForStreamDisable; 804 return true; 805 } 806 807 /** 808 * @brief Updates the state machine and calls the necessary transition functions 809 * @return true if successful, false if not 810 */ 811 bool StreamProcessor::updateState() { 812 bool result = false; 813 // copy the current state locally since it could change value, 814 // and that's something we don't want to happen inbetween tests 815 // if m_next_state changes during this routine, we know for sure 816 // that the previous state change was at least attempted correctly. 817 enum eProcessorState next_state = m_next_state; 818 819 debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n", 820 ePSToString(m_state), ePSToString(next_state)); 821 822 if (m_state == next_state) { 823 debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) ); 824 return true; 825 } 826 827 // after creation, only initialization is allowed 828 if (m_state == ePS_Created) { 829 if(next_state != ePS_Stopped) { 830 goto updateState_exit_with_error; 831 } 832 // do init here 833 result = doStop(); 834 if (result) return true; 835 else goto updateState_exit_change_failed; 836 } 837 838 // after initialization, only WaitingForRunningStream is allowed 839 if (m_state == ePS_Stopped) { 840 if(next_state != ePS_WaitingForStream) { 841 goto updateState_exit_with_error; 842 } 843 result = doWaitForRunningStream(); 844 if (result) return true; 845 else goto updateState_exit_change_failed; 846 } 847 848 // after WaitingForStream, only ePS_DryRunning is allowed 849 // this means that the stream started running 850 if (m_state == ePS_WaitingForStream) { 851 if(next_state != ePS_DryRunning) { 852 goto updateState_exit_with_error; 853 } 854 result = doDryRunning(); 855 if (result) return true; 856 else goto updateState_exit_change_failed; 857 } 858 859 // from ePS_DryRunning we can go to: 860 // - ePS_Stopped if something went wrong during DryRunning 861 // - ePS_WaitingForStreamEnable if there is a requested to enable 862 if (m_state == ePS_DryRunning) { 863 if((next_state != ePS_Stopped) && 864 (next_state != ePS_WaitingForStreamEnable)) { 865 goto updateState_exit_with_error; 866 } 867 if (next_state == ePS_Stopped) { 868 result = doStop(); 869 } else { 870 result = doWaitForStreamEnable(); 871 } 872 if (result) return true; 873 else goto updateState_exit_change_failed; 874 } 875 876 // from ePS_WaitingForStreamEnable we can go to: 877 // - ePS_DryRunning if something went wrong while waiting 878 // - ePS_Running if the stream enabled correctly 879 if (m_state == ePS_WaitingForStreamEnable) { 880 if((next_state != ePS_DryRunning) && 881 (next_state != ePS_Running)) { 882 goto updateState_exit_with_error; 883 } 884 if (next_state == ePS_Stopped) { 885 result = doDryRunning(); 886 } else { 887 result = doRunning(); 888 } 889 if (result) return true; 890 else goto updateState_exit_change_failed; 891 } 892 893 // from ePS_Running we can only start waiting for a disabled stream 894 if (m_state == ePS_Running) { 895 if(next_state != ePS_WaitingForStreamDisable) { 896 goto updateState_exit_with_error; 897 } 898 result = doWaitForStreamDisable(); 899 if (result) return true; 900 else goto updateState_exit_change_failed; 901 } 902 903 // from ePS_WaitingForStreamDisable we can go to DryRunning 904 if (m_state == ePS_WaitingForStreamDisable) { 905 if(next_state != ePS_DryRunning) { 906 goto updateState_exit_with_error; 907 } 908 result = doDryRunning(); 909 if (result) return true; 910 else goto updateState_exit_change_failed; 911 } 912 913 // if we arrive here there is an error 914 updateState_exit_with_error: 915 debugError("Invalid state transition: %s => %s\n", 916 ePSToString(m_state), ePSToString(next_state)); 917 return false; 918 updateState_exit_change_failed: 919 debugError("State transition failed: %s => %s\n", 920 ePSToString(m_state), ePSToString(next_state)); 921 return false; 922 } 923 924 925 /** 926 * @brief convert a eProcessorState to a string 927 * @param s the state 928 * @return a char * describing the state 929 */ 349 930 const char * 350 931 StreamProcessor::ePSToString(enum eProcessorState s) { 351 932 switch (s) { 933 case ePS_Invalid: return "ePS_Invalid"; 352 934 case ePS_Created: return "ePS_Created"; 353 case ePS_ Initialized: return "ePS_Initialized";354 case ePS_WaitingFor RunningStream: return "ePS_WaitingForRunningStream";935 case ePS_Stopped: return "ePS_Stopped"; 936 case ePS_WaitingForStream: return "ePS_WaitingForStream"; 355 937 case ePS_DryRunning: return "ePS_DryRunning"; 356 case ePS_WaitingForEnabledStream: return "ePS_WaitingForEnabledStream"; 357 case ePS_StreamEnabled: return "ePS_StreamEnabled"; 358 case ePS_WaitingForDisabledStream: return "ePS_WaitingForDisabledStream"; 359 } 938 case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable"; 939 case ePS_Running: return "ePS_Running"; 940 case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable"; 941 default: return "error: unknown state"; 942 } 943 } 944 945 /** 946 * @brief convert a eProcessorType to a string 947 * @param t the type 948 * @return a char * describing the state 949 */ 950 const char * 951 StreamProcessor::ePTToString(enum eProcessorType t) { 952 switch (t) { 953 case ePT_Receive: return "Receive"; 954 case ePT_Transmit: return "Transmit"; 955 default: return "error: unknown type"; 956 } 957 } 958 959 /*********************************************** 960 * Debug * 961 ***********************************************/ 962 void 963 StreamProcessor::dumpInfo() 964 { 965 debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); 966 debugOutputShort( DEBUG_LEVEL_NORMAL, " Iso stream info:\n"); 967 968 IsoStream::dumpInfo(); 969 debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor info:\n"); 970 if (m_handler) 971 debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011u\n",m_handler->getCycleTimerTicks()); 972 debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); 973 debugOutputShort( DEBUG_LEVEL_NORMAL, " State : %s\n", ePSToString(m_state)); 974 debugOutputShort( DEBUG_LEVEL_NORMAL, " Next state : %s\n", ePSToString(m_next_state)); 975 debugOutputShort( DEBUG_LEVEL_NORMAL, " transition at : %u\n", m_cycle_to_switch_state); 976 977 978 debugOutputShort( DEBUG_LEVEL_NORMAL, " Nominal framerate : %u\n", m_manager->getNominalRate()); 979 debugOutputShort( DEBUG_LEVEL_NORMAL, " Device framerate : Sync: %f, Buffer %f\n", 980 24576000.0/m_manager->getSyncSource().m_data_buffer->getRate(), 981 24576000.0/m_data_buffer->getRate() 982 ); 983 984 m_data_buffer->dumpInfo(); 985 986 m_PeriodStat.dumpInfo(); 987 m_PacketStat.dumpInfo(); 988 // m_WakeupStat.dumpInfo(); 989 } 990 991 void 992 StreamProcessor::setVerboseLevel(int l) { 993 setDebugLevel(l); 994 IsoStream::setVerboseLevel(l); 995 PortManager::setVerboseLevel(l); 996 m_data_buffer->setVerboseLevel(l); 360 997 } 361 998 branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h
r715 r719 50 50 public PortManager, 51 51 public Util::TimestampedBufferClient, 52 public Util::OptionContainer { 52 public Util::OptionContainer 53 { 53 54 54 55 friend class StreamProcessorManager; // FIXME: get rid of this … … 65 66 // this can only be set by the constructor 66 67 enum eProcessorType m_processor_type; 67 68 // pretty printing 69 const char *ePTToString(enum eProcessorType); 68 70 protected: 69 71 ///> the state the streamprocessor is in 70 72 enum eProcessorState { 73 ePS_Invalid, 71 74 ePS_Created, 72 ePS_Initialized, 73 ePS_WaitingForRunningStream, 75 // ePS_WaitingToStop, FIXME: this will be needed for the MOTU's 76 ePS_Stopped, 77 ePS_WaitingForStream, 74 78 ePS_DryRunning, 75 ePS_WaitingFor EnabledStream,76 ePS_ StreamEnabled,77 ePS_WaitingFor DisabledStream,79 ePS_WaitingForStreamEnable, 80 ePS_Running, 81 ePS_WaitingForStreamDisable, 78 82 }; 79 83 … … 84 88 private: 85 89 enum eProcessorState m_state; 90 // state switching 91 enum eProcessorState m_next_state; 92 unsigned int m_cycle_to_switch_state; 93 bool updateState(); 94 // pretty printing 86 95 const char *ePSToString(enum eProcessorState); 87 96 97 bool doStop(); 98 bool doWaitForRunningStream(); 99 bool doDryRunning(); 100 bool doWaitForStreamEnable(); 101 bool doRunning(); 102 bool doWaitForStreamDisable(); 103 104 bool scheduleStateTransition(enum eProcessorState state, uint64_t time_instant); 105 bool scheduleAndWaitForStateTransition(enum eProcessorState state, 106 uint64_t time_instant, 107 enum eProcessorState wait_state); 108 public: 109 bool isRunning() 110 {return m_state == ePS_Running;}; 111 bool isDryRunning() 112 {return m_state == ePS_DryRunning;}; 113 114 //--- state stuff (TODO: cleanup) 115 bool startDryRunning(int64_t time_to_start_at); 116 bool startRunning(int64_t time_to_start_at); 117 bool stopDryRunning(int64_t time_to_stop_at); 118 bool stopRunning(int64_t time_to_stop_at); 119 120 // the main difference between init and prepare is that when prepare is called, 121 // the SP is registered to a manager (FIXME: can't it be called by the manager?) 122 bool init(); 123 bool prepare(); 124 ///> stop the SP from running or dryrunning 125 bool stop(); 88 126 // constructor/destructor 89 127 public: … … 109 147 110 148 // the receive interface accepts packets and provides frames 111 // implement these for a receive SP 112 // leave default for a transmit SP 113 virtual enum raw1394_iso_disposition 149 150 // the following two methods are to be implemented by subclasses 151 virtual bool processPacketHeader(unsigned char *data, unsigned int length, 152 unsigned char channel, unsigned char tag, unsigned char sy, 153 unsigned int cycle, unsigned int dropped) 154 {debugWarning("call not allowed\n"); return false;}; 155 virtual bool processPacketData(unsigned char *data, unsigned int length, 156 unsigned char channel, unsigned char tag, unsigned char sy, 157 unsigned int cycle, unsigned int dropped) 158 {debugWarning("call not allowed\n"); return false;}; 159 160 // this one is implemented by us 161 enum raw1394_iso_disposition 114 162 putPacket(unsigned char *data, unsigned int length, 115 163 unsigned char channel, unsigned char tag, unsigned char sy, 116 unsigned int cycle, unsigned int dropped) 117 {debugWarning("call not allowed\n"); return RAW1394_ISO_STOP;}; 118 virtual bool getFrames(unsigned int nbframes, int64_t ts) 119 {debugWarning("call not allowed\n"); return false;}; 120 virtual bool getFramesDry(unsigned int nbframes, int64_t ts) 121 {debugWarning("call not allowed\n"); return false;}; 164 unsigned int cycle, unsigned int dropped); 165 166 bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client 167 protected: 168 // to be implemented by the children 122 169 virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) 123 170 {debugWarning("call not allowed\n"); return false;}; 124 125 126 //--- state stuff (TODO: cleanup) 127 bool xrunOccurred() { return (m_xruns>0); }; 128 bool isRunning(); ///< returns true if there is some stream data processed 129 virtual bool prepareForEnable(uint64_t time_to_enable_at); 130 virtual bool prepareForDisable(); 131 132 bool enable(uint64_t time_to_enable_at); ///< enable the stream processing 133 bool disable(); ///< disable the stream processing 134 bool isEnabled() {return !m_is_disabled;}; 135 136 virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) 137 138 virtual bool prepare(); ///< prepare the streams & buffers (e.g. prefill) 139 virtual bool init(); 140 virtual bool prepareForStop() {return true;}; 141 virtual bool prepareForStart() {return true;}; 171 virtual bool provideSilenceBlock(unsigned int nevents, unsigned int offset) 172 {debugWarning("call not allowed\n"); return false;}; 173 174 private: 175 bool getFramesDry(unsigned int nbframes, int64_t ts); 176 bool getFramesWet(unsigned int nbframes, int64_t ts); 142 177 143 178 // move to private? 144 void resetXrunCounter(); 145 protected: 146 bool m_running; 147 bool m_disabled; 148 bool m_is_disabled; 149 unsigned int m_cycle_to_enable_at; 179 bool xrunOccurred() { return (m_xruns>0); }; // FIXME: m_xruns not updated 180 181 protected: // FIXME: move to private 182 uint64_t m_dropped; /// FIXME:debug 183 uint64_t m_last_dropped; /// FIXME:debug 184 int m_last_good_cycle; /// FIXME:debug 185 uint64_t m_last_timestamp; /// last timestamp (in ticks) 186 uint64_t m_last_timestamp2; /// last timestamp (in ticks) 187 uint64_t m_last_timestamp_at_period_ticks; 150 188 151 189 //--- data buffering and accounting … … 170 208 * false if it can't 171 209 */ 172 virtualbool canClientTransferFrames(unsigned int nframes);210 bool canClientTransferFrames(unsigned int nframes); 173 211 174 212 /** … … 181 219 * @return true if the operation was successful 182 220 */ 183 virtualbool dropFrames(unsigned int nframes);221 bool dropFrames(unsigned int nframes); 184 222 185 223 /** … … 215 253 * @return the time in internal units 216 254 */ 217 virtualuint64_t getTimeAtPeriod();255 uint64_t getTimeAtPeriod(); 218 256 219 257 uint64_t getTimeNow(); … … 230 268 * @param d sync delay 231 269 */ 232 void setSyncDelay(int d) {m_sync_delay =d;};270 void setSyncDelay(int d) {m_sync_delay = d;}; 233 271 234 272 /** … … 247 285 * @return maximal frame latency 248 286 */ 249 virtual int getMaxFrameLatency(); 250 251 StreamProcessor& getSyncSource(); 287 int getMaxFrameLatency(); 252 288 253 289 float getTicksPerFrame(); … … 256 292 257 293 int getBufferFill(); 294 295 // Child implementation interface 296 /** 297 * @brief prepare the child SP 298 * @return true if successful, false otherwise 299 * @pre the m_manager pointer points to a valid manager 300 * @post getEventsPerFrame() returns the correct value 301 * @post getEventSize() returns the correct value 302 * @post getUpdatePeriod() returns the correct value 303 * @post processPacketHeader(...) can be called 304 * @post processPacketData(...) can be called 305 */ 306 virtual bool prepareChild() = 0; 307 /** 308 * @brief get the number of events contained in one frame 309 * @return the number of events contained in one frame 310 */ 311 virtual unsigned int getEventsPerFrame() = 0; 312 313 /** 314 * @brief get the size of one frame in bytes 315 * @return the size of one frame in bytes 316 */ 317 virtual unsigned int getEventSize() = 0; 318 319 /** 320 * @brief get the nominal number of frames between buffer updates 321 * @return the nominal number of frames between buffer updates 322 */ 323 virtual unsigned int getUpdatePeriod() = 0; 258 324 259 325 protected: branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp
r715 r719 33 33 #define PREPARE_TIMEOUT_MSEC 4000 34 34 #define ENABLE_TIMEOUT_MSEC 4000 35 36 // allows to add some processing margin. This shifts the time 37 // at which the buffer is transfer()'ed, making things somewhat 38 // more robust. It should be noted though that shifting the transfer 39 // time to a later time instant also causes the xmit buffer fill to be 40 // lower on average. 41 #define FFADO_SIGNAL_DELAY_TICKS 3072 35 42 36 43 namespace Streaming { … … 53 60 StreamProcessorManager::~StreamProcessorManager() { 54 61 if (m_isoManager) delete m_isoManager; 55 56 62 } 57 63 … … 90 96 91 97 debugFatal("Unsupported processor type!\n"); 92 93 98 return false; 94 99 } … … 102 107 103 108 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 104 it != m_ReceiveProcessors.end();105 ++it ) {106 109 it != m_ReceiveProcessors.end(); 110 ++it ) 111 { 107 112 if ( *it == processor ) { 108 m_ReceiveProcessors.erase(it); 109 110 processor->clearManager(); 111 112 if(!m_isoManager->unregisterStream(processor)) { 113 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 114 115 return false; 116 117 } 118 119 return true; 113 m_ReceiveProcessors.erase(it); 114 processor->clearManager(); 115 if(!m_isoManager->unregisterStream(processor)) { 116 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 117 return false; 120 118 } 119 return true; 120 } 121 121 } 122 122 } … … 124 124 if (processor->getType()==StreamProcessor::ePT_Transmit) { 125 125 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 126 it != m_TransmitProcessors.end();127 ++it ) {128 126 it != m_TransmitProcessors.end(); 127 ++it ) 128 { 129 129 if ( *it == processor ) { 130 m_TransmitProcessors.erase(it); 131 132 processor->clearManager(); 133 134 if(!m_isoManager->unregisterStream(processor)) { 135 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 136 137 return false; 138 139 } 140 141 return true; 130 m_TransmitProcessors.erase(it); 131 processor->clearManager(); 132 if(!m_isoManager->unregisterStream(processor)) { 133 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 134 return false; 142 135 } 136 return true; 137 } 143 138 } 144 139 } 145 140 146 141 debugFatal("Processor (%p) not found!\n",processor); 147 148 142 return false; //not found 149 150 143 } 151 144 152 145 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) { 153 146 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s); 154 155 147 m_SyncSource=s; 156 148 return true; … … 160 152 { 161 153 debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 162 163 154 m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1); 164 165 155 if(!m_isoManager) { 166 156 debugFatal("Could not create IsoHandlerManager\n"); 167 157 return false; 168 158 } 169 170 // propagate the debug level171 159 m_isoManager->setVerboseLevel(getDebugLevel()); 172 173 160 if(!m_isoManager->init()) { 174 161 debugFatal("Could not initialize IsoHandlerManager\n"); … … 177 164 178 165 m_xrun_happened=false; 179 180 166 return true; 181 167 } … … 195 181 } 196 182 197 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 198 it != m_ReceiveProcessors.end(); 199 ++it ) { 200 if(m_SyncSource == NULL) { 201 debugWarning(" => Sync Source is %p.\n", *it); 202 m_SyncSource = *it; 203 } 204 } 205 206 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 207 it != m_TransmitProcessors.end(); 208 ++it ) { 209 if(m_SyncSource == NULL) { 210 debugWarning(" => Sync Source is %p.\n", *it); 211 m_SyncSource = *it; 212 } 213 } 214 215 // now do the actual preparation 183 // FIXME: put into separate method 184 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 185 it != m_ReceiveProcessors.end(); 186 ++it ) 187 { 188 if(m_SyncSource == NULL) { 189 debugWarning(" => Sync Source is %p.\n", *it); 190 m_SyncSource = *it; 191 } 192 } 193 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 194 it != m_TransmitProcessors.end(); 195 ++it ) 196 { 197 if(m_SyncSource == NULL) { 198 debugWarning(" => Sync Source is %p.\n", *it); 199 m_SyncSource = *it; 200 } 201 } 202 203 // now do the actual preparation of the SP's 216 204 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n"); 217 205 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); … … 228 216 } 229 217 } 230 231 218 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n"); 232 219 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); … … 248 235 return false; 249 236 } 250 251 237 return true; 252 238 } 253 239 240 bool StreamProcessorManager::startDryRunning() { 241 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start dry-running...\n"); 242 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 243 it != m_ReceiveProcessors.end(); 244 ++it ) { 245 if(!(*it)->startDryRunning(-1)) { 246 debugError("Could not put SP %p into the dry-running state\n", *it); 247 return false; 248 } 249 } 250 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 251 it != m_TransmitProcessors.end(); 252 ++it ) { 253 if(!(*it)->startDryRunning(-1)) { 254 debugError("Could not put SP %p into the dry-running state\n", *it); 255 return false; 256 } 257 } 258 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n"); 259 return true; 260 } 261 254 262 bool StreamProcessorManager::syncStartAll() { 255 256 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n"); 257 // we have to wait until all streamprocessors indicate that they are running 258 // i.e. that there is actually some data stream flowing 259 int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds 260 bool notRunning=true; 261 while (notRunning && wait_cycles) { 262 wait_cycles--; 263 notRunning=false; 264 265 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 266 it != m_ReceiveProcessors.end(); 267 ++it ) { 268 if(!(*it)->isRunning()) notRunning=true; 269 } 270 271 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 272 it != m_TransmitProcessors.end(); 273 ++it ) { 274 if(!(*it)->isRunning()) notRunning=true; 275 } 276 277 usleep(1000); 278 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning); 279 } 280 281 if(!wait_cycles) { // timout has occurred 282 debugFatal("One or more streams are not starting up (timeout):\n"); 283 284 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 285 it != m_ReceiveProcessors.end(); 286 ++it ) { 287 if(!(*it)->isRunning()) { 288 debugFatal(" receive stream %p not running\n",*it); 289 } else { 290 debugFatal(" receive stream %p running\n",*it); 291 } 292 } 293 294 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 295 it != m_TransmitProcessors.end(); 296 ++it ) { 297 if(!(*it)->isRunning()) { 298 debugFatal(" transmit stream %p not running\n",*it); 299 } else { 300 debugFatal(" transmit stream %p running\n",*it); 301 } 302 } 303 return false; 304 } 305 306 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 263 // figure out when to get the SP's running. 264 // the xmit SP's should also know the base timestamp 265 // streams should be aligned here 307 266 308 267 // now find out how long we have to delay the wait operation such that … … 318 277 } 319 278 279 // add some processing margin. This only shifts the time 280 // at which the buffer is transfer()'ed. This makes things somewhat 281 // more robust. It should be noted though that shifting the transfer 282 // time to a later time instant also causes the xmit buffer fill to be 283 // lower on average. 284 max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS; 320 285 debugOutput( DEBUG_LEVEL_VERBOSE, " %d ticks (%03us %04uc %04ut)...\n", 321 286 max_of_min_delay, … … 325 290 m_SyncSource->setSyncDelay(max_of_min_delay); 326 291 327 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device to indicate clock sync lock...\n"); 292 //STEP X: when we implement such a function, we can wait for a signal from the devices that they 293 // have aquired lock 294 //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n"); 328 295 //sleep(2); // FIXME: be smarter here 329 330 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); 331 // now we reset the frame counters 332 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 333 it != m_ReceiveProcessors.end(); 334 ++it ) { 335 // get the receive SP's going at receiving data 336 (*it)->m_data_buffer->setTransparent(false); 337 (*it)->reset(); 338 } 339 340 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 341 it != m_TransmitProcessors.end(); 342 ++it ) { 343 // make sure the SP retains it's prefilled data 344 (*it)->m_data_buffer->setTransparent(false); 345 (*it)->reset(); 346 } 347 348 dumpInfo(); 349 // All buffers are prefilled and set-up, the only thing 350 // that remains a mistery is the timestamp information. 351 // m_SyncSource->m_data_buffer->setTransparent(false); 352 // debugShowBackLog(); 353 354 // m_SyncSource->setVerboseLevel(DEBUG_LEVEL_ULTRA_VERBOSE); 355 296 297 // wait for some sort of sync 356 298 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n"); 357 // in order to obtain that, we wait for the first periods to be 358 // received. 299 // in order to obtain that, we wait for the first periods to be received. 359 300 int nb_sync_runs=20; 301 int64_t time_till_next_period; 360 302 while(nb_sync_runs--) { // or while not sync-ed? 361 waitForPeriod(); 362 // drop the frames for all receive SP's 363 dryRun(StreamProcessor::ePT_Receive); 364 365 // we don't have to dryrun for the xmit SP's since they 366 // are not sending data yet. 367 368 // sync the xmit SP's buffer head timestamps 369 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 370 it != m_TransmitProcessors.end(); 371 ++it ) { 372 // FIXME: encapsulate 373 (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 374 } 375 } 376 // m_SyncSource->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 377 303 time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 304 debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 305 if(time_till_next_period > 0) { 306 // wait for the period 307 usleep(time_till_next_period); 308 } 309 } 310 311 // figure out where we are now 312 uint64_t time_of_transfer = m_SyncSource->getTimeAtPeriod(); 378 313 debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n", 379 m_time_of_transfer, 380 (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 381 (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 382 (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 383 // FIXME: xruns can screw up the framecounter accounting. do something more sane here 384 resetXrunCounters(); 385 // lock the isohandlermanager such that things freeze 386 // debugShowBackLog(); 387 314 time_of_transfer, 315 (unsigned int)TICKS_TO_SECS(time_of_transfer), 316 (unsigned int)TICKS_TO_CYCLES(time_of_transfer), 317 (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); 318 319 // start wet-running in 200 cycles 320 // this is the timeframe in which the remaining code should be ready 321 time_of_transfer = addTicks(time_of_transfer, 200*TICKS_PER_CYCLE); 322 323 debugOutput( DEBUG_LEVEL_VERBOSE, " => start at TS=%011llu (%03us %04uc %04ut)...\n", 324 time_of_transfer, 325 (unsigned int)TICKS_TO_SECS(time_of_transfer), 326 (unsigned int)TICKS_TO_CYCLES(time_of_transfer), 327 (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); 388 328 // we now should have decent sync info 389 329 // the buffers of the receive streams should be (approx) empty 390 330 // the buffers of the xmit streams should be full 391 392 // note what should the timestamp of the first sample be?393 331 394 332 // at this point the buffer head timestamp of the transmit buffers can be … … 405 343 // int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 406 344 407 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();408 it != m_TransmitProcessors.end();409 ++it ) {410 // FIXME: encapsulate411 (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer);412 //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!!413 }345 // for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 346 // it != m_TransmitProcessors.end(); 347 // ++it ) { 348 // // FIXME: encapsulate 349 // (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 350 // //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!! 351 // } 414 352 415 353 dumpInfo(); 416 417 // this is the place were we should be syncing the received streams too 418 // check how much they differ 419 420 421 debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 422 423 // FIXME: this should not be in cycles, but in 'time' 424 // FIXME: remove the timestamp 425 if (!enableStreamProcessors(0)) { 426 debugFatal("Could not enable StreamProcessors...\n"); 427 return false; 428 } 429 430 debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n"); 431 #define MAX_DRYRUN_CYCLES 40 432 #define MIN_SUCCESSFUL_DRYRUN_CYCLES 4 433 // run some cycles 'dry' such that everything can stabilize 434 int nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES; 435 int nb_succesful_cycles = 0; 436 while(nb_dryrun_cycles_left > 0 && 437 nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) { 438 439 waitForPeriod(); 440 441 if (dryRun()) { 442 nb_succesful_cycles++; 443 } else { 444 debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" ); 445 resetXrunCounters(); 446 // reset the transmit SP's such that there is no issue with accumulating buffers 447 // FIXME: what about receive SP's 448 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 449 it != m_TransmitProcessors.end(); 450 ++it ) { 451 // FIXME: encapsulate 452 (*it)->reset(); //CHECK!!! 453 (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 454 } 455 456 nb_succesful_cycles = 0; 457 // FIXME: xruns can screw up the framecounter accounting. do something more sane here 458 } 459 nb_dryrun_cycles_left--; 460 } 461 462 if(nb_dryrun_cycles_left == 0) { 463 debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without steady-state...\n" ); 464 return false; 465 } 466 debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in steady-state...\n" ); 467 468 // now we should clear the xrun flags 469 resetXrunCounters(); 470 471 /* debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning streams...\n"); 472 // run some cycles 'dry' such that everything can stabilize 473 nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES; 474 nb_succesful_cycles = 0; 475 while(nb_dryrun_cycles_left > 0 && 476 nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) { 477 478 waitForPeriod(); 479 480 // align the received streams 481 int64_t sp_lag; 482 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 483 it != m_ReceiveProcessors.end(); 484 ++it ) { 485 uint64_t ts_sp=(*it)->getTimeAtPeriod(); 486 uint64_t ts_sync=m_SyncSource->getTimeAtPeriod(); 487 488 sp_lag = diffTicks(ts_sp, ts_sync); 489 debugOutput( DEBUG_LEVEL_VERBOSE, " SP(%p) TS=%011llu - TS=%011llu = %04lld\n", 490 (*it), ts_sp, ts_sync, sp_lag); 491 // sync the other receive SP's to the sync source 492 // if((*it) != m_SyncSource) { 493 // if(!(*it)->m_data_buffer->syncCorrectLag(sp_lag)) { 494 // debugOutput(DEBUG_LEVEL_VERBOSE,"could not syncCorrectLag(%11lld) for stream processor (%p)\n", 495 // sp_lag, *it); 496 // } 497 // } 498 } 499 500 501 if (dryRun()) { 502 nb_succesful_cycles++; 503 } else { 504 debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" ); 505 resetXrunCounters(); 506 nb_succesful_cycles = 0; 507 // FIXME: xruns can screw up the framecounter accounting. do something more sane here 508 } 509 nb_dryrun_cycles_left--; 510 } 511 512 if(nb_dryrun_cycles_left == 0) { 513 debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without aligned steady-state...\n" ); 514 return false; 515 } 516 debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in aligned steady-state...\n" );*/ 517 518 // now we should clear the xrun flags 519 resetXrunCounters(); 520 // and off we go 354 355 // STEP X: switch SP's over to the running state 356 uint64_t time_to_start = addTicks(time_of_transfer, 357 m_SyncSource->getTicksPerFrame() * getPeriodSize()); 358 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 359 it != m_ReceiveProcessors.end(); 360 ++it ) { 361 if(!(*it)->startRunning(time_to_start)) { 362 debugError("Could not put SP %p into the running state\n", *it); 363 return false; 364 } 365 } 366 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 367 it != m_TransmitProcessors.end(); 368 ++it ) { 369 if(!(*it)->startRunning(time_to_start)) { 370 debugError("Could not put SP %p into the running state\n", *it); 371 return false; 372 } 373 } 374 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 521 375 return true; 522 }523 524 void StreamProcessorManager::resetXrunCounters(){525 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n");526 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();527 it != m_ReceiveProcessors.end();528 ++it )529 {530 (*it)->resetXrunCounter();531 }532 533 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();534 it != m_TransmitProcessors.end();535 ++it )536 {537 (*it)->resetXrunCounter();538 }539 376 } 540 377 … … 546 383 debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 547 384 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 548 it != m_ReceiveProcessors.end(); 549 ++it ) { 550 if (!(*it)->prepareForStart()) { 551 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it); 552 return false; 553 } 554 if (!m_isoManager->registerStream(*it)) { 555 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 556 return false; 557 } 558 } 559 385 it != m_ReceiveProcessors.end(); 386 ++it ) 387 { 388 if (!m_isoManager->registerStream(*it)) { 389 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 390 return false; 391 } 392 } 560 393 debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 561 394 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 562 it != m_TransmitProcessors.end(); 563 ++it ) { 564 if (!(*it)->prepareForStart()) { 565 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it); 566 return false; 567 } 568 if (!m_isoManager->registerStream(*it)) { 569 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 570 return false; 571 } 572 } 395 it != m_TransmitProcessors.end(); 396 ++it ) 397 { 398 if (!m_isoManager->registerStream(*it)) { 399 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 400 return false; 401 } 402 } 573 403 574 404 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n"); … … 578 408 } 579 409 580 debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");581 if (!disableStreamProcessors()) {582 debugFatal("Could not disable StreamProcessors...\n");583 return false;584 }585 586 410 debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n"); 587 411 if (!m_isoManager->startHandlers(-1)) { … … 590 414 } 591 415 416 // put all SP's into dry-running state 417 if (!startDryRunning()) { 418 debugFatal("Could not put SP's in dry-running state\n"); 419 return false; 420 } 421 592 422 // start all SP's synchonized 593 423 if (!syncStartAll()) { … … 602 432 603 433 return true; 604 605 434 } 606 435 … … 613 442 // (like the MOTU) need to do a few things before it's safe to turn off the iso 614 443 // handling. 615 int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient 616 bool allReady = false; 617 while (!allReady && wait_cycles) { 618 wait_cycles--; 619 allReady = true; 620 621 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 622 it != m_ReceiveProcessors.end(); 623 ++it ) { 624 if(!(*it)->prepareForStop()) allReady = false; 625 } 626 627 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 628 it != m_TransmitProcessors.end(); 629 ++it ) { 630 if(!(*it)->prepareForStop()) allReady = false; 631 } 632 usleep(1000); 633 } 634 635 debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 636 if (!disableStreamProcessors()) { 637 debugFatal("Could not disable StreamProcessors...\n"); 638 return false; 444 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 445 it != m_ReceiveProcessors.end(); 446 ++it ) { 447 if(!(*it)->stop()) { 448 debugError("Could not stop SP %p", (*it)); 449 } 450 } 451 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 452 it != m_TransmitProcessors.end(); 453 ++it ) { 454 if(!(*it)->stop()) { 455 debugError("Could not stop SP %p", (*it)); 456 } 639 457 } 640 458 … … 649 467 debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 650 468 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 651 it != m_ReceiveProcessors.end(); 652 ++it ) { 653 if (!m_isoManager->unregisterStream(*it)) { 654 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 655 return false; 656 } 657 658 } 659 469 it != m_ReceiveProcessors.end(); 470 ++it ) { 471 if (!m_isoManager->unregisterStream(*it)) { 472 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 473 return false; 474 } 475 } 660 476 debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 661 477 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 662 it != m_TransmitProcessors.end(); 663 ++it ) { 664 if (!m_isoManager->unregisterStream(*it)) { 665 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 666 return false; 667 } 668 669 } 670 671 return true; 672 673 } 674 675 /** 676 * Enables the registered StreamProcessors 677 * @return true if successful, false otherwise 678 */ 679 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) { 680 debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at); 681 682 debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource); 683 debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare...\n"); 684 if (!m_SyncSource->prepareForEnable(time_to_enable_at)) { 685 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 686 return false; 687 } 688 689 debugOutput( DEBUG_LEVEL_VERBOSE, " Enable...\n"); 690 m_SyncSource->enable(time_to_enable_at); 691 692 debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n"); 693 694 // we prepare the streamprocessors for enable 695 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 696 it != m_ReceiveProcessors.end(); 697 ++it ) { 698 if(*it != m_SyncSource) { 699 debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it); 700 (*it)->prepareForEnable(time_to_enable_at); 701 } 702 } 703 704 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 705 it != m_TransmitProcessors.end(); 706 ++it ) { 707 if(*it != m_SyncSource) { 708 debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it); 709 (*it)->prepareForEnable(time_to_enable_at); 710 } 711 } 712 713 // then we enable the streamprocessors 714 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 715 it != m_ReceiveProcessors.end(); 716 ++it ) { 717 if(*it != m_SyncSource) { 718 debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it); 719 (*it)->enable(time_to_enable_at); 720 } 721 } 722 723 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 724 it != m_TransmitProcessors.end(); 725 ++it ) { 726 if(*it != m_SyncSource) { 727 debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it); 728 (*it)->enable(time_to_enable_at); 729 } 730 } 731 732 // now we wait for the SP's to get enabled 733 // debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n"); 734 // // we have to wait until all streamprocessors indicate that they are running 735 // // i.e. that there is actually some data stream flowing 736 // int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 737 // bool notEnabled=true; 738 // while (notEnabled && wait_cycles) { 739 // wait_cycles--; 740 // notEnabled=false; 741 // 742 // for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 743 // it != m_ReceiveProcessors.end(); 744 // ++it ) { 745 // if(!(*it)->isEnabled()) notEnabled=true; 746 // } 747 // 748 // for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 749 // it != m_TransmitProcessors.end(); 750 // ++it ) { 751 // if(!(*it)->isEnabled()) notEnabled=true; 752 // } 753 // usleep(1000); // one cycle 754 // } 755 // 756 // if(!wait_cycles) { // timout has occurred 757 // debugFatal("One or more streams couldn't be enabled (timeout):\n"); 758 // 759 // for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 760 // it != m_ReceiveProcessors.end(); 761 // ++it ) { 762 // if(!(*it)->isEnabled()) { 763 // debugFatal(" receive stream %p not enabled\n",*it); 764 // } else { 765 // debugFatal(" receive stream %p enabled\n",*it); 766 // } 767 // } 768 // 769 // for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 770 // it != m_TransmitProcessors.end(); 771 // ++it ) { 772 // if(!(*it)->isEnabled()) { 773 // debugFatal(" transmit stream %p not enabled\n",*it); 774 // } else { 775 // debugFatal(" transmit stream %p enabled\n",*it); 776 // } 777 // } 778 // return false; 779 // } 780 781 debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n"); 782 783 return true; 784 } 785 786 /** 787 * Disables the registered StreamProcessors 788 * @return true if successful, false otherwise 789 */ 790 bool StreamProcessorManager::disableStreamProcessors() { 791 // we prepare the streamprocessors for disable 792 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 793 it != m_ReceiveProcessors.end(); 794 ++it ) { 795 (*it)->prepareForDisable(); 796 } 797 798 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 799 it != m_TransmitProcessors.end(); 800 ++it ) { 801 (*it)->prepareForDisable(); 802 } 803 804 // then we disable the streamprocessors 805 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 806 it != m_ReceiveProcessors.end(); 807 ++it ) { 808 (*it)->disable(); 809 (*it)->m_data_buffer->setTransparent(true); 810 } 811 812 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 813 it != m_TransmitProcessors.end(); 814 ++it ) { 815 (*it)->disable(); 816 (*it)->m_data_buffer->setTransparent(true); 817 } 818 819 // now we wait for the SP's to get disabled 820 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n"); 821 // we have to wait until all streamprocessors indicate that they are running 822 // i.e. that there is actually some data stream flowing 823 int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 824 bool enabled=true; 825 while (enabled && wait_cycles) { 826 wait_cycles--; 827 enabled=false; 828 829 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 830 it != m_ReceiveProcessors.end(); 831 ++it ) { 832 if((*it)->isEnabled()) enabled=true; 833 } 834 835 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 836 it != m_TransmitProcessors.end(); 837 ++it ) { 838 if((*it)->isEnabled()) enabled=true; 839 } 840 usleep(1000); // one cycle 841 } 842 843 if(!wait_cycles) { // timout has occurred 844 debugFatal("One or more streams couldn't be disabled (timeout):\n"); 845 846 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 847 it != m_ReceiveProcessors.end(); 848 ++it ) { 849 if(!(*it)->isEnabled()) { 850 debugFatal(" receive stream %p not enabled\n",*it); 851 } else { 852 debugFatal(" receive stream %p enabled\n",*it); 853 } 854 } 855 856 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 857 it != m_TransmitProcessors.end(); 858 ++it ) { 859 if(!(*it)->isEnabled()) { 860 debugFatal(" transmit stream %p not enabled\n",*it); 861 } else { 862 debugFatal(" transmit stream %p enabled\n",*it); 863 } 864 } 865 return false; 866 } 867 868 debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n"); 478 it != m_TransmitProcessors.end(); 479 ++it ) { 480 if (!m_isoManager->unregisterStream(*it)) { 481 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 482 return false; 483 } 484 } 869 485 870 486 return true; … … 893 509 * 3) Re-enable the SP's 894 510 */ 895 debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 896 if (!disableStreamProcessors()) { 897 debugFatal("Could not disable StreamProcessors...\n"); 511 512 // put all SP's back into dry-running state 513 if (!startDryRunning()) { 514 debugFatal("Could not put SP's in dry-running state\n"); 898 515 return false; 899 516 } … … 960 577 // this is to notify the client of the delay 961 578 // that we introduced 962 m_delayed_usecs =time_till_next_period;579 m_delayed_usecs = time_till_next_period; 963 580 964 581 // we save the 'ideal' time of the transfer at this point, … … 1001 618 1002 619 // if this is true, a xrun will occur 1003 xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();620 xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); 1004 621 1005 622 #ifdef DEBUG … … 1008 625 (*it)->dumpInfo(); 1009 626 } 1010 if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {627 if (!((*it)->canClientTransferFrames(m_period))) { 1011 628 debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); 1012 629 (*it)->dumpInfo(); … … 1022 639 1023 640 // if this is true, a xrun will occur 1024 xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled();641 xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); 1025 642 1026 643 #ifdef DEBUG … … 1028 645 debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it); 1029 646 } 1030 if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) {647 if (!((*it)->canClientTransferFrames(m_period))) { 1031 648 debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it); 1032 649 } … … 1049 666 bool StreamProcessorManager::transfer() { 1050 667 1051 debugOutput( DEBUG_LEVEL_VER BOSE, "Transferring period...\n");668 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 1052 669 bool retval=true; 1053 retval &= dryRun(StreamProcessor::ePT_Receive);1054 retval &= dryRun(StreamProcessor::ePT_Transmit);670 retval &= transfer(StreamProcessor::ePT_Receive); 671 retval &= transfer(StreamProcessor::ePT_Transmit); 1055 672 return retval; 1056 673 } … … 1066 683 1067 684 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { 1068 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period ...\n");685 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period for type (%d)...\n", t); 1069 686 bool retval = true; 1070 687 // a static cast could make sure that there is no performance … … 1089 706 it != m_ReceiveProcessors.end(); 1090 707 ++it ) { 1091 1092 708 if(!(*it)->getFrames(m_period, receive_timestamp)) { 1093 debug Output(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n",709 debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n", 1094 710 m_period, m_time_of_transfer,*it); 1095 711 retval &= false; // buffer underrun … … 1110 726 1111 727 if(!(*it)->putFrames(m_period, transmit_timestamp)) { 1112 debug Output(DEBUG_LEVEL_VERBOSE,"could not putFrames(%u,%llu) to stream processor (%p)\n",728 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n", 1113 729 m_period, transmit_timestamp, *it); 1114 730 retval &= false; // buffer underrun branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h
r715 r719 62 62 bool stop(); 63 63 64 bool startDryRunning(); 64 65 bool syncStartAll(); 65 66 … … 67 68 bool registerProcessor(StreamProcessor *processor); ///< start managing a streamprocessor 68 69 bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor 69 70 bool enableStreamProcessors(uint64_t time_to_enable_at); /// enable registered StreamProcessors71 bool disableStreamProcessors(); /// disable registered StreamProcessors72 70 73 71 void setPeriodSize(unsigned int period); … … 99 97 100 98 private: 101 void resetXrunCounters();102 99 103 100 int m_delayed_usecs; branches/ppalmers-streaming/src/libstreaming/util/IsoHandler.cpp
r705 r719 437 437 438 438 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing iso receive handler (%p)\n",this); 439 debugOutput( DEBUG_LEVEL_VERBOSE, " Buffers : %d \n",m_buf_packets); 440 debugOutput( DEBUG_LEVEL_VERBOSE, " Max Packet size : %d \n",m_max_packet_size); 441 debugOutput( DEBUG_LEVEL_VERBOSE, " Channel : %d \n",m_Client->getChannel()); 442 debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval : %d \n",m_irq_interval); 439 debugOutput( DEBUG_LEVEL_VERBOSE, " Buffers : %d \n", m_buf_packets); 440 debugOutput( DEBUG_LEVEL_VERBOSE, " Max Packet size : %d \n", m_max_packet_size); 441 debugOutput( DEBUG_LEVEL_VERBOSE, " Channel : %d \n", m_Client->getChannel()); 442 debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval : %d \n", m_irq_interval); 443 debugOutput( DEBUG_LEVEL_VERBOSE, " Mode : %s \n", 444 (m_irq_interval > 1)?"DMA_BUFFERFILL":"PACKET_PER_BUFFER"); 443 445 444 446 if(m_irq_interval > 1) { branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp
r714 r719 327 327 unsigned int irq_interval=packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD; 328 328 if(irq_interval <= 0) irq_interval=1; 329 // FIXME: test 330 irq_interval=1; 331 #warning Using fixed irq_interval 332 329 333 #else 330 334 // hardware interrupts occur when one DMA block is full, and the size of one DMA branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp
r712 r719 147 147 diff += m_wrap_at; 148 148 } 149 149 150 150 float rate=((float)diff)/((float) m_update_period); 151 if (rate<0.0) debugError("rate < 0! (%f)\n",rate); 151 152 if (fabsf(m_nominal_rate - rate)>(m_nominal_rate*0.1)) { 152 153 debugWarning("(%p) rate (%10.5f) more that 10%% off nominal (rate=%10.5f, diff="TIMESTAMP_FORMAT_SPEC", update_period=%d)\n", 153 154 this, rate,m_nominal_rate,diff, m_update_period); 154 //dumpInfo(); 155 155 156 return m_nominal_rate; 156 157 } else { … … 379 380 380 381 if (m_transparent) { 381 // // if the buffer is disabled, it's in a 'transparent' state, meaning 382 // // that if too much is put into the buffer, the oldest data is discarded 383 // signed int fc; 384 // ENTER_CRITICAL_SECTION; 385 // fc=m_framecounter; 386 // EXIT_CRITICAL_SECTION; 387 // 388 // signed int frames_to_ditch= nframes - (m_buffer_size - m_framecounter) + 1; 389 // if ( frames_to_ditch > 0 ) { 390 // debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "dropping %d frames\n", frames_to_ditch); 391 // dropFrames( frames_to_ditch ); 392 // } 393 // // add the data payload to the ringbuffer 394 // if (ffado_ringbuffer_write(m_event_buffer,data,write_size) < write_size) 395 // { 396 // debugError("we should have freed up enough space for this\n"); 397 // return false; 398 // } 399 // 400 // while disabled, we don't update the DLL, we just set the correct 401 // timestamp for the frames 382 // while disabled, we don't update the DLL, nor do we write frames 383 // we just set the correct timestamp for the frames 402 384 setBufferTailTimestamp(ts); 403 385 } else { … … 446 428 unsigned int read_size=nframes*m_event_size*m_events_per_frame; 447 429 448 // get the data payload to the ringbuffer 449 if ((ffado_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) 450 { 451 // debugWarning("readFrames buffer underrun\n"); 452 return false; 453 } 454 455 decrementFrameCounter(nframes); 456 457 return true; 458 430 if (m_transparent) { 431 return true; // FIXME: the data still doesn't make sense! 432 } else { 433 // get the data payload to the ringbuffer 434 if ((ffado_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) 435 { 436 debugWarning("readFrames buffer underrun\n"); 437 return false; 438 } 439 decrementFrameCounter(nframes); 440 } 441 return true; 459 442 } 460 443 … … 712 695 EXIT_CRITICAL_SECTION; 713 696 714 debugOutput(DEBUG_LEVEL_VER BOSE, "for (%p) to "697 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "for (%p) to " 715 698 TIMESTAMP_FORMAT_SPEC" => "TIMESTAMP_FORMAT_SPEC", NTS=" 716 699 TIMESTAMP_FORMAT_SPEC", DLL2=%f, RATE=%f\n", branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h
r712 r719 150 150 // dll stuff 151 151 bool setNominalRate(float r); 152 float getNominalRate() {return m_nominal_rate;}; 152 153 float getRate(); 153 154 branches/ppalmers-streaming/tests/test-cycletimer.cpp
r714 r719 296 296 297 297 s->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 298 299 if (!s->init()) {300 debugOutput(DEBUG_LEVEL_NORMAL, "Could not init IsoStream\n");301 goto finish;302 }303 304 298 s->setChannel(0); 305 299 branches/ppalmers-streaming/tests/test-sytmonitor.cpp
r705 r719 233 233 234 234 monitors[i]->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 235 236 if (!monitors[i]->init()) {237 debugOutput(DEBUG_LEVEL_NORMAL, "Could not init SytMonitor %d\n", i);238 goto finish;239 }240 241 235 monitors[i]->setChannel(arguments.args[i].channel); 242 236