Index: /branches/ppalmers-streaming/tests/test-sytmonitor.cpp =================================================================== --- /branches/ppalmers-streaming/tests/test-sytmonitor.cpp (revision 705) +++ /branches/ppalmers-streaming/tests/test-sytmonitor.cpp (revision 719) @@ -233,10 +233,4 @@ monitors[i]->setVerboseLevel(DEBUG_LEVEL_VERBOSE); - - if (!monitors[i]->init()) { - debugOutput(DEBUG_LEVEL_NORMAL, "Could not init SytMonitor %d\n", i); - goto finish; - } - monitors[i]->setChannel(arguments.args[i].channel); Index: /branches/ppalmers-streaming/tests/test-cycletimer.cpp =================================================================== --- /branches/ppalmers-streaming/tests/test-cycletimer.cpp (revision 714) +++ /branches/ppalmers-streaming/tests/test-cycletimer.cpp (revision 719) @@ -296,10 +296,4 @@ s->setVerboseLevel(DEBUG_LEVEL_VERBOSE); - - if (!s->init()) { - debugOutput(DEBUG_LEVEL_NORMAL, "Could not init IsoStream\n"); - goto finish; - } - s->setChannel(0); Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 719) @@ -49,20 +49,4 @@ {} -/** - * @return - */ -bool AmdtpTransmitStreamProcessor::init() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing (%p)...\n", this); - // call the parent init - // this has to be done before allocating the buffers, - // because this sets the buffersizes from the processormanager - if(!StreamProcessor::init()) { - debugFatal("Could not do base class init (%p)\n",this); - return false; - } - return true; -} - enum raw1394_iso_disposition AmdtpTransmitStreamProcessor::getPacket(unsigned char *data, unsigned int *length, @@ -73,5 +57,5 @@ if (cycle<0) { debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", - cycle, m_running); + cycle, isRunning()); *tag = 0; *sy = 0; @@ -81,5 +65,5 @@ debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", - cycle, m_running); + cycle, isRunning()); if (addCycles(m_last_cycle, 1) != cycle) { @@ -99,5 +83,5 @@ /* Our node ID can change after a bus reset, so it is best to fetch * our node ID for each packet. */ - packet->sid = getNodeId() & 0x3f; + packet->sid = m_handler->getLocalNodeId() & 0x3f; packet->dbs = m_dimension; @@ -126,5 +110,5 @@ #ifdef DEBUG - if(m_running && (cycle_diff < 0)) { + if(isRunning() && (cycle_diff < 0)) { debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", cycle, now_cycles); @@ -139,7 +123,6 @@ // to be 'running' // NOTE: this works only at startup - if (!m_running && cycle_diff >= 0 && cycle >= 0) { + if (!isRunning() && cycle_diff >= 0 && cycle >= 0) { debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); - m_running=true; } @@ -169,8 +152,8 @@ const int max_cycles_to_transmit_early = 5; - if( !m_running || !m_data_buffer->isEnabled() ) { + if( !isRunning() || !m_data_buffer->isEnabled() ) { debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Not running (%d) or buffer not enabled (enabled=%d)\n", - m_running, m_data_buffer->isEnabled()); + isRunning(), m_data_buffer->isEnabled()); // not running or not enabled @@ -357,4 +340,17 @@ } +unsigned int +AmdtpTransmitStreamProcessor::getEventsPerFrame() +{ + return m_dimension; +} + +unsigned int +AmdtpTransmitStreamProcessor::getUpdatePeriod() +{ + return m_syt_interval; +} + + unsigned int AmdtpTransmitStreamProcessor::fillDataPacketHeader( struct iec61883_packet *packet, unsigned int* length, @@ -417,10 +413,10 @@ m_data_buffer->setTickOffset(0); - // reset all non-device specific stuff - // i.e. the iso stream and the associated ports - if(!StreamProcessor::reset()) { - debugFatal("Could not do base class reset\n"); - return false; - } +// // reset all non-device specific stuff +// // i.e. the iso stream and the associated ports +// if(!StreamProcessor::reset()) { +// debugFatal("Could not do base class reset\n"); +// return false; +// } // we should prefill the event buffer @@ -433,5 +429,5 @@ } -bool AmdtpTransmitStreamProcessor::prepare() { +bool AmdtpTransmitStreamProcessor::prepareChild() { m_PeriodStat.setName("XMT PERIOD"); m_PacketStat.setName("XMT PACKET"); @@ -620,8 +616,8 @@ bool AmdtpTransmitStreamProcessor::prepareForEnable(uint64_t time_to_enable_at) { - if (!StreamProcessor::prepareForEnable(time_to_enable_at)) { - debugError("StreamProcessor::prepareForEnable failed\n"); - return false; - } +// if (!StreamProcessor::prepareForEnable(time_to_enable_at)) { +// debugError("StreamProcessor::prepareForEnable failed\n"); +// return false; +// } return true; Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp (revision 719) @@ -45,225 +45,14 @@ : StreamProcessor(ePT_Receive , port) , m_dimension(dimension) - , m_last_timestamp(0) - , m_last_timestamp2(0) - , m_dropped(0) {} -bool AmdtpReceiveStreamProcessor::init() { - - // call the parent init - // this has to be done before allocating the buffers, - // because this sets the buffersizes from the processormanager - if(!StreamProcessor::init()) { - debugFatal("Could not do base class init (%d)\n",this); - return false; - } - return true; -} - -enum raw1394_iso_disposition -AmdtpReceiveStreamProcessor::putPacket(unsigned char *data, unsigned int length, - unsigned char channel, unsigned char tag, unsigned char sy, - unsigned int cycle, unsigned int dropped) { - - enum raw1394_iso_disposition retval=RAW1394_ISO_OK; - - int dropped_cycles=diffCycles(cycle, m_last_cycle) - 1; - if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); - else m_dropped += dropped_cycles; - if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); - - m_last_cycle=cycle; - - struct iec61883_packet *packet = (struct iec61883_packet *) data; - assert(packet); - -#ifdef DEBUG - if(dropped>0) { - debugWarning("(%p) Dropped %d packets on cycle %d\n", this, dropped, cycle); - } - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4ucy + %04uticks) (running=%d)\n", - channel, cycle, ntohs(packet->syt), - CYCLE_TIMER_GET_CYCLES(ntohs(packet->syt)), CYCLE_TIMER_GET_OFFSET(ntohs(packet->syt)), - m_running); - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, - "RCV: CH = %d, FDF = %X. SYT = %6d, DBS = %3d, DBC = %3d, FMT = %3d, LEN = %4d\n", - channel, packet->fdf, - packet->syt, - packet->dbs, - packet->dbc, - packet->fmt, - length); - -#endif - - // check if this is a valid packet - if((packet->syt != 0xFFFF) - && (packet->fdf != 0xFF) - && (packet->fmt == 0x10) - && (packet->dbs>0) - && (length>=2*sizeof(quadlet_t))) { - - unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; - - //=> store the previous timestamp - m_last_timestamp2=m_last_timestamp; - - uint64_t nowX = m_handler->getCycleTimer(); - //=> convert the SYT to a full timestamp in ticks - m_last_timestamp=sytRecvToFullTicks((uint32_t)ntohs(packet->syt), - cycle, nowX); - - int64_t diffx = diffTicks(m_last_timestamp, m_last_timestamp2); - if (abs(diffx) > m_syt_interval * m_data_buffer->getRate() * 1.1) { - uint32_t now=m_handler->getCycleTimer(); - uint32_t syt = (uint32_t)ntohs(packet->syt); - uint32_t now_ticks=CYCLE_TIMER_TO_TICKS(now); - - debugOutput(DEBUG_LEVEL_VERBOSE, "diff=%06lld TS=%011llu TS2=%011llu\n", - diffx, m_last_timestamp, m_last_timestamp2); - debugOutput(DEBUG_LEVEL_VERBOSE, "[1] cy=%04d dropped=%05llu syt=%04llX NOW=%08llX => TS=%011llu\n", - m_last_good_cycle, m_last_dropped, m_last_syt, m_last_now, m_last_timestamp2); - debugOutput(DEBUG_LEVEL_VERBOSE, "[2] cy=%04d dropped=%05d syt=%04X NOW=%08llX => TS=%011llu\n", - cycle, dropped_cycles, ntohs(packet->syt), nowX, m_last_timestamp); - - uint32_t test_ts=sytRecvToFullTicks(syt, cycle, now); - - debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: SYT=%08X, CY=%04d OFF=%04d\n", - cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) - ); - debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%04u OFF=%04u\n", - cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) - ); - debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%04u OFF=%04u\n", - cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) - ); - - int64_t diff_ts = diffTicks(now_ticks, test_ts); - debugOutput(DEBUG_LEVEL_VERBOSE, "DIFF : TCK=%011lld, SEC=%03llu CY=%04llu OFF=%04llu\n", - diff_ts, - TICKS_TO_SECS((uint64_t)diff_ts), - TICKS_TO_CYCLES((uint64_t)diff_ts), - TICKS_TO_OFFSET((uint64_t)diff_ts) - ); - } - m_last_syt = ntohs(packet->syt); - m_last_now = nowX; - m_last_good_cycle = cycle; - m_last_dropped = dropped_cycles; - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", - cycle, m_last_timestamp); - - // we have to keep in mind that there are also - // some packets buffered by the ISO layer, - // at most x=m_handler->getWakeupInterval() - // these contain at most x*syt_interval - // frames, meaning that we might receive - // this packet x*syt_interval*ticks_per_frame - // later than expected (the real receive time) - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", - m_last_timestamp, m_handler->getWakeupInterval(),m_syt_interval,getTicksPerFrame()); - - //=> signal that we're running (if we are) - if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Receive StreamProcessor %p started running at %d\n", this, cycle); - m_running=true; - m_data_buffer->setBufferTailTimestamp(m_last_timestamp); - // we don't want this first sample to be written - return RAW1394_ISO_OK; - } - - // if we are not running yet, there is nothing more to do - if(!m_running) { - return RAW1394_ISO_OK; - } - #ifdef DEBUG_OFF - if((cycle % 1000) == 0) { - uint32_t now=m_handler->getCycleTimer(); - uint32_t syt = (uint32_t)ntohs(packet->syt); - uint32_t now_ticks=CYCLE_TIMER_TO_TICKS(now); - - uint32_t test_ts=sytRecvToFullTicks(syt, cycle, now); - - debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: SYT=%08X, CY=%02d OFF=%04d\n", - cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) - ); - debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%02u OFF=%04u\n", - cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) - ); - debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%02u OFF=%04u\n", - cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) - ); - } - #endif - - #ifdef DEBUG - // keep track of the lag - uint32_t now=m_handler->getCycleTimer(); - int32_t diff = diffCycles( cycle, ((int)CYCLE_TIMER_GET_CYCLES(now)) ); - m_PacketStat.mark(diff); - #endif - - //=> process the packet - // add the data payload to the ringbuffer - - if(dropped_cycles) { - debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this); - m_data_buffer->setBufferTailTimestamp(m_last_timestamp); - // we don't want this first sample to be written - return RAW1394_ISO_OK; - } - - if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { - retval=RAW1394_ISO_OK; - - // process all ports that should be handled on a per-packet base - // this is MIDI for AMDTP (due to the need of DBC) - if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { - debugWarning("Problem decoding Packet Ports\n"); - retval=RAW1394_ISO_DEFER; - } - - } else { - -// debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n", -// cycle, m_data_buffer->getFrameCounter(), m_handler->getPacketCount()); - - m_xruns++; - - retval=RAW1394_ISO_DEFER; - } - } - - return retval; -} - -void AmdtpReceiveStreamProcessor::dumpInfo() { - StreamProcessor::dumpInfo(); -} - -bool AmdtpReceiveStreamProcessor::reset() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); - - m_PeriodStat.reset(); - m_PacketStat.reset(); - m_WakeupStat.reset(); - - m_data_buffer->setTickOffset(0); - - // reset all non-device specific stuff - // i.e. the iso stream and the associated ports - if(!StreamProcessor::reset()) { - debugFatal("Could not do base class reset\n"); - return false; - } - return true; -} - -bool AmdtpReceiveStreamProcessor::prepare() { + +unsigned int +AmdtpReceiveStreamProcessor::getPacketsPerPeriod() +{ + return (m_manager->getPeriodSize())/m_syt_interval; +} + +bool AmdtpReceiveStreamProcessor::prepareChild() { m_PeriodStat.setName("RCV PERIOD"); @@ -273,133 +62,26 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); - // prepare all non-device specific stuff - // i.e. the iso stream and the associated ports - if(!StreamProcessor::prepare()) { - debugFatal("Could not prepare base class\n"); - return false; - } - switch (m_manager->getNominalRate()) { - case 32000: - m_syt_interval = 8; - break; - case 44100: - m_syt_interval = 8; - break; - default: - case 48000: - m_syt_interval = 8; - break; - case 88200: - m_syt_interval = 16; - break; - case 96000: - m_syt_interval = 16; - break; - case 176400: - m_syt_interval = 32; - break; - case 192000: - m_syt_interval = 32; - break; - } - - // prepare the framerate estimate - float ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); - m_ticks_per_frame=ticks_per_frame; - - debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n",ticks_per_frame); - - // initialize internal buffer - unsigned int ringbuffer_size_frames=m_manager->getNbBuffers() * m_manager->getPeriodSize(); - - assert(m_data_buffer); - m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); - m_data_buffer->setEventSize(sizeof(quadlet_t)); - m_data_buffer->setEventsPerFrame(m_dimension); - - // the buffer is written every syt_interval - m_data_buffer->setUpdatePeriod(m_syt_interval); - m_data_buffer->setNominalRate(ticks_per_frame); - - m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); - - m_data_buffer->prepare(); - - // set the parameters of ports we can: - // we want the audio ports to be period buffered, - // and the midi ports to be packet buffered - for ( PortVectorIterator it = m_Ports.begin(); - it != m_Ports.end(); - ++it ) - { - debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); - if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { - debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); + case 32000: + case 44100: + case 48000: + m_syt_interval = 8; + break; + case 88200: + case 96000: + m_syt_interval = 16; + break; + case 176400: + case 192000: + m_syt_interval = 32; + break; + default: + debugError("Unsupported rate: %d\n", m_manager->getNominalRate()); return false; - } - - switch ((*it)->getPortType()) { - case Port::E_Audio: - if(!(*it)->setSignalType(Port::E_PeriodSignalled)) { - debugFatal("Could not set signal type to PeriodSignalling"); - return false; - } - // buffertype and datatype are dependant on the API - debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n"); - // buffertype and datatype are dependant on the API - if(!(*it)->setBufferType(Port::E_PointerBuffer)) { - debugFatal("Could not set buffer type"); - return false; - } - if(!(*it)->useExternalBuffer(true)) { - debugFatal("Could not set external buffer usage"); - return false; - } - if(!(*it)->setDataType(Port::E_Float)) { - debugFatal("Could not set data type"); - return false; - } - break; - case Port::E_Midi: - if(!(*it)->setSignalType(Port::E_PacketSignalled)) { - debugFatal("Could not set signal type to PacketSignalling"); - return false; - } - // buffertype and datatype are dependant on the API - // buffertype and datatype are dependant on the API - debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); - // buffertype and datatype are dependant on the API - if(!(*it)->setBufferType(Port::E_RingBuffer)) { - debugFatal("Could not set buffer type"); - return false; - } - if(!(*it)->setDataType(Port::E_MidiEvent)) { - debugFatal("Could not set data type"); - return false; - } - break; - default: - debugWarning("Unsupported port type specified\n"); - break; - } - } - - // the API specific settings of the ports should already be set, - // as this is called from the processorManager->prepare() - // so we can init the ports - if(!initPorts()) { - debugFatal("Could not initialize ports!\n"); - return false; - } - - if(!preparePorts()) { - debugFatal("Could not initialize ports!\n"); - return false; } debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, DBS: %d, SYT: %d\n", - m_manager->getNominalRate(),m_dimension,m_syt_interval); + m_manager->getNominalRate(), m_dimension, m_syt_interval); debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n", m_manager->getPeriodSize(), m_manager->getNbBuffers()); @@ -408,82 +90,87 @@ return true; - -} - -bool AmdtpReceiveStreamProcessor::prepareForStart() { - disable(); - return true; -} - -bool AmdtpReceiveStreamProcessor::prepareForStop() { - disable(); - return true; -} - -unsigned int -AmdtpReceiveStreamProcessor::getPacketsPerPeriod() -{ - return (m_manager->getPeriodSize())/m_syt_interval; -} - -bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { - m_PeriodStat.mark(m_data_buffer->getBufferFill()); - -#ifdef DEBUG - uint64_t ts_head; - signed int fc; - int32_t lag_ticks; - float lag_frames; - - // in order to sync up multiple received streams, we should - // use the ts parameter. It specifies the time of the block's - // first sample. - - ffado_timestamp_t ts_head_tmp; - m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); - ts_head=(uint64_t)ts_head_tmp; - lag_ticks=diffTicks(ts, ts_head); - float rate=m_data_buffer->getRate(); - - assert(rate!=0.0); - - lag_frames=(((float)lag_ticks)/rate); - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", - this, lag_ticks, lag_frames,rate, ts, ts_head, fc); - - if (lag_frames>=1.0) { - // the stream lags - debugWarning( "stream (%p): lags with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", - this, lag_ticks, lag_frames,rate, ts, ts_head, fc); - } else if (lag_frames<=-1.0) { - // the stream leads - debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", - this, lag_ticks, lag_frames,rate, ts, ts_head, fc); - } -#endif - // ask the buffer to process nbframes of frames - // using it's registered client's processReadBlock(), - // which should be ours - m_data_buffer->blockProcessReadFrames(nbframes); - - return true; -} - -bool AmdtpReceiveStreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { - m_PeriodStat.mark(m_data_buffer->getBufferFill()); - int frames_to_ditch=(int)(nbframes); - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", - this, frames_to_ditch, ts); - char dummy[m_data_buffer->getBytesPerFrame()]; // one frame of garbage - - while (frames_to_ditch--) { - m_data_buffer->readFrames(1, dummy); - } - return true; -} - -/** - * \brief write received events to the stream ringbuffers. +} + + +/** + * Processes packet header to extract timestamps and so on + * @param data + * @param length + * @param channel + * @param tag + * @param sy + * @param cycle + * @param dropped + * @return true if this is a valid packet, false if not + */ +bool +AmdtpReceiveStreamProcessor::processPacketHeader(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, unsigned char sy, + unsigned int cycle, unsigned int dropped) +{ + struct iec61883_packet *packet = (struct iec61883_packet *) data; + assert(packet); + bool retval = (packet->syt != 0xFFFF) && + (packet->fdf != 0xFF) && + (packet->fmt == 0x10) && + (packet->dbs > 0) && + (length >= 2*sizeof(quadlet_t)); + if(retval) { + uint64_t now = m_handler->getCycleTimer(); + //=> convert the SYT to a full timestamp in ticks + m_last_timestamp = sytRecvToFullTicks((uint32_t)ntohs(packet->syt), + cycle, now); + } + return retval; +} + +/** + * extract the data from the packet + * @pre the IEC61883 packet is valid according to isValidPacket + * @param data + * @param length + * @param channel + * @param tag + * @param sy + * @param cycle + * @param dropped + * @return true if successful, false if xrun + */ +bool +AmdtpReceiveStreamProcessor::processPacketData(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, unsigned char sy, + unsigned int cycle, unsigned int dropped_cycles) { + struct iec61883_packet *packet = (struct iec61883_packet *) data; + assert(packet); + + unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; + + // we have to keep in mind that there are also + // some packets buffered by the ISO layer, + // at most x=m_handler->getWakeupInterval() + // these contain at most x*syt_interval + // frames, meaning that we might receive + // this packet x*syt_interval*ticks_per_frame + // later than expected (the real receive time) + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", + m_last_timestamp, m_handler->getWakeupInterval(), m_syt_interval, getTicksPerFrame()); + + if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { + // process all ports that should be handled on a per-packet base + // this is MIDI for AMDTP (due to the need of DBC) + if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { + debugWarning("Problem decoding Packet Ports\n"); + } + return true; + } else { + return false; + } +} + +/*********************************************** + * Encoding/Decoding API * + ***********************************************/ +/** + * @brief write received events to the stream ringbuffers. */ bool AmdtpReceiveStreamProcessor::processReadBlock(char *data, @@ -498,5 +185,4 @@ ++it ) { - if((*it)->isDisabled()) {continue;}; @@ -523,5 +209,41 @@ } return no_problem; - +} + +/** + * @brief write silence events to the stream ringbuffers. + */ +bool AmdtpReceiveStreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset) +{ + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "(%p)->proviceSilenceBlock(%u, %u)\n", this, nevents, offset); + + bool no_problem=true; + + for ( PortVectorIterator it = m_PeriodPorts.begin(); + it != m_PeriodPorts.end(); + ++it ) + { + if((*it)->isDisabled()) {continue;}; + //FIXME: make this into a static_cast when not DEBUG? + AmdtpPortInfo *pinfo=dynamic_cast(*it); + assert(pinfo); // this should not fail!! + + switch(pinfo->getFormat()) { + case AmdtpPortInfo::E_MBLA: + if(provideSilenceToPort(static_cast(*it), offset, nevents)) { + debugWarning("Could not put silence into to port %s",(*it)->getName().c_str()); + no_problem=false; + } + break; + case AmdtpPortInfo::E_SPDIF: // still unimplemented + break; + /* for this processor, midi is a packet based port + case AmdtpPortInfo::E_Midi: + break;*/ + default: // ignore + break; + } + } + return no_problem; } @@ -583,13 +305,10 @@ } -int AmdtpReceiveStreamProcessor::decodeMBLAEventsToPort(AmdtpAudioPort *p, quadlet_t *data, +int +AmdtpReceiveStreamProcessor::decodeMBLAEventsToPort( + AmdtpAudioPort *p, quadlet_t *data, unsigned int offset, unsigned int nevents) { unsigned int j=0; - -// printf("****************\n"); -// hexDumpQuadlets(data,m_dimension*4); -// printf("****************\n"); - quadlet_t *target_event; @@ -640,3 +359,38 @@ } +int +AmdtpReceiveStreamProcessor::provideSilenceToPort( + AmdtpAudioPort *p, unsigned int offset, unsigned int nevents) +{ + unsigned int j=0; + switch(p->getDataType()) { + default: + case Port::E_Int24: + { + quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress()); + assert(nevents + offset <= p->getBufferSize()); + buffer+=offset; + + for(j = 0; j < nevents; j += 1) { // decode max nsamples + *(buffer)=0; + buffer++; + } + } + break; + case Port::E_Float: + { + float *buffer=(float *)(p->getBufferAddress()); + assert(nevents + offset <= p->getBufferSize()); + buffer+=offset; + + for(j = 0; j < nevents; j += 1) { // decode max nsamples + *buffer = 0.0; + buffer++; + } + } + break; + } + return 0; +} + } // end of namespace Streaming Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h (revision 719) @@ -85,7 +85,10 @@ int cycle, unsigned int dropped, unsigned int max_length); - bool init(); + virtual unsigned int getEventsPerFrame(); + virtual unsigned int getEventSize() {return 4;}; + virtual unsigned int getUpdatePeriod(); + bool reset(); - bool prepare(); + bool prepareChild(); bool prepareForStop(); Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h (revision 719) @@ -72,5 +72,4 @@ * Create a AMDTP receive StreamProcessor * @param port 1394 port - * @param framerate frame rate * @param dimension number of substreams in the ISO stream * (midi-muxed is only one stream) @@ -79,18 +78,22 @@ virtual ~AmdtpReceiveStreamProcessor() {}; - enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, + bool processPacketHeader(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, unsigned char sy, + unsigned int cycle, unsigned int dropped); + bool processPacketData(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped); + virtual bool prepareChild(); - bool init(); - bool reset(); - bool prepare(); - - bool prepareForStop(); - bool prepareForStart(); - - bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client - bool getFramesDry(unsigned int nbframes, int64_t ts); +public: + virtual unsigned int getEventSize() + {return 4;}; + virtual unsigned int getMaxPacketSize() + {return 4 * (2 + m_syt_interval * m_dimension);}; + virtual unsigned int getEventsPerFrame() + { return m_dimension; }; + virtual unsigned int getUpdatePeriod() + {return m_syt_interval;}; // We have 1 period of samples = m_period @@ -101,29 +104,21 @@ // however, if we only count the number of used packets // it is m_period / m_syt_interval - unsigned int getPacketsPerPeriod(); + virtual unsigned int getPacketsPerPeriod(); - unsigned int getMaxPacketSize() {return 4 * (2 + m_syt_interval * m_dimension);}; - - void dumpInfo(); protected: bool processReadBlock(char *data, unsigned int nevents, unsigned int offset); + bool provideSilenceBlock(unsigned int nevents, unsigned int offset); bool decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); int decodeMBLAEventsToPort(AmdtpAudioPort *, quadlet_t *data, unsigned int offset, unsigned int nevents); - void updatePreparedState(); + int provideSilenceToPort(AmdtpAudioPort *p, unsigned int offset, unsigned int nevents); int m_dimension; unsigned int m_syt_interval; - uint64_t m_dropped; /// FIXME:debug - uint64_t m_last_dropped; /// FIXME:debug uint64_t m_last_syt; /// FIXME:debug uint64_t m_last_now; /// FIXME:debug - int m_last_good_cycle; /// FIXME:debug - uint64_t m_last_timestamp; /// last timestamp (in ticks) - uint64_t m_last_timestamp2; /// last timestamp (in ticks) - uint64_t m_last_timestamp_at_period_ticks; }; Index: /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 719) @@ -33,4 +33,11 @@ #define PREPARE_TIMEOUT_MSEC 4000 #define ENABLE_TIMEOUT_MSEC 4000 + +// allows to add some processing margin. This shifts the time +// at which the buffer is transfer()'ed, making things somewhat +// more robust. It should be noted though that shifting the transfer +// time to a later time instant also causes the xmit buffer fill to be +// lower on average. +#define FFADO_SIGNAL_DELAY_TICKS 3072 namespace Streaming { @@ -53,5 +60,4 @@ StreamProcessorManager::~StreamProcessorManager() { if (m_isoManager) delete m_isoManager; - } @@ -90,5 +96,4 @@ debugFatal("Unsupported processor type!\n"); - return false; } @@ -102,21 +107,16 @@ for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - + it != m_ReceiveProcessors.end(); + ++it ) + { if ( *it == processor ) { - m_ReceiveProcessors.erase(it); - - processor->clearManager(); - - if(!m_isoManager->unregisterStream(processor)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); - - return false; - - } - - return true; + m_ReceiveProcessors.erase(it); + processor->clearManager(); + if(!m_isoManager->unregisterStream(processor)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); + return false; } + return true; + } } } @@ -124,33 +124,25 @@ if (processor->getType()==StreamProcessor::ePT_Transmit) { for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - + it != m_TransmitProcessors.end(); + ++it ) + { if ( *it == processor ) { - m_TransmitProcessors.erase(it); - - processor->clearManager(); - - if(!m_isoManager->unregisterStream(processor)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); - - return false; - - } - - return true; + m_TransmitProcessors.erase(it); + processor->clearManager(); + if(!m_isoManager->unregisterStream(processor)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); + return false; } + return true; + } } } debugFatal("Processor (%p) not found!\n",processor); - return false; //not found - } bool StreamProcessorManager::setSyncSource(StreamProcessor *s) { debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s); - m_SyncSource=s; return true; @@ -160,15 +152,10 @@ { debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); - m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1); - if(!m_isoManager) { debugFatal("Could not create IsoHandlerManager\n"); return false; } - - // propagate the debug level m_isoManager->setVerboseLevel(getDebugLevel()); - if(!m_isoManager->init()) { debugFatal("Could not initialize IsoHandlerManager\n"); @@ -177,5 +164,4 @@ m_xrun_happened=false; - return true; } @@ -195,23 +181,25 @@ } - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(m_SyncSource == NULL) { - debugWarning(" => Sync Source is %p.\n", *it); - m_SyncSource = *it; - } - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(m_SyncSource == NULL) { - debugWarning(" => Sync Source is %p.\n", *it); - m_SyncSource = *it; - } - } - - // now do the actual preparation + // FIXME: put into separate method + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) + { + if(m_SyncSource == NULL) { + debugWarning(" => Sync Source is %p.\n", *it); + m_SyncSource = *it; + } + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) + { + if(m_SyncSource == NULL) { + debugWarning(" => Sync Source is %p.\n", *it); + m_SyncSource = *it; + } + } + + // now do the actual preparation of the SP's debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n"); for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); @@ -228,5 +216,4 @@ } } - debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n"); for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); @@ -248,61 +235,33 @@ return false; } - return true; } +bool StreamProcessorManager::startDryRunning() { + debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start dry-running...\n"); + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + if(!(*it)->startDryRunning(-1)) { + debugError("Could not put SP %p into the dry-running state\n", *it); + return false; + } + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + if(!(*it)->startDryRunning(-1)) { + debugError("Could not put SP %p into the dry-running state\n", *it); + return false; + } + } + debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n"); + return true; +} + bool StreamProcessorManager::syncStartAll() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n"); - // we have to wait until all streamprocessors indicate that they are running - // i.e. that there is actually some data stream flowing - int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds - bool notRunning=true; - while (notRunning && wait_cycles) { - wait_cycles--; - notRunning=false; - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(!(*it)->isRunning()) notRunning=true; - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(!(*it)->isRunning()) notRunning=true; - } - - usleep(1000); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning); - } - - if(!wait_cycles) { // timout has occurred - debugFatal("One or more streams are not starting up (timeout):\n"); - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(!(*it)->isRunning()) { - debugFatal(" receive stream %p not running\n",*it); - } else { - debugFatal(" receive stream %p running\n",*it); - } - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(!(*it)->isRunning()) { - debugFatal(" transmit stream %p not running\n",*it); - } else { - debugFatal(" transmit stream %p running\n",*it); - } - } - return false; - } - - debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); + // figure out when to get the SP's running. + // the xmit SP's should also know the base timestamp + // streams should be aligned here // now find out how long we have to delay the wait operation such that @@ -318,4 +277,10 @@ } + // add some processing margin. This only shifts the time + // at which the buffer is transfer()'ed. This makes things somewhat + // more robust. It should be noted though that shifting the transfer + // time to a later time instant also causes the xmit buffer fill to be + // lower on average. + max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS; debugOutput( DEBUG_LEVEL_VERBOSE, " %d ticks (%03us %04uc %04ut)...\n", max_of_min_delay, @@ -325,70 +290,43 @@ m_SyncSource->setSyncDelay(max_of_min_delay); - debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device to indicate clock sync lock...\n"); + //STEP X: when we implement such a function, we can wait for a signal from the devices that they + // have aquired lock + //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n"); //sleep(2); // FIXME: be smarter here - - debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); - // now we reset the frame counters - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - // get the receive SP's going at receiving data - (*it)->m_data_buffer->setTransparent(false); - (*it)->reset(); - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - // make sure the SP retains it's prefilled data - (*it)->m_data_buffer->setTransparent(false); - (*it)->reset(); - } - - dumpInfo(); - // All buffers are prefilled and set-up, the only thing - // that remains a mistery is the timestamp information. -// m_SyncSource->m_data_buffer->setTransparent(false); -// debugShowBackLog(); - -// m_SyncSource->setVerboseLevel(DEBUG_LEVEL_ULTRA_VERBOSE); - + + // wait for some sort of sync debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n"); - // in order to obtain that, we wait for the first periods to be - // received. + // in order to obtain that, we wait for the first periods to be received. int nb_sync_runs=20; + int64_t time_till_next_period; while(nb_sync_runs--) { // or while not sync-ed? - waitForPeriod(); - // drop the frames for all receive SP's - dryRun(StreamProcessor::ePT_Receive); - - // we don't have to dryrun for the xmit SP's since they - // are not sending data yet. - - // sync the xmit SP's buffer head timestamps - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - // FIXME: encapsulate - (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); - } - } -// m_SyncSource->setVerboseLevel(DEBUG_LEVEL_VERBOSE); - + time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); + debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); + if(time_till_next_period > 0) { + // wait for the period + usleep(time_till_next_period); + } + } + + // figure out where we are now + uint64_t time_of_transfer = m_SyncSource->getTimeAtPeriod(); debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n", - m_time_of_transfer, - (unsigned int)TICKS_TO_SECS(m_time_of_transfer), - (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), - (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); - // FIXME: xruns can screw up the framecounter accounting. do something more sane here - resetXrunCounters(); - // lock the isohandlermanager such that things freeze -// debugShowBackLog(); - + time_of_transfer, + (unsigned int)TICKS_TO_SECS(time_of_transfer), + (unsigned int)TICKS_TO_CYCLES(time_of_transfer), + (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); + + // start wet-running in 200 cycles + // this is the timeframe in which the remaining code should be ready + time_of_transfer = addTicks(time_of_transfer, 200*TICKS_PER_CYCLE); + + debugOutput( DEBUG_LEVEL_VERBOSE, " => start at TS=%011llu (%03us %04uc %04ut)...\n", + time_of_transfer, + (unsigned int)TICKS_TO_SECS(time_of_transfer), + (unsigned int)TICKS_TO_CYCLES(time_of_transfer), + (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); // we now should have decent sync info // the buffers of the receive streams should be (approx) empty // the buffers of the xmit streams should be full - - // note what should the timestamp of the first sample be? // at this point the buffer head timestamp of the transmit buffers can be @@ -405,136 +343,35 @@ // int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - // FIXME: encapsulate - (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); - //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!! - } +// for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); +// it != m_TransmitProcessors.end(); +// ++it ) { +// // FIXME: encapsulate +// (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); +// //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!! +// } dumpInfo(); - - // this is the place were we should be syncing the received streams too - // check how much they differ - - - debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); - - // FIXME: this should not be in cycles, but in 'time' - // FIXME: remove the timestamp - if (!enableStreamProcessors(0)) { - debugFatal("Could not enable StreamProcessors...\n"); - return false; - } - - debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n"); - #define MAX_DRYRUN_CYCLES 40 - #define MIN_SUCCESSFUL_DRYRUN_CYCLES 4 - // run some cycles 'dry' such that everything can stabilize - int nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES; - int nb_succesful_cycles = 0; - while(nb_dryrun_cycles_left > 0 && - nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) { - - waitForPeriod(); - - if (dryRun()) { - nb_succesful_cycles++; - } else { - debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" ); - resetXrunCounters(); - // reset the transmit SP's such that there is no issue with accumulating buffers - // FIXME: what about receive SP's - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - // FIXME: encapsulate - (*it)->reset(); //CHECK!!! - (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); - } - - nb_succesful_cycles = 0; - // FIXME: xruns can screw up the framecounter accounting. do something more sane here - } - nb_dryrun_cycles_left--; - } - - if(nb_dryrun_cycles_left == 0) { - debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without steady-state...\n" ); - return false; - } - debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in steady-state...\n" ); - - // now we should clear the xrun flags - resetXrunCounters(); - -/* debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning streams...\n"); - // run some cycles 'dry' such that everything can stabilize - nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES; - nb_succesful_cycles = 0; - while(nb_dryrun_cycles_left > 0 && - nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) { - - waitForPeriod(); - - // align the received streams - int64_t sp_lag; - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - uint64_t ts_sp=(*it)->getTimeAtPeriod(); - uint64_t ts_sync=m_SyncSource->getTimeAtPeriod(); - - sp_lag = diffTicks(ts_sp, ts_sync); - debugOutput( DEBUG_LEVEL_VERBOSE, " SP(%p) TS=%011llu - TS=%011llu = %04lld\n", - (*it), ts_sp, ts_sync, sp_lag); - // sync the other receive SP's to the sync source -// if((*it) != m_SyncSource) { -// if(!(*it)->m_data_buffer->syncCorrectLag(sp_lag)) { -// debugOutput(DEBUG_LEVEL_VERBOSE,"could not syncCorrectLag(%11lld) for stream processor (%p)\n", -// sp_lag, *it); -// } -// } - } - - - if (dryRun()) { - nb_succesful_cycles++; - } else { - debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" ); - resetXrunCounters(); - nb_succesful_cycles = 0; - // FIXME: xruns can screw up the framecounter accounting. do something more sane here - } - nb_dryrun_cycles_left--; - } - - if(nb_dryrun_cycles_left == 0) { - debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without aligned steady-state...\n" ); - return false; - } - debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in aligned steady-state...\n" );*/ - - // now we should clear the xrun flags - resetXrunCounters(); - // and off we go + + // STEP X: switch SP's over to the running state + uint64_t time_to_start = addTicks(time_of_transfer, + m_SyncSource->getTicksPerFrame() * getPeriodSize()); + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + if(!(*it)->startRunning(time_to_start)) { + debugError("Could not put SP %p into the running state\n", *it); + return false; + } + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + if(!(*it)->startRunning(time_to_start)) { + debugError("Could not put SP %p into the running state\n", *it); + return false; + } + } + debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); return true; -} - -void StreamProcessorManager::resetXrunCounters(){ - debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n"); - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) - { - (*it)->resetXrunCounter(); - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) - { - (*it)->resetXrunCounter(); - } } @@ -546,29 +383,22 @@ debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if (!(*it)->prepareForStart()) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it); - return false; - } - if (!m_isoManager->registerStream(*it)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); - return false; - } - } - + it != m_ReceiveProcessors.end(); + ++it ) + { + if (!m_isoManager->registerStream(*it)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); + return false; + } + } debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if (!(*it)->prepareForStart()) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it); - return false; - } - if (!m_isoManager->registerStream(*it)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); - return false; - } - } + it != m_TransmitProcessors.end(); + ++it ) + { + if (!m_isoManager->registerStream(*it)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); + return false; + } + } debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n"); @@ -578,10 +408,4 @@ } - debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); - if (!disableStreamProcessors()) { - debugFatal("Could not disable StreamProcessors...\n"); - return false; - } - debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n"); if (!m_isoManager->startHandlers(-1)) { @@ -590,4 +414,10 @@ } + // put all SP's into dry-running state + if (!startDryRunning()) { + debugFatal("Could not put SP's in dry-running state\n"); + return false; + } + // start all SP's synchonized if (!syncStartAll()) { @@ -602,5 +432,4 @@ return true; - } @@ -613,28 +442,17 @@ // (like the MOTU) need to do a few things before it's safe to turn off the iso // handling. - int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient - bool allReady = false; - while (!allReady && wait_cycles) { - wait_cycles--; - allReady = true; - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(!(*it)->prepareForStop()) allReady = false; - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(!(*it)->prepareForStop()) allReady = false; - } - usleep(1000); - } - - debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); - if (!disableStreamProcessors()) { - debugFatal("Could not disable StreamProcessors...\n"); - return false; + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + if(!(*it)->stop()) { + debugError("Could not stop SP %p", (*it)); + } + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + if(!(*it)->stop()) { + debugError("Could not stop SP %p", (*it)); + } } @@ -649,222 +467,20 @@ debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if (!m_isoManager->unregisterStream(*it)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); - return false; - } - - } - + it != m_ReceiveProcessors.end(); + ++it ) { + if (!m_isoManager->unregisterStream(*it)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); + return false; + } + } debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if (!m_isoManager->unregisterStream(*it)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); - return false; - } - - } - - return true; - -} - -/** - * Enables the registered StreamProcessors - * @return true if successful, false otherwise - */ -bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) { - debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at); - - debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource); - debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare...\n"); - if (!m_SyncSource->prepareForEnable(time_to_enable_at)) { - debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); - return false; - } - - debugOutput( DEBUG_LEVEL_VERBOSE, " Enable...\n"); - m_SyncSource->enable(time_to_enable_at); - - debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n"); - - // we prepare the streamprocessors for enable - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(*it != m_SyncSource) { - debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it); - (*it)->prepareForEnable(time_to_enable_at); - } - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(*it != m_SyncSource) { - debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it); - (*it)->prepareForEnable(time_to_enable_at); - } - } - - // then we enable the streamprocessors - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(*it != m_SyncSource) { - debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it); - (*it)->enable(time_to_enable_at); - } - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(*it != m_SyncSource) { - debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it); - (*it)->enable(time_to_enable_at); - } - } - - // now we wait for the SP's to get enabled -// debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n"); -// // we have to wait until all streamprocessors indicate that they are running -// // i.e. that there is actually some data stream flowing -// int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds -// bool notEnabled=true; -// while (notEnabled && wait_cycles) { -// wait_cycles--; -// notEnabled=false; -// -// for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); -// it != m_ReceiveProcessors.end(); -// ++it ) { -// if(!(*it)->isEnabled()) notEnabled=true; -// } -// -// for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); -// it != m_TransmitProcessors.end(); -// ++it ) { -// if(!(*it)->isEnabled()) notEnabled=true; -// } -// usleep(1000); // one cycle -// } -// -// if(!wait_cycles) { // timout has occurred -// debugFatal("One or more streams couldn't be enabled (timeout):\n"); -// -// for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); -// it != m_ReceiveProcessors.end(); -// ++it ) { -// if(!(*it)->isEnabled()) { -// debugFatal(" receive stream %p not enabled\n",*it); -// } else { -// debugFatal(" receive stream %p enabled\n",*it); -// } -// } -// -// for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); -// it != m_TransmitProcessors.end(); -// ++it ) { -// if(!(*it)->isEnabled()) { -// debugFatal(" transmit stream %p not enabled\n",*it); -// } else { -// debugFatal(" transmit stream %p enabled\n",*it); -// } -// } -// return false; -// } - - debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n"); - - return true; -} - -/** - * Disables the registered StreamProcessors - * @return true if successful, false otherwise - */ -bool StreamProcessorManager::disableStreamProcessors() { - // we prepare the streamprocessors for disable - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - (*it)->prepareForDisable(); - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - (*it)->prepareForDisable(); - } - - // then we disable the streamprocessors - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - (*it)->disable(); - (*it)->m_data_buffer->setTransparent(true); - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - (*it)->disable(); - (*it)->m_data_buffer->setTransparent(true); - } - - // now we wait for the SP's to get disabled - debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n"); - // we have to wait until all streamprocessors indicate that they are running - // i.e. that there is actually some data stream flowing - int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds - bool enabled=true; - while (enabled && wait_cycles) { - wait_cycles--; - enabled=false; - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if((*it)->isEnabled()) enabled=true; - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if((*it)->isEnabled()) enabled=true; - } - usleep(1000); // one cycle - } - - if(!wait_cycles) { // timout has occurred - debugFatal("One or more streams couldn't be disabled (timeout):\n"); - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(!(*it)->isEnabled()) { - debugFatal(" receive stream %p not enabled\n",*it); - } else { - debugFatal(" receive stream %p enabled\n",*it); - } - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(!(*it)->isEnabled()) { - debugFatal(" transmit stream %p not enabled\n",*it); - } else { - debugFatal(" transmit stream %p enabled\n",*it); - } - } - return false; - } - - debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n"); + it != m_TransmitProcessors.end(); + ++it ) { + if (!m_isoManager->unregisterStream(*it)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); + return false; + } + } return true; @@ -893,7 +509,8 @@ * 3) Re-enable the SP's */ - debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); - if (!disableStreamProcessors()) { - debugFatal("Could not disable StreamProcessors...\n"); + + // put all SP's back into dry-running state + if (!startDryRunning()) { + debugFatal("Could not put SP's in dry-running state\n"); return false; } @@ -960,5 +577,5 @@ // this is to notify the client of the delay // that we introduced - m_delayed_usecs=time_till_next_period; + m_delayed_usecs = time_till_next_period; // we save the 'ideal' time of the transfer at this point, @@ -1001,5 +618,5 @@ // if this is true, a xrun will occur - xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled(); + xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); #ifdef DEBUG @@ -1008,5 +625,5 @@ (*it)->dumpInfo(); } - if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) { + if (!((*it)->canClientTransferFrames(m_period))) { debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); (*it)->dumpInfo(); @@ -1022,5 +639,5 @@ // if this is true, a xrun will occur - xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled(); + xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); #ifdef DEBUG @@ -1028,5 +645,5 @@ debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it); } - if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) { + if (!((*it)->canClientTransferFrames(m_period))) { debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it); } @@ -1049,8 +666,8 @@ bool StreamProcessorManager::transfer() { - debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n"); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); bool retval=true; - retval &= dryRun(StreamProcessor::ePT_Receive); - retval &= dryRun(StreamProcessor::ePT_Transmit); + retval &= transfer(StreamProcessor::ePT_Receive); + retval &= transfer(StreamProcessor::ePT_Transmit); return retval; } @@ -1066,5 +683,5 @@ bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period for type (%d)...\n", t); bool retval = true; // a static cast could make sure that there is no performance @@ -1089,7 +706,6 @@ it != m_ReceiveProcessors.end(); ++it ) { - if(!(*it)->getFrames(m_period, receive_timestamp)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n", + debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n", m_period, m_time_of_transfer,*it); retval &= false; // buffer underrun @@ -1110,5 +726,5 @@ if(!(*it)->putFrames(m_period, transmit_timestamp)) { - debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n", + debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n", m_period, transmit_timestamp, *it); retval &= false; // buffer underrun Index: /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (revision 719) @@ -62,4 +62,5 @@ bool stop(); + bool startDryRunning(); bool syncStartAll(); @@ -67,7 +68,4 @@ bool registerProcessor(StreamProcessor *processor); ///< start managing a streamprocessor bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor - - bool enableStreamProcessors(uint64_t time_to_enable_at); /// enable registered StreamProcessors - bool disableStreamProcessors(); /// disable registered StreamProcessors void setPeriodSize(unsigned int period); @@ -99,5 +97,4 @@ private: - void resetXrunCounters(); int m_delayed_usecs; Index: /branches/ppalmers-streaming/src/libstreaming/generic/PortManager.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/PortManager.cpp (revision 705) +++ /branches/ppalmers-streaming/src/libstreaming/generic/PortManager.cpp (revision 719) @@ -251,7 +251,4 @@ } } - - - return true; } Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 719) @@ -50,5 +50,6 @@ public PortManager, public Util::TimestampedBufferClient, - public Util::OptionContainer { + public Util::OptionContainer +{ friend class StreamProcessorManager; // FIXME: get rid of this @@ -65,15 +66,18 @@ // this can only be set by the constructor enum eProcessorType m_processor_type; - + // pretty printing + const char *ePTToString(enum eProcessorType); protected: ///> the state the streamprocessor is in enum eProcessorState { + ePS_Invalid, ePS_Created, - ePS_Initialized, - ePS_WaitingForRunningStream, + // ePS_WaitingToStop, FIXME: this will be needed for the MOTU's + ePS_Stopped, + ePS_WaitingForStream, ePS_DryRunning, - ePS_WaitingForEnabledStream, - ePS_StreamEnabled, - ePS_WaitingForDisabledStream, + ePS_WaitingForStreamEnable, + ePS_Running, + ePS_WaitingForStreamDisable, }; @@ -84,6 +88,40 @@ private: enum eProcessorState m_state; + // state switching + enum eProcessorState m_next_state; + unsigned int m_cycle_to_switch_state; + bool updateState(); + // pretty printing const char *ePSToString(enum eProcessorState); + bool doStop(); + bool doWaitForRunningStream(); + bool doDryRunning(); + bool doWaitForStreamEnable(); + bool doRunning(); + bool doWaitForStreamDisable(); + + bool scheduleStateTransition(enum eProcessorState state, uint64_t time_instant); + bool scheduleAndWaitForStateTransition(enum eProcessorState state, + uint64_t time_instant, + enum eProcessorState wait_state); +public: + bool isRunning() + {return m_state == ePS_Running;}; + bool isDryRunning() + {return m_state == ePS_DryRunning;}; + +//--- state stuff (TODO: cleanup) + bool startDryRunning(int64_t time_to_start_at); + bool startRunning(int64_t time_to_start_at); + bool stopDryRunning(int64_t time_to_stop_at); + bool stopRunning(int64_t time_to_stop_at); + + // the main difference between init and prepare is that when prepare is called, + // the SP is registered to a manager (FIXME: can't it be called by the manager?) + bool init(); + bool prepare(); + ///> stop the SP from running or dryrunning + bool stop(); // constructor/destructor public: @@ -109,43 +147,43 @@ // the receive interface accepts packets and provides frames - // implement these for a receive SP - // leave default for a transmit SP - virtual enum raw1394_iso_disposition + + // the following two methods are to be implemented by subclasses + virtual bool processPacketHeader(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, unsigned char sy, + unsigned int cycle, unsigned int dropped) + {debugWarning("call not allowed\n"); return false;}; + virtual bool processPacketData(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, unsigned char sy, + unsigned int cycle, unsigned int dropped) + {debugWarning("call not allowed\n"); return false;}; + + // this one is implemented by us + enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, - unsigned int cycle, unsigned int dropped) - {debugWarning("call not allowed\n"); return RAW1394_ISO_STOP;}; - virtual bool getFrames(unsigned int nbframes, int64_t ts) - {debugWarning("call not allowed\n"); return false;}; - virtual bool getFramesDry(unsigned int nbframes, int64_t ts) - {debugWarning("call not allowed\n"); return false;}; + unsigned int cycle, unsigned int dropped); + + bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client +protected: + // to be implemented by the children virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) {debugWarning("call not allowed\n"); return false;}; - - -//--- state stuff (TODO: cleanup) - bool xrunOccurred() { return (m_xruns>0); }; - bool isRunning(); ///< returns true if there is some stream data processed - virtual bool prepareForEnable(uint64_t time_to_enable_at); - virtual bool prepareForDisable(); - - bool enable(uint64_t time_to_enable_at); ///< enable the stream processing - bool disable(); ///< disable the stream processing - bool isEnabled() {return !m_is_disabled;}; - - virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) - - virtual bool prepare(); ///< prepare the streams & buffers (e.g. prefill) - virtual bool init(); - virtual bool prepareForStop() {return true;}; - virtual bool prepareForStart() {return true;}; + virtual bool provideSilenceBlock(unsigned int nevents, unsigned int offset) + {debugWarning("call not allowed\n"); return false;}; + +private: + bool getFramesDry(unsigned int nbframes, int64_t ts); + bool getFramesWet(unsigned int nbframes, int64_t ts); // move to private? - void resetXrunCounter(); -protected: - bool m_running; - bool m_disabled; - bool m_is_disabled; - unsigned int m_cycle_to_enable_at; + bool xrunOccurred() { return (m_xruns>0); }; // FIXME: m_xruns not updated + +protected: // FIXME: move to private + uint64_t m_dropped; /// FIXME:debug + uint64_t m_last_dropped; /// FIXME:debug + int m_last_good_cycle; /// FIXME:debug + uint64_t m_last_timestamp; /// last timestamp (in ticks) + uint64_t m_last_timestamp2; /// last timestamp (in ticks) + uint64_t m_last_timestamp_at_period_ticks; //--- data buffering and accounting @@ -170,5 +208,5 @@ * false if it can't */ - virtual bool canClientTransferFrames(unsigned int nframes); + bool canClientTransferFrames(unsigned int nframes); /** @@ -181,5 +219,5 @@ * @return true if the operation was successful */ - virtual bool dropFrames(unsigned int nframes); + bool dropFrames(unsigned int nframes); /** @@ -215,5 +253,5 @@ * @return the time in internal units */ - virtual uint64_t getTimeAtPeriod(); + uint64_t getTimeAtPeriod(); uint64_t getTimeNow(); @@ -230,5 +268,5 @@ * @param d sync delay */ - void setSyncDelay(int d) {m_sync_delay=d;}; + void setSyncDelay(int d) {m_sync_delay = d;}; /** @@ -247,7 +285,5 @@ * @return maximal frame latency */ - virtual int getMaxFrameLatency(); - - StreamProcessor& getSyncSource(); + int getMaxFrameLatency(); float getTicksPerFrame(); @@ -256,4 +292,34 @@ int getBufferFill(); + + // Child implementation interface + /** + * @brief prepare the child SP + * @return true if successful, false otherwise + * @pre the m_manager pointer points to a valid manager + * @post getEventsPerFrame() returns the correct value + * @post getEventSize() returns the correct value + * @post getUpdatePeriod() returns the correct value + * @post processPacketHeader(...) can be called + * @post processPacketData(...) can be called + */ + virtual bool prepareChild() = 0; + /** + * @brief get the number of events contained in one frame + * @return the number of events contained in one frame + */ + virtual unsigned int getEventsPerFrame() = 0; + + /** + * @brief get the size of one frame in bytes + * @return the size of one frame in bytes + */ + virtual unsigned int getEventSize() = 0; + + /** + * @brief get the nominal number of frames between buffer updates + * @return the nominal number of frames between buffer updates + */ + virtual unsigned int getUpdatePeriod() = 0; protected: Index: /branches/ppalmers-streaming/src/libstreaming/generic/Port.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/Port.cpp (revision 705) +++ /branches/ppalmers-streaming/src/libstreaming/generic/Port.cpp (revision 719) @@ -111,4 +111,450 @@ }; +bool Port::setName(std::string name) { + debugOutput( DEBUG_LEVEL_VERBOSE, "Setting name to %s for port %s\n",name.c_str(),m_Name.c_str()); + + if (m_State != E_Created) { + debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); + return false; + } + m_Name=name; + return true; +} + +bool Port::setBufferSize(unsigned int newsize) { + debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffersize to %d for port %s\n",newsize,m_Name.c_str()); + if (m_State != E_Created) { + debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); + return false; + } + m_buffersize=newsize; + return true; +} + +unsigned int Port::getEventSize() { + switch (m_DataType) { + case E_Float: + return sizeof(float); + case E_Int24: // 24 bit 2's complement, packed in a 32bit integer (LSB's) + return sizeof(uint32_t); + case E_MidiEvent: + return sizeof(uint32_t); + default: + return 0; + } +} + +bool Port::setDataType(enum E_DataType d) { + debugOutput( DEBUG_LEVEL_VERBOSE, "Setting datatype to %d for port %s\n",(int) d,m_Name.c_str()); + if (m_State != E_Created) { + debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); + return false; + } + + // do some sanity checks + bool type_is_ok=false; + switch (m_PortType) { + case E_Audio: + if(d == E_Int24) type_is_ok=true; + if(d == E_Float) type_is_ok=true; + break; + case E_Midi: + if(d == E_MidiEvent) type_is_ok=true; + break; + case E_Control: + if(d == E_Default) type_is_ok=true; + break; + default: + break; + } + + if(!type_is_ok) { + debugFatal("Datatype not supported by this type of port!\n"); + return false; + } + + m_DataType=d; + return true; +} + +bool Port::setSignalType(enum E_SignalType s) { + debugOutput( DEBUG_LEVEL_VERBOSE, "Setting signaltype to %d for port %s\n",(int)s,m_Name.c_str()); + if (m_State != E_Created) { + debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); + return false; + } + + // do some sanity checks + bool type_is_ok=false; + switch (m_PortType) { + case E_Audio: + if(s == E_PeriodSignalled) type_is_ok=true; + break; + case E_Midi: + if(s == E_PacketSignalled) type_is_ok=true; + break; + case E_Control: + if(s == E_PeriodSignalled) type_is_ok=true; + break; + default: + break; + } + if(!type_is_ok) { + debugFatal("Signalling type not supported by this type of port!\n"); + return false; + } + m_SignalType=s; + return true; +} + +bool Port::setBufferType(enum E_BufferType b) { + debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffer type to %d for port %s\n",(int)b,m_Name.c_str()); + if (m_State != E_Created) { + debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); + return false; + } + // do some sanity checks + bool type_is_ok=false; + switch (m_PortType) { + case E_Audio: + if(b == E_PointerBuffer) type_is_ok=true; + break; + case E_Midi: + if(b == E_RingBuffer) type_is_ok=true; + break; + case E_Control: + break; + default: + break; + } + if(!type_is_ok) { + debugFatal("Buffer type not supported by this type of port!\n"); + return false; + } + m_BufferType=b; + return true; +} + +bool Port::useExternalBuffer(bool b) { + // If called on an initialised stream but the request isn't for a change silently + // allow it (relied on by C API as used by jack backend driver) + if (m_State==E_Initialized && m_use_external_buffer==b) + return true; + + debugOutput( DEBUG_LEVEL_VERBOSE, "Setting external buffer use to %d for port %s\n",(int)b,m_Name.c_str()); + + if (m_State != E_Created) { + debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); + return false; + } + m_use_external_buffer=b; + return true; +} + +// buffer handling api's for pointer buffers +/** + * Get the buffer address (being the external or the internal one). + * + * @param buff + */ +void *Port::getBufferAddress() { + assert(m_BufferType==E_PointerBuffer); + return m_buffer; +}; + +/** + * Set the external buffer address. + * only call this when you have specified that you will use + * an external buffer before doing the init() + * + * @param buff + */ +void Port::setExternalBufferAddress(void *buff) { + assert(m_BufferType==E_PointerBuffer); + assert(m_use_external_buffer); // don't call this with an internal buffer! + m_buffer=buff; +}; + +// buffer handling api's for ringbuffers +bool Port::writeEvent(void *event) { + +#ifdef DEBUG + if (m_State != E_Initialized) { + debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); + return false; + } + + if(m_BufferType!=E_RingBuffer) { + debugError("operation not allowed on non E_RingBuffer ports\n"); + show(); + return false; + } + assert(m_ringbuffer); +#endif + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Writing event %08X with size %d to port %s\n",*((quadlet_t *)event),m_eventsize, m_Name.c_str()); + + return (ffado_ringbuffer_write(m_ringbuffer, (char *)event, m_eventsize)==m_eventsize); +} + +bool Port::readEvent(void *event) { + +#ifdef DEBUG + if (m_State != E_Initialized) { + debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); + return false; + } + + if(m_BufferType!=E_RingBuffer) { + debugError("operation not allowed on non E_RingBuffer ports\n"); + show(); + return false; + } + assert(m_ringbuffer); +#endif + + + unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, m_eventsize); + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading event %X with size %d from port %s\n",*((quadlet_t *)event),m_eventsize,m_Name.c_str()); + + + return (read==m_eventsize); +} + +int Port::writeEvents(void *event, unsigned int nevents) { + +#ifdef DEBUG + if (m_State != E_Initialized) { + debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); + return false; + } + + if(m_BufferType!=E_RingBuffer) { + debugError("operation not allowed on non E_RingBuffer ports\n"); + show(); + return false; + } + assert(m_ringbuffer); +#endif + + + unsigned int bytes2write=m_eventsize*nevents; + + unsigned int written=ffado_ringbuffer_write(m_ringbuffer, (char *)event,bytes2write)/m_eventsize; + +#ifdef DEBUG + if(written) { + unsigned int i=0; + quadlet_t * tmp=(quadlet_t *)event; + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Written %d events (",written); + for (i=0;i= m_eventsize); + + if(byte_present_in_buffer) { + + if(!m_do_ratecontrol) { + return true; + } + + if(m_rate_counter <= 0) { + // update the counter + if(m_average_ratecontrol) { + m_rate_counter += m_event_interval; + assert(m_rate_counterevent_interval) { + debugWarning("Rate control not needed!\n",m_Name.c_str()); + m_do_ratecontrol=false; + return false; + } + if(slot_interval==0) { + debugFatal("Cannot have slot interval == 0!\n"); + m_do_ratecontrol=false; + return false; + } + if(event_interval==0) { + debugFatal("Cannot have event interval == 0!\n"); + m_do_ratecontrol=false; + return false; + } + m_do_ratecontrol=use; + m_event_interval=event_interval; + m_slot_interval=slot_interval; + m_rate_counter=0; + + // NOTE: pretty arbitrary, but in average mode this limits the peak stream rate + m_rate_counter_minimum=-(2*event_interval); + + m_average_ratecontrol=average; + + } else { + debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling rate control for port %s...\n",m_Name.c_str()); + m_do_ratecontrol=use; + } + return true; +} + +/// Enable the port. (this can be called anytime) +void +Port::enable() { + debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); + m_disabled=false; +}; + +/// Disable the port. (this can be called anytime) +void +Port::disable() { + debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); + m_disabled=false; +}; + + +/* Private functions */ + +bool Port::allocateInternalBuffer() { + int event_size=getEventSize(); + + debugOutput(DEBUG_LEVEL_VERBOSE, + "Allocating internal buffer of %d events with size %d (%s)\n", + m_buffersize, event_size, m_Name.c_str()); + + if(m_buffer) { + debugWarning("already has an internal buffer attached, re-allocating\n"); + freeInternalBuffer(); + } + + m_buffer=calloc(m_buffersize,event_size); + if (!m_buffer) { + debugFatal("could not allocate internal buffer\n"); + m_buffersize=0; + return false; + } + + return true; +} + +void Port::freeInternalBuffer() { + debugOutput(DEBUG_LEVEL_VERBOSE, + "Freeing internal buffer (%s)\n",m_Name.c_str()); + + if(m_buffer) { + free(m_buffer); + m_buffer=0; + } +} + +bool Port::allocateInternalRingBuffer() { + int event_size=getEventSize(); + + debugOutput(DEBUG_LEVEL_VERBOSE, + "Allocating internal buffer of %d events with size %d (%s)\n", + m_buffersize, event_size, m_Name.c_str()); + + if(m_ringbuffer) { + debugWarning("already has an internal ringbuffer attached, re-allocating\n"); + freeInternalRingBuffer(); + } + + m_ringbuffer=ffado_ringbuffer_create(m_buffersize * event_size); + if (!m_ringbuffer) { + debugFatal("could not allocate internal ringbuffer\n"); + m_buffersize=0; + return false; + } + + return true; +} + +void Port::freeInternalRingBuffer() { + debugOutput(DEBUG_LEVEL_VERBOSE, + "Freeing internal ringbuffer (%s)\n",m_Name.c_str()); + + if(m_ringbuffer) { + ffado_ringbuffer_free(m_ringbuffer); + m_ringbuffer=0; + } +} + void Port::show() { debugOutput(DEBUG_LEVEL_VERBOSE,"Name : %s\n", m_Name.c_str()); @@ -129,462 +575,3 @@ } -bool Port::setName(std::string name) { - debugOutput( DEBUG_LEVEL_VERBOSE, "Setting name to %s for port %s\n",name.c_str(),m_Name.c_str()); - - if (m_State != E_Created) { - debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); - return false; - } - - m_Name=name; - - return true; -} - -bool Port::setBufferSize(unsigned int newsize) { - debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffersize to %d for port %s\n",newsize,m_Name.c_str()); - if (m_State != E_Created) { - debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); - return false; - } - - m_buffersize=newsize; - return true; - -} - -unsigned int Port::getEventSize() { - switch (m_DataType) { - case E_Float: - return sizeof(float); - case E_Int24: // 24 bit 2's complement, packed in a 32bit integer (LSB's) - return sizeof(uint32_t); - case E_MidiEvent: - return sizeof(uint32_t); - default: - return 0; - } -} - -bool Port::setDataType(enum E_DataType d) { - debugOutput( DEBUG_LEVEL_VERBOSE, "Setting datatype to %d for port %s\n",(int) d,m_Name.c_str()); - if (m_State != E_Created) { - debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); - return false; - } - - // do some sanity checks - bool type_is_ok=false; - switch (m_PortType) { - case E_Audio: - if(d == E_Int24) type_is_ok=true; - if(d == E_Float) type_is_ok=true; - break; - case E_Midi: - if(d == E_MidiEvent) type_is_ok=true; - break; - case E_Control: - if(d == E_Default) type_is_ok=true; - break; - default: - break; - } - - if(!type_is_ok) { - debugFatal("Datatype not supported by this type of port!\n"); - return false; - } - - m_DataType=d; - return true; -} - -bool Port::setSignalType(enum E_SignalType s) { - debugOutput( DEBUG_LEVEL_VERBOSE, "Setting signaltype to %d for port %s\n",(int)s,m_Name.c_str()); - if (m_State != E_Created) { - debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); - return false; - } - - // do some sanity checks - bool type_is_ok=false; - switch (m_PortType) { - case E_Audio: - if(s == E_PeriodSignalled) type_is_ok=true; - break; - case E_Midi: - if(s == E_PacketSignalled) type_is_ok=true; - break; - case E_Control: - if(s == E_PeriodSignalled) type_is_ok=true; - break; - default: - break; - } - - if(!type_is_ok) { - debugFatal("Signalling type not supported by this type of port!\n"); - return false; - } - - m_SignalType=s; - return true; - -} - -bool Port::setBufferType(enum E_BufferType b) { - debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffer type to %d for port %s\n",(int)b,m_Name.c_str()); - if (m_State != E_Created) { - debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); - return false; - } - - // do some sanity checks - bool type_is_ok=false; - switch (m_PortType) { - case E_Audio: - if(b == E_PointerBuffer) type_is_ok=true; - break; - case E_Midi: - if(b == E_RingBuffer) type_is_ok=true; - break; - case E_Control: - break; - default: - break; - } - - if(!type_is_ok) { - debugFatal("Buffer type not supported by this type of port!\n"); - return false; - } - - m_BufferType=b; - return true; - -} - -bool Port::useExternalBuffer(bool b) { - - // If called on an initialised stream but the request isn't for a change silently - // allow it (relied on by C API as used by jack backend driver) - if (m_State==E_Initialized && m_use_external_buffer==b) - return true; - - debugOutput( DEBUG_LEVEL_VERBOSE, "Setting external buffer use to %d for port %s\n",(int)b,m_Name.c_str()); - - if (m_State != E_Created) { - debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); - return false; - } - - m_use_external_buffer=b; - return true; -} - -// buffer handling api's for pointer buffers -/** - * Get the buffer address (being the external or the internal one). - * - * @param buff - */ -void *Port::getBufferAddress() { - assert(m_BufferType==E_PointerBuffer); - return m_buffer; -}; - -/** - * Set the external buffer address. - * only call this when you have specified that you will use - * an external buffer before doing the init() - * - * @param buff - */ -void Port::setExternalBufferAddress(void *buff) { - assert(m_BufferType==E_PointerBuffer); - assert(m_use_external_buffer); // don't call this with an internal buffer! - m_buffer=buff; -}; - -// buffer handling api's for ringbuffers -bool Port::writeEvent(void *event) { - -#ifdef DEBUG - if (m_State != E_Initialized) { - debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); - return false; - } - - if(m_BufferType!=E_RingBuffer) { - debugError("operation not allowed on non E_RingBuffer ports\n"); - show(); - return false; - } - assert(m_ringbuffer); -#endif - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Writing event %08X with size %d to port %s\n",*((quadlet_t *)event),m_eventsize, m_Name.c_str()); - - return (ffado_ringbuffer_write(m_ringbuffer, (char *)event, m_eventsize)==m_eventsize); -} - -bool Port::readEvent(void *event) { - -#ifdef DEBUG - if (m_State != E_Initialized) { - debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); - return false; - } - - if(m_BufferType!=E_RingBuffer) { - debugError("operation not allowed on non E_RingBuffer ports\n"); - show(); - return false; - } - assert(m_ringbuffer); -#endif - - - unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, m_eventsize); - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading event %X with size %d from port %s\n",*((quadlet_t *)event),m_eventsize,m_Name.c_str()); - - - return (read==m_eventsize); -} - -int Port::writeEvents(void *event, unsigned int nevents) { - -#ifdef DEBUG - if (m_State != E_Initialized) { - debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); - return false; - } - - if(m_BufferType!=E_RingBuffer) { - debugError("operation not allowed on non E_RingBuffer ports\n"); - show(); - return false; - } - assert(m_ringbuffer); -#endif - - - unsigned int bytes2write=m_eventsize*nevents; - - unsigned int written=ffado_ringbuffer_write(m_ringbuffer, (char *)event,bytes2write)/m_eventsize; - -#ifdef DEBUG - if(written) { - unsigned int i=0; - quadlet_t * tmp=(quadlet_t *)event; - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Written %d events (",written); - for (i=0;i= m_eventsize); - - if(byte_present_in_buffer) { - - if(!m_do_ratecontrol) { - return true; - } - - if(m_rate_counter <= 0) { - // update the counter - if(m_average_ratecontrol) { - m_rate_counter += m_event_interval; - assert(m_rate_counterevent_interval) { - debugWarning("Rate control not needed!\n",m_Name.c_str()); - m_do_ratecontrol=false; - return false; - } - if(slot_interval==0) { - debugFatal("Cannot have slot interval == 0!\n"); - m_do_ratecontrol=false; - return false; - } - if(event_interval==0) { - debugFatal("Cannot have event interval == 0!\n"); - m_do_ratecontrol=false; - return false; - } - m_do_ratecontrol=use; - m_event_interval=event_interval; - m_slot_interval=slot_interval; - m_rate_counter=0; - - // NOTE: pretty arbitrary, but in average mode this limits the peak stream rate - m_rate_counter_minimum=-(2*event_interval); - - m_average_ratecontrol=average; - - } else { - debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling rate control for port %s...\n",m_Name.c_str()); - m_do_ratecontrol=use; - } - return true; -} - -/// Enable the port. (this can be called anytime) -void -Port::enable() { - debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); - m_disabled=false; -}; - -/// Disable the port. (this can be called anytime) -void -Port::disable() { - debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); - m_disabled=false; -}; - - -/* Private functions */ - -bool Port::allocateInternalBuffer() { - int event_size=getEventSize(); - - debugOutput(DEBUG_LEVEL_VERBOSE, - "Allocating internal buffer of %d events with size %d (%s)\n", - m_buffersize, event_size, m_Name.c_str()); - - if(m_buffer) { - debugWarning("already has an internal buffer attached, re-allocating\n"); - freeInternalBuffer(); - } - - m_buffer=calloc(m_buffersize,event_size); - if (!m_buffer) { - debugFatal("could not allocate internal buffer\n"); - m_buffersize=0; - return false; - } - - return true; -} - -void Port::freeInternalBuffer() { - debugOutput(DEBUG_LEVEL_VERBOSE, - "Freeing internal buffer (%s)\n",m_Name.c_str()); - - if(m_buffer) { - free(m_buffer); - m_buffer=0; - } -} - -bool Port::allocateInternalRingBuffer() { - int event_size=getEventSize(); - - debugOutput(DEBUG_LEVEL_VERBOSE, - "Allocating internal buffer of %d events with size %d (%s)\n", - m_buffersize, event_size, m_Name.c_str()); - - if(m_ringbuffer) { - debugWarning("already has an internal ringbuffer attached, re-allocating\n"); - freeInternalRingBuffer(); - } - - m_ringbuffer=ffado_ringbuffer_create(m_buffersize * event_size); - if (!m_ringbuffer) { - debugFatal("could not allocate internal ringbuffer\n"); - m_buffersize=0; - return false; - } - - return true; -} - -void Port::freeInternalRingBuffer() { - debugOutput(DEBUG_LEVEL_VERBOSE, - "Freeing internal ringbuffer (%s)\n",m_Name.c_str()); - - if(m_ringbuffer) { - ffado_ringbuffer_free(m_ringbuffer); - m_ringbuffer=0; - } -} - -} +} Index: /branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.cpp (revision 714) +++ /branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.cpp (revision 719) @@ -55,49 +55,11 @@ *sy = 0; - return RAW1394_ISO_OK; -} - -int IsoStream::getNodeId() { - if (m_handler) { - return m_handler->getLocalNodeId(); - } - return -1; -} - - -void IsoStream::dumpInfo() -{ - - debugOutputShort( DEBUG_LEVEL_NORMAL, " Address : %p\n",this); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Stream type : %s\n", - (this->getStreamType()==eST_Receive ? "Receive" : "Transmit")); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel : %d, %d\n", - m_port, m_channel); - } bool IsoStream::setChannel(int c) { debugOutput( DEBUG_LEVEL_VERBOSE, "setting channel for (%p) to %d\n",this, c); - m_channel=c; return true; -} - - -bool IsoStream::reset() { - debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); - return true; -} - -bool IsoStream::prepare() { - debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); - return true; -} - -bool IsoStream::init() { - debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); - return true; - } @@ -109,9 +71,15 @@ void IsoStream::clearHandler() { debugOutput( DEBUG_LEVEL_VERBOSE, "clearing handler of isostream %p\n", this); + m_handler=0; +} - m_handler=0; +void IsoStream::dumpInfo() +{ + debugOutputShort( DEBUG_LEVEL_NORMAL, " Address : %p\n",this); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Stream type : %s\n", + (this->getStreamType()==eST_Receive ? "Receive" : "Transmit")); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel : %d, %d\n", + m_port, m_channel); +} } - - -} Index: /branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.h (revision 714) +++ /branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.h (revision 719) @@ -74,6 +74,4 @@ virtual unsigned int getMaxPacketSize() {return 1024;}; //FIXME: arbitrary - virtual bool init(); - virtual enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, @@ -86,9 +84,4 @@ void dumpInfo(); - - int getNodeId(); - - virtual bool reset(); - virtual bool prepare(); protected: Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 715) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 719) @@ -39,17 +39,17 @@ , m_processor_type ( type ) , m_state( ePS_Created ) + , m_next_state( ePS_Invalid ) + , m_cycle_to_switch_state( 0 ) , m_xruns( 0 ) - , m_manager(NULL) - , m_running(false) - , m_disabled(true) - , m_is_disabled(true) - , m_cycle_to_enable_at(0) - , m_ticks_per_frame(0) - , m_last_cycle(0) - , m_sync_delay(0) + , m_manager( NULL ) + , m_ticks_per_frame( 0 ) + , m_last_cycle( 0 ) + , m_sync_delay( 0 ) + , m_last_timestamp(0) + , m_last_timestamp2(0) + , m_dropped(0) { // create the timestamped buffer and register ourselves as its client m_data_buffer=new Util::TimestampedBuffer(this); - } @@ -58,128 +58,7 @@ } -void -StreamProcessor::setState(enum eProcessorState s) { - #ifdef DEBUG - // check the state transistion - debugOutput( DEBUG_LEVEL_VERBOSE, "State transition from %s to %s", - ePSToString(m_state), ePSToString(s) ); - #endif - m_state = s; -} - -void StreamProcessor::dumpInfo() -{ - debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Iso stream info:\n"); - - IsoStream::dumpInfo(); - debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor info:\n"); - if (m_handler) - debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011u\n",m_handler->getCycleTimerTicks()); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Running : %d\n", m_running); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Enabled : %s\n", m_disabled ? "No" : "Yes"); - debugOutputShort( DEBUG_LEVEL_NORMAL, " enable status : %s\n", m_is_disabled ? "No" : "Yes"); - - debugOutputShort( DEBUG_LEVEL_NORMAL, " Nominal framerate : %u\n", m_manager->getNominalRate()); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Device framerate : Sync: %f, Buffer %f\n", - 24576000.0/getSyncSource().m_data_buffer->getRate(), - 24576000.0/m_data_buffer->getRate() - ); - - m_data_buffer->dumpInfo(); - - m_PeriodStat.dumpInfo(); - m_PacketStat.dumpInfo(); -// m_WakeupStat.dumpInfo(); -} - -bool StreamProcessor::init() -{ - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); - m_data_buffer->init(); - return IsoStream::init(); -} - -/** - * Resets the frame counter, the xrun counter, the ports and the iso stream. - * @return true if reset succeeded - */ -bool StreamProcessor::reset() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); - - // reset the event buffer, discard all content - if (!m_data_buffer->reset()) { - debugFatal("Could not reset data buffer\n"); - return false; - } - - resetXrunCounter(); - - // loop over the ports to reset them - if (!PortManager::resetPorts()) { - debugFatal("Could not reset ports\n"); - return false; - } - - // reset the iso stream - if (!IsoStream::reset()) { - debugFatal("Could not reset isostream\n"); - return false; - } - return true; - -} - -bool StreamProcessor::prepareForEnable(uint64_t time_to_enable_at) { - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this); - debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks()); - debugOutput(DEBUG_LEVEL_VERBOSE," Enable at : %011u\n",time_to_enable_at); - m_data_buffer->dumpInfo(); - return true; -} - -bool StreamProcessor::prepareForDisable() { - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this); - debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks()); - m_data_buffer->dumpInfo(); - return true; -} - -bool StreamProcessor::prepare() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n"); - if(!m_manager) { - debugFatal("Not attached to a manager!\n"); - return -1; - } - - // init the ports - // loop over the ports to reset them - PortManager::preparePorts(); - - // reset the iso stream - IsoStream::prepare(); - - return true; - -} - -StreamProcessor& -StreamProcessor::getSyncSource() -{ - return m_manager->getSyncSource(); -}; - -int StreamProcessor::getBufferFill() { -// return m_data_buffer->getFrameCounter(); - return m_data_buffer->getBufferFill(); -} - uint64_t StreamProcessor::getTimeNow() { return m_handler->getCycleTimerTicks(); } - int StreamProcessor::getMaxFrameLatency() { @@ -191,63 +70,21 @@ } -bool StreamProcessor::isRunning() { - return m_running; -} - -bool StreamProcessor::enable(uint64_t time_to_enable_at) { - // FIXME: time_to_enable_at will be in 'time' not cycles - m_cycle_to_enable_at=time_to_enable_at; - - if(!m_running) { - debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n"); - } - -#ifdef DEBUG - uint64_t now_cycles=CYCLE_TIMER_GET_CYCLES(m_handler->getCycleTimer()); - const int64_t max=(int64_t)(CYCLES_PER_SECOND/2); - - int64_t diff=(int64_t)m_cycle_to_enable_at-(int64_t)now_cycles; - - if (diff > max) { - diff-=TICKS_PER_SECOND; - } else if (diff < -max) { - diff+=TICKS_PER_SECOND; - } - - if (diff<0) { - debugWarning("Request to enable streamprocessor %lld cycles ago (now=%llu, cy=%llu).\n", - diff,now_cycles,time_to_enable_at); - } -#endif - m_data_buffer->enable(); - - m_disabled=false; - return true; -} - -bool StreamProcessor::disable() { - m_data_buffer->disable(); - m_disabled=true; - return true; -} - -float -StreamProcessor::getTicksPerFrame() { - if (m_data_buffer) { - float rate=m_data_buffer->getRate(); - if (fabsf(m_ticks_per_frame - rate)>(m_ticks_per_frame*0.1)) { - debugWarning("TimestampedBuffer rate (%10.5f) more that 10%% off nominal (%10.5f)\n",rate,m_ticks_per_frame); - return m_ticks_per_frame; - } -// return m_ticks_per_frame; - if (rate<0.0) debugError("rate < 0! (%f)\n",rate); - - return rate; - } else { - return 0.0; - } -} - -int64_t StreamProcessor::getTimeUntilNextPeriodSignalUsecs() { +/*********************************************** + * Buffer management and manipulation * + ***********************************************/ +int StreamProcessor::getBufferFill() { + return m_data_buffer->getBufferFill(); +} + +bool +StreamProcessor::dropFrames(unsigned int nbframes) +{ + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes); + return m_data_buffer->dropFrames(nbframes); +} + +int64_t +StreamProcessor::getTimeUntilNextPeriodSignalUsecs() +{ uint64_t time_at_period=getTimeAtPeriod(); @@ -260,5 +97,5 @@ // pass before these packets are processed. Adding this extra term makes that // the period boundary is signalled later - time_at_period = addTicks(time_at_period, getSyncSource().getSyncDelay()); + time_at_period = addTicks(time_at_period, m_manager->getSyncSource().getSyncDelay()); uint64_t cycle_timer=m_handler->getCycleTimerTicks(); @@ -278,30 +115,13 @@ } -uint64_t StreamProcessor::getTimeAtPeriodUsecs() { +uint64_t +StreamProcessor::getTimeAtPeriodUsecs() +{ return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC); } -bool StreamProcessor::dropFrames(unsigned int nbframes) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes); - return m_data_buffer->dropFrames(nbframes); -} - -/** - * Resets the xrun counter, in a atomic way. This - * is thread safe. - */ -void StreamProcessor::resetXrunCounter() { - ZERO_ATOMIC((SInt32 *)&m_xruns); -} - -void StreamProcessor::setVerboseLevel(int l) { - setDebugLevel(l); - IsoStream::setVerboseLevel(l); - PortManager::setVerboseLevel(l); - m_data_buffer->setVerboseLevel(l); -} - uint64_t -StreamProcessor::getTimeAtPeriod() { +StreamProcessor::getTimeAtPeriod() +{ if (getType() == ePT_Receive) { ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_manager->getPeriodSize()); @@ -333,29 +153,846 @@ } +float +StreamProcessor::getTicksPerFrame() +{ + assert(m_data_buffer != NULL); + return m_data_buffer->getRate(); +} + bool -StreamProcessor::canClientTransferFrames(unsigned int nbframes) { +StreamProcessor::canClientTransferFrames(unsigned int nbframes) +{ + bool can_transfer; + unsigned int fc = m_data_buffer->getFrameCounter(); if (getType() == ePT_Receive) { - return m_data_buffer->getFrameCounter() >= (int) nbframes; + can_transfer = fc >= (int) nbframes; } else { - bool can_transfer; // there has to be enough space to put the frames in - can_transfer = m_data_buffer->getBufferSize() - m_data_buffer->getFrameCounter() > nbframes; + can_transfer = m_data_buffer->getBufferSize() - fc > nbframes; // or the buffer is transparent can_transfer |= m_data_buffer->isTransparent(); - return can_transfer; - } -} - + } + + #ifdef DEBUG + if (!can_transfer) { + debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n", + this, ePTToString(getType()), fc, nbframes); + } + #endif + + return can_transfer; +} + +/*********************************************** + * I/O API * + ***********************************************/ + +// Packet transfer API +enum raw1394_iso_disposition +StreamProcessor::putPacket(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, unsigned char sy, + unsigned int cycle, unsigned int dropped) { + + int dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; + if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); + else m_dropped += dropped_cycles; + if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); + m_last_cycle = cycle; + + // bypass based upon state + if (m_state == ePS_Invalid) { + debugError("Should not have state %s\n", ePSToString(m_state) ); + return RAW1394_ISO_ERROR; + } + if (m_state == ePS_Created) { + return RAW1394_ISO_DEFER; + } + + // normal processing + enum raw1394_iso_disposition retval = RAW1394_ISO_OK; + + // store the previous timestamp + m_last_timestamp2 = m_last_timestamp; + + // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles) + // it happens on the first 'good' cycle for the wait condition + // or on the first received cycle that is received afterwards (might be a problem) + + // check whether we are waiting for a stream to be disabled + if(m_state == ePS_WaitingForStreamDisable) { + // we then check whether we have to switch on this cycle + if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n"); + m_next_state = ePS_DryRunning; + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } else { + // not time to disable yet + } + // the received data can be discarded while waiting for the stream + // to be disabled + return RAW1394_ISO_OK; + } + + // check whether we are waiting for a stream to be enabled + else if(m_state == ePS_WaitingForStreamEnable) { + // we then check whether we have to switch on this cycle + if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n"); + m_next_state = ePS_Running; + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } else { + // not time to enable yet + } + // we are dryRunning hence data should be processed in any case + } + + // check the packet header + if (processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles)) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", + cycle, m_last_timestamp); + // update some accounting + m_last_good_cycle = cycle; + m_last_dropped = dropped_cycles; + + // check whether we are waiting for a stream to startup + // this requires that the packet is good + if(m_state == ePS_WaitingForStream) { + // since we have a packet with an OK header, + // we can indicate that the stream started up + + // we then check whether we have to switch on this cycle + if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n"); + // hence go to the dryRunning state + m_next_state = ePS_DryRunning; + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } else { + // not time (yet) to switch state + } + // in both cases we don't want to process the data + return RAW1394_ISO_OK; + } + + // check whether a state change has been requested + // note that only the wait state changes are synchronized with the cycles + else if(m_state != m_next_state) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", + ePSToString(m_state), ePSToString(m_next_state)); + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } + + // handle dropped cycles + if(dropped_cycles) { + // they represent a discontinuity in the timestamps, and hence are + // to be dealt with + debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this); + m_data_buffer->setBufferTailTimestamp(m_last_timestamp); + // we don't want this sample to be written + return RAW1394_ISO_OK; + } + + // for all states that reach this we are allowed to + // do protocol specific data reception + bool ok = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles); + + // if an xrun occured, switch to the dryRunning state and + // allow for the xrun to be picked up + if (!ok) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to xrun\n"); + m_next_state = ePS_DryRunning; + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + return RAW1394_ISO_DEFER; + } + } else { + // apparently we don't have to do anything when the packets are not valid + } + return retval; +} + +// Frame Transfer API +bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); + assert( getType() == ePT_Receive ); + if(isDryRunning()) return getFramesDry(nbframes, ts); + else return getFramesWet(nbframes, ts); +} + +bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) { +// FIXME: this should be done somewhere else +#ifdef DEBUG + uint64_t ts_head; + signed int fc; + int32_t lag_ticks; + float lag_frames; + + // in order to sync up multiple received streams, we should + // use the ts parameter. It specifies the time of the block's + // first sample. + + ffado_timestamp_t ts_head_tmp; + m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); + ts_head=(uint64_t)ts_head_tmp; + lag_ticks=diffTicks(ts, ts_head); + float rate=m_data_buffer->getRate(); + + assert(rate!=0.0); + + lag_frames=(((float)lag_ticks)/rate); + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", + this, lag_ticks, lag_frames,rate, ts, ts_head, fc); + + if (lag_frames>=1.0) { + // the stream lags + debugWarning( "stream (%p): lags with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", + this, lag_ticks, lag_frames,rate, ts, ts_head, fc); + } else if (lag_frames<=-1.0) { + // the stream leads + debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", + this, lag_ticks, lag_frames,rate, ts, ts_head, fc); + } +#endif + // ask the buffer to process nbframes of frames + // using it's registered client's processReadBlock(), + // which should be ours + m_data_buffer->blockProcessReadFrames(nbframes); + return true; +} + +bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", + this, nbframes, ts); + + // dry run on this side means that we put silence in all enabled ports + // since there is do data put into the ringbuffer in the dry-running state + return provideSilenceBlock(nbframes, 0); +} + + +/*********************************************** + * State related API * + ***********************************************/ +bool StreamProcessor::init() +{ + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); + + // initialization can be done without requesting it + // from the packet loop + m_next_state = ePS_Created; + return true; +} + +bool StreamProcessor::prepare() +{ + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "prepare...\n"); + if(!m_manager) { + debugFatal("Not attached to a manager!\n"); + return false; + } + + if (!prepareChild()) { + debugFatal("Could not prepare child\n"); + return false; + } + + // initialization can be done without requesting it + // from the packet loop + m_next_state = ePS_Stopped; + return updateState(); +} + +bool StreamProcessor::stop() +{ + uint64_t time_to_stop_at = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); + int cnt; + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stop...\n"); + switch (m_state) { + case ePS_Stopped: return true; + case ePS_DryRunning: + return stopDryRunning(-1); + case ePS_Running: + return stopRunning(-1) && + stopDryRunning(-1); + default: + debugError("Bad state: %s\n", ePSToString(m_state)); + return false; + } +} + +bool StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant) +{ + // first set the time, since in the packet loop we first check m_state == m_next_state before + // using the time + m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant); + m_next_state = state; + return true; +} + +bool StreamProcessor::scheduleAndWaitForStateTransition(enum eProcessorState state, + uint64_t time_instant, + enum eProcessorState wait_state) +{ + int cnt=200; // 2 seconds, i.e. 2 cycles + if(!scheduleStateTransition(state, time_instant)) { + debugError("Could not schedule state transistion to %s\n", ePSToString(state)); + return false; + } + while (m_state != wait_state && cnt) { + usleep(10000); + cnt++; + } + if(cnt==0) { + debugError("Timeout entering Stopped state\n"); + return false; + } + debugOutput(DEBUG_LEVEL_VERBOSE, " entered state %s\n", ePSToString(wait_state)); + return true; +} + +bool StreamProcessor::startDryRunning(int64_t t) { + uint64_t tx; + if (t < 0) { + tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); + } else { + tx = t; + } + debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startDryRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); + debugOutput(DEBUG_LEVEL_VERBOSE," Start at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); + if (m_state == ePS_Stopped) { + return scheduleAndWaitForStateTransition(ePS_WaitingForStream, tx, ePS_DryRunning); + } else if (m_state == ePS_Running) { + return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); + } else { + debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state)); + return false; + } +} + +bool StreamProcessor::startRunning(int64_t t) { + uint64_t tx; + if (t < 0) { + tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); + } else { + tx = t; + } + debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); + debugOutput(DEBUG_LEVEL_VERBOSE," Start at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); + return scheduleAndWaitForStateTransition(ePS_WaitingForStreamEnable, tx, ePS_Running); +} + +bool StreamProcessor::stopDryRunning(int64_t t) { + uint64_t tx; + if (t < 0) { + tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); + } else { + tx = t; + } + debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopDryRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); + debugOutput(DEBUG_LEVEL_VERBOSE," Stop at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); + return scheduleAndWaitForStateTransition(ePS_Stopped, tx, ePS_Stopped); +} + +bool StreamProcessor::stopRunning(int64_t t) { + uint64_t tx; + if (t < 0) { + tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); + } else { + tx = t; + } + debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); + debugOutput(DEBUG_LEVEL_VERBOSE," Stop at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); + return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); +} + +// internal state API + +/** + * @brief Enter the ePS_Stopped state + * @return true if successful, false if not + * + * @pre none + * + * @post the buffer and the isostream are ready for use. + * @post all dynamic structures have been allocated successfully + * @post the buffer is transparent and empty, and all parameters are set + * to the correct initial/nominal values. + * + */ +bool +StreamProcessor::doStop() +{ + float ticks_per_frame; + unsigned int ringbuffer_size_frames; + + debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); + bool result = true; + + switch(m_state) { + case ePS_Created: + assert(m_data_buffer); + // object just created + result = m_data_buffer->init(); + + // prepare the framerate estimate + ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); + m_ticks_per_frame = ticks_per_frame; + debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame); + + // initialize internal buffer + ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); + result &= m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); + + result &= m_data_buffer->setEventSize( getEventSize() ); + result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() ); + result &= m_data_buffer->setUpdatePeriod( getUpdatePeriod() ); + + result &= m_data_buffer->setNominalRate(ticks_per_frame); + result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); + result &= m_data_buffer->prepare(); // FIXME: the name + + // set the parameters of ports we can: + // we want the audio ports to be period buffered, + // and the midi ports to be packet buffered + for ( PortVectorIterator it = m_Ports.begin(); + it != m_Ports.end(); + ++it ) + { + debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); + if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { + debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); + return false; + } + switch ((*it)->getPortType()) { + case Port::E_Audio: + if(!(*it)->setSignalType(Port::E_PeriodSignalled)) { + debugFatal("Could not set signal type to PeriodSignalling"); + return false; + } + // buffertype and datatype are dependant on the API + debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n"); + // buffertype and datatype are dependant on the API + if(!(*it)->setBufferType(Port::E_PointerBuffer)) { + debugFatal("Could not set buffer type"); + return false; + } + if(!(*it)->useExternalBuffer(true)) { + debugFatal("Could not set external buffer usage"); + return false; + } + if(!(*it)->setDataType(Port::E_Float)) { + debugFatal("Could not set data type"); + return false; + } + break; + case Port::E_Midi: + if(!(*it)->setSignalType(Port::E_PacketSignalled)) { + debugFatal("Could not set signal type to PacketSignalling"); + return false; + } + // buffertype and datatype are dependant on the API + // buffertype and datatype are dependant on the API + debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); + // buffertype and datatype are dependant on the API + if(!(*it)->setBufferType(Port::E_RingBuffer)) { + debugFatal("Could not set buffer type"); + return false; + } + if(!(*it)->setDataType(Port::E_MidiEvent)) { + debugFatal("Could not set data type"); + return false; + } + break; + default: + debugWarning("Unsupported port type specified\n"); + break; + } + } + // the API specific settings of the ports should already be set, + // as this is called from the processorManager->prepare() + // so we can init the ports + result &= PortManager::initPorts(); + + break; + case ePS_DryRunning: + // what to do here? + break; + default: + debugError("Entry from invalid state: %s\n", ePSToString(m_state)); + return false; + } + + result &= m_data_buffer->reset(); // FIXME: don't like the reset() name + + // make the buffer transparent + m_data_buffer->setTransparent(true); + + // reset all ports + result &= PortManager::preparePorts(); + + m_state = ePS_Stopped; + return result; +} + +/** + * @brief Enter the ePS_WaitingForStream state + * @return true if successful, false if not + * + * @pre all dynamic data structures are allocated successfully + * + * @post + * + */ +bool +StreamProcessor::doWaitForRunningStream() +{ + debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); + switch(m_state) { + case ePS_Stopped: + // we have to start waiting for an incoming stream + // this basically means nothing, the state change will + // be picked up by the packet iterator + break; + default: + debugError("Entry from invalid state: %s\n", ePSToString(m_state)); + return false; + } + m_state = ePS_WaitingForStream; + return true; +} + +/** + * @brief Enter the ePS_DryRunning state + * @return true if successful, false if not + * + * @pre + * + * @post + * + */ +bool +StreamProcessor::doDryRunning() +{ + bool result = true; + debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); + switch(m_state) { + case ePS_WaitingForStream: + // a running stream has been detected + debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle); + if (getType() == ePT_Receive) { + m_data_buffer->setBufferTailTimestamp(m_last_timestamp); + } else { + // FIXME + debugError("Implement\n"); + } + break; + case ePS_WaitingForStreamDisable: + result &= m_data_buffer->reset(); // FIXME: don't like the reset() name + m_data_buffer->setTransparent(true); + break; + default: + debugError("Entry from invalid state: %s\n", ePSToString(m_state)); + return false; + } + m_state = ePS_DryRunning; + return result; +} + +/** + * @brief Enter the ePS_WaitingForStreamEnable state + * @return true if successful, false if not + * + * @pre + * + * @post + * + */ +bool +StreamProcessor::doWaitForStreamEnable() +{ + debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); + switch(m_state) { + case ePS_DryRunning: + // we have to start waiting for an incoming stream + // this basically means nothing, the state change will + // be picked up by the packet iterator + break; + default: + debugError("Entry from invalid state: %s\n", ePSToString(m_state)); + return false; + } + m_state = ePS_WaitingForStreamEnable; + return true; +} + +/** + * @brief Enter the ePS_Running state + * @return true if successful, false if not + * + * @pre + * + * @post + * + */ +bool +StreamProcessor::doRunning() +{ + bool result = true; + debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); + switch(m_state) { + case ePS_WaitingForStreamEnable: + // a running stream has been detected + debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n", + this, m_last_cycle); + if (getType() == ePT_Receive) { + m_data_buffer->setTransparent(false); + } else { + // FIXME + debugError("Implement\n"); + } + break; + default: + debugError("Entry from invalid state: %s\n", ePSToString(m_state)); + return false; + } + m_state = ePS_Running; + return result; +} + +/** + * @brief Enter the ePS_WaitingForStreamDisable state + * @return true if successful, false if not + * + * @pre + * + * @post + * + */ +bool +StreamProcessor::doWaitForStreamDisable() +{ + debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); + switch(m_state) { + case ePS_Running: + // the thread will do the transition + break; + default: + debugError("Entry from invalid state: %s\n", ePSToString(m_state)); + return false; + } + m_state = ePS_WaitingForStreamDisable; + return true; +} + +/** + * @brief Updates the state machine and calls the necessary transition functions + * @return true if successful, false if not + */ +bool StreamProcessor::updateState() { + bool result = false; + // copy the current state locally since it could change value, + // and that's something we don't want to happen inbetween tests + // if m_next_state changes during this routine, we know for sure + // that the previous state change was at least attempted correctly. + enum eProcessorState next_state = m_next_state; + + debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n", + ePSToString(m_state), ePSToString(next_state)); + + if (m_state == next_state) { + debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) ); + return true; + } + + // after creation, only initialization is allowed + if (m_state == ePS_Created) { + if(next_state != ePS_Stopped) { + goto updateState_exit_with_error; + } + // do init here + result = doStop(); + if (result) return true; + else goto updateState_exit_change_failed; + } + + // after initialization, only WaitingForRunningStream is allowed + if (m_state == ePS_Stopped) { + if(next_state != ePS_WaitingForStream) { + goto updateState_exit_with_error; + } + result = doWaitForRunningStream(); + if (result) return true; + else goto updateState_exit_change_failed; + } + + // after WaitingForStream, only ePS_DryRunning is allowed + // this means that the stream started running + if (m_state == ePS_WaitingForStream) { + if(next_state != ePS_DryRunning) { + goto updateState_exit_with_error; + } + result = doDryRunning(); + if (result) return true; + else goto updateState_exit_change_failed; + } + + // from ePS_DryRunning we can go to: + // - ePS_Stopped if something went wrong during DryRunning + // - ePS_WaitingForStreamEnable if there is a requested to enable + if (m_state == ePS_DryRunning) { + if((next_state != ePS_Stopped) && + (next_state != ePS_WaitingForStreamEnable)) { + goto updateState_exit_with_error; + } + if (next_state == ePS_Stopped) { + result = doStop(); + } else { + result = doWaitForStreamEnable(); + } + if (result) return true; + else goto updateState_exit_change_failed; + } + + // from ePS_WaitingForStreamEnable we can go to: + // - ePS_DryRunning if something went wrong while waiting + // - ePS_Running if the stream enabled correctly + if (m_state == ePS_WaitingForStreamEnable) { + if((next_state != ePS_DryRunning) && + (next_state != ePS_Running)) { + goto updateState_exit_with_error; + } + if (next_state == ePS_Stopped) { + result = doDryRunning(); + } else { + result = doRunning(); + } + if (result) return true; + else goto updateState_exit_change_failed; + } + + // from ePS_Running we can only start waiting for a disabled stream + if (m_state == ePS_Running) { + if(next_state != ePS_WaitingForStreamDisable) { + goto updateState_exit_with_error; + } + result = doWaitForStreamDisable(); + if (result) return true; + else goto updateState_exit_change_failed; + } + + // from ePS_WaitingForStreamDisable we can go to DryRunning + if (m_state == ePS_WaitingForStreamDisable) { + if(next_state != ePS_DryRunning) { + goto updateState_exit_with_error; + } + result = doDryRunning(); + if (result) return true; + else goto updateState_exit_change_failed; + } + + // if we arrive here there is an error +updateState_exit_with_error: + debugError("Invalid state transition: %s => %s\n", + ePSToString(m_state), ePSToString(next_state)); + return false; +updateState_exit_change_failed: + debugError("State transition failed: %s => %s\n", + ePSToString(m_state), ePSToString(next_state)); + return false; +} + + +/** + * @brief convert a eProcessorState to a string + * @param s the state + * @return a char * describing the state + */ const char * StreamProcessor::ePSToString(enum eProcessorState s) { switch (s) { + case ePS_Invalid: return "ePS_Invalid"; case ePS_Created: return "ePS_Created"; - case ePS_Initialized: return "ePS_Initialized"; - case ePS_WaitingForRunningStream: return "ePS_WaitingForRunningStream"; + case ePS_Stopped: return "ePS_Stopped"; + case ePS_WaitingForStream: return "ePS_WaitingForStream"; case ePS_DryRunning: return "ePS_DryRunning"; - case ePS_WaitingForEnabledStream: return "ePS_WaitingForEnabledStream"; - case ePS_StreamEnabled: return "ePS_StreamEnabled"; - case ePS_WaitingForDisabledStream: return "ePS_WaitingForDisabledStream"; - } + case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable"; + case ePS_Running: return "ePS_Running"; + case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable"; + default: return "error: unknown state"; + } +} + +/** + * @brief convert a eProcessorType to a string + * @param t the type + * @return a char * describing the state + */ +const char * +StreamProcessor::ePTToString(enum eProcessorType t) { + switch (t) { + case ePT_Receive: return "Receive"; + case ePT_Transmit: return "Transmit"; + default: return "error: unknown type"; + } +} + +/*********************************************** + * Debug * + ***********************************************/ +void +StreamProcessor::dumpInfo() +{ + debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Iso stream info:\n"); + + IsoStream::dumpInfo(); + debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor info:\n"); + if (m_handler) + debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011u\n",m_handler->getCycleTimerTicks()); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); + debugOutputShort( DEBUG_LEVEL_NORMAL, " State : %s\n", ePSToString(m_state)); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Next state : %s\n", ePSToString(m_next_state)); + debugOutputShort( DEBUG_LEVEL_NORMAL, " transition at : %u\n", m_cycle_to_switch_state); + + + debugOutputShort( DEBUG_LEVEL_NORMAL, " Nominal framerate : %u\n", m_manager->getNominalRate()); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Device framerate : Sync: %f, Buffer %f\n", + 24576000.0/m_manager->getSyncSource().m_data_buffer->getRate(), + 24576000.0/m_data_buffer->getRate() + ); + + m_data_buffer->dumpInfo(); + + m_PeriodStat.dumpInfo(); + m_PacketStat.dumpInfo(); +// m_WakeupStat.dumpInfo(); +} + +void +StreamProcessor::setVerboseLevel(int l) { + setDebugLevel(l); + IsoStream::setVerboseLevel(l); + PortManager::setVerboseLevel(l); + m_data_buffer->setVerboseLevel(l); } Index: /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp (revision 714) +++ /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp (revision 719) @@ -327,4 +327,8 @@ unsigned int irq_interval=packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD; if(irq_interval <= 0) irq_interval=1; + // FIXME: test + irq_interval=1; + #warning Using fixed irq_interval + #else // hardware interrupts occur when one DMA block is full, and the size of one DMA Index: /branches/ppalmers-streaming/src/libstreaming/util/IsoHandler.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/util/IsoHandler.cpp (revision 705) +++ /branches/ppalmers-streaming/src/libstreaming/util/IsoHandler.cpp (revision 719) @@ -437,8 +437,10 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing iso receive handler (%p)\n",this); - debugOutput( DEBUG_LEVEL_VERBOSE, " Buffers : %d \n",m_buf_packets); - debugOutput( DEBUG_LEVEL_VERBOSE, " Max Packet size : %d \n",m_max_packet_size); - debugOutput( DEBUG_LEVEL_VERBOSE, " Channel : %d \n",m_Client->getChannel()); - debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval : %d \n",m_irq_interval); + debugOutput( DEBUG_LEVEL_VERBOSE, " Buffers : %d \n", m_buf_packets); + debugOutput( DEBUG_LEVEL_VERBOSE, " Max Packet size : %d \n", m_max_packet_size); + debugOutput( DEBUG_LEVEL_VERBOSE, " Channel : %d \n", m_Client->getChannel()); + debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval : %d \n", m_irq_interval); + debugOutput( DEBUG_LEVEL_VERBOSE, " Mode : %s \n", + (m_irq_interval > 1)?"DMA_BUFFERFILL":"PACKET_PER_BUFFER"); if(m_irq_interval > 1) { Index: /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp =================================================================== --- /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (revision 712) +++ /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (revision 719) @@ -147,10 +147,11 @@ diff += m_wrap_at; } - + float rate=((float)diff)/((float) m_update_period); + if (rate<0.0) debugError("rate < 0! (%f)\n",rate); if (fabsf(m_nominal_rate - rate)>(m_nominal_rate*0.1)) { debugWarning("(%p) rate (%10.5f) more that 10%% off nominal (rate=%10.5f, diff="TIMESTAMP_FORMAT_SPEC", update_period=%d)\n", this, rate,m_nominal_rate,diff, m_update_period); - //dumpInfo(); + return m_nominal_rate; } else { @@ -379,25 +380,6 @@ if (m_transparent) { -// // if the buffer is disabled, it's in a 'transparent' state, meaning -// // that if too much is put into the buffer, the oldest data is discarded -// signed int fc; -// ENTER_CRITICAL_SECTION; -// fc=m_framecounter; -// EXIT_CRITICAL_SECTION; -// -// signed int frames_to_ditch= nframes - (m_buffer_size - m_framecounter) + 1; -// if ( frames_to_ditch > 0 ) { -// debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "dropping %d frames\n", frames_to_ditch); -// dropFrames( frames_to_ditch ); -// } -// // add the data payload to the ringbuffer -// if (ffado_ringbuffer_write(m_event_buffer,data,write_size) < write_size) -// { -// debugError("we should have freed up enough space for this\n"); -// return false; -// } -// - // while disabled, we don't update the DLL, we just set the correct - // timestamp for the frames + // while disabled, we don't update the DLL, nor do we write frames + // we just set the correct timestamp for the frames setBufferTailTimestamp(ts); } else { @@ -446,15 +428,16 @@ unsigned int read_size=nframes*m_event_size*m_events_per_frame; - // get the data payload to the ringbuffer - if ((ffado_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) - { -// debugWarning("readFrames buffer underrun\n"); - return false; - } - - decrementFrameCounter(nframes); - - return true; - + if (m_transparent) { + return true; // FIXME: the data still doesn't make sense! + } else { + // get the data payload to the ringbuffer + if ((ffado_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) + { + debugWarning("readFrames buffer underrun\n"); + return false; + } + decrementFrameCounter(nframes); + } + return true; } @@ -712,5 +695,5 @@ EXIT_CRITICAL_SECTION; - debugOutput(DEBUG_LEVEL_VERBOSE, "for (%p) to " + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "for (%p) to " TIMESTAMP_FORMAT_SPEC" => "TIMESTAMP_FORMAT_SPEC", NTS=" TIMESTAMP_FORMAT_SPEC", DLL2=%f, RATE=%f\n", Index: /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h =================================================================== --- /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h (revision 712) +++ /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h (revision 719) @@ -150,4 +150,5 @@ // dll stuff bool setNominalRate(float r); + float getNominalRate() {return m_nominal_rate;}; float getRate(); Index: /branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp =================================================================== --- /branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp (revision 715) +++ /branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp (revision 719) @@ -592,5 +592,6 @@ int AvDevice::getStreamCount() { - return m_receiveProcessors.size() + m_transmitProcessors.size(); + //return m_receiveProcessors.size() + m_transmitProcessors.size(); + return 1; } Index: /branches/ppalmers-streaming/src/ffado_streaming.cpp =================================================================== --- /branches/ppalmers-streaming/src/ffado_streaming.cpp (revision 715) +++ /branches/ppalmers-streaming/src/ffado_streaming.cpp (revision 719) @@ -340,5 +340,5 @@ int ffado_streaming_transfer_playback_buffers(ffado_device_t *dev) { - return dev->processorManager->transfer(StreamProcessor::ePT_Receive); + return dev->processorManager->transfer(StreamProcessor::ePT_Transmit); }