Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 721) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 722) @@ -48,5 +48,5 @@ {} -bool +enum StreamProcessor::eChildReturnValue AmdtpTransmitStreamProcessor::generatePacketHeader ( unsigned char *data, unsigned int *length, @@ -94,5 +94,5 @@ // given by TRANSMIT_TRANSFER_DELAY (in ticks), but we can send // packets early if we want to. (not completely according to spec) - const int max_cycles_to_transmit_early = 5; + const int max_cycles_to_transmit_early = 2; try_block_of_frames: @@ -129,11 +129,12 @@ cycles_until_transmit = diffCycles ( transmit_at_cycle, cycle ); - debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, - "Gen HDR: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", - cycle, - transmit_at_cycle, cycles_until_transmit, - transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), - presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); - + if (dropped) { + debugOutput ( DEBUG_LEVEL_VERBOSE, + "Gen HDR: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", + cycle, + transmit_at_cycle, cycles_until_transmit, + transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), + presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); + } // two different options: // 1) there are not enough frames for one packet @@ -154,9 +155,5 @@ fc, cycle, transmit_at_cycle, cycles_until_transmit ); // we are too late - // meaning that we in some sort of xrun state - // signal xrun situation ??HERE?? - m_xruns++; - // we send an empty packet on this cycle - return false; + return eCRV_XRun; } else @@ -166,13 +163,6 @@ fc, cycle, transmit_at_cycle, cycles_until_transmit ); // there is still time left to send the packet - // we want the system to give this packet another go - // goto try_packet_again; // UGLY but effective - // unfortunatly the try_again doesn't work very well, - // so we'll have to either usleep(one cycle) and goto try_block_of_frames - - // or just fill this with an empty packet - // if we have to do this too often, the presentation time will - // get too close and we're in trouble - return false; + // we want the system to give this packet another go at a later time instant + return eCRV_Again; } } @@ -195,18 +185,12 @@ // get next block of frames and repeat - if ( cycles_until_transmit <= max_cycles_to_transmit_early ) - { - // it's time send the packet - m_dbc += fillDataPacketHeader ( packet, length, m_last_timestamp ); - return true; - } - else if ( cycles_until_transmit < 0 ) + if(cycles_until_transmit < 0) { // we are too late - debugOutput ( DEBUG_LEVEL_VERBOSE, + debugOutput(DEBUG_LEVEL_VERBOSE, "Too late: CY=%04u, TC=%04u, CUT=%04d, TSP=%011llu (%04u)\n", cycle, transmit_at_cycle, cycles_until_transmit, - presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); + presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time) ); // however, if we can send this sufficiently before the presentation @@ -214,19 +198,20 @@ // NOTE: dangerous since the device has no way of reporting that it didn't get // this packet on time. - if ( cycles_until_presentation <= min_cycles_before_presentation ) + if(cycles_until_presentation >= min_cycles_before_presentation) { // we are not that late and can still try to transmit the packet - m_dbc += fillDataPacketHeader ( packet, length, m_last_timestamp ); - return true; + m_dbc += fillDataPacketHeader(packet, length, m_last_timestamp); + return eCRV_Packet; } else // definitely too late { - // remove the samples - m_data_buffer->dropFrames ( m_syt_interval ); - // signal some xrun situation ??HERE?? - m_xruns++; - // try a new block of frames - goto try_block_of_frames; // UGLY but effective - } + return eCRV_XRun; + } + } + else if(cycles_until_transmit <= max_cycles_to_transmit_early) + { + // it's time send the packet + m_dbc += fillDataPacketHeader(packet, length, m_last_timestamp); + return eCRV_Packet; } else @@ -250,11 +235,11 @@ #endif // we are too early, send only an empty packet - return false; - } - } - return true; -} - -bool + return eCRV_EmptyPacket; + } + } + return eCRV_Invalid; +} + +enum StreamProcessor::eChildReturnValue AmdtpTransmitStreamProcessor::generatePacketData ( unsigned char *data, unsigned int *length, @@ -273,13 +258,11 @@ debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, "XMIT DATA: TSP=%011llu (%04u)\n", cycle, m_last_timestamp, ( unsigned int ) TICKS_TO_CYCLES ( m_last_timestamp ) ); - return true; - } - else - { - return false; - } -} - -bool + return eCRV_OK; + } + else return eCRV_XRun; + +} + +enum StreamProcessor::eChildReturnValue AmdtpTransmitStreamProcessor::generateSilentPacketHeader ( unsigned char *data, unsigned int *length, @@ -308,8 +291,8 @@ m_dbc += fillNoDataPacketHeader ( packet, length ); - return true; -} - -bool + return eCRV_OK; +} + +enum StreamProcessor::eChildReturnValue AmdtpTransmitStreamProcessor::generateSilentPacketData ( unsigned char *data, unsigned int *length, @@ -317,5 +300,5 @@ int cycle, unsigned int dropped, unsigned int max_length ) { - return true; // no need to do anything + return eCRV_OK; // no need to do anything } Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp (revision 720) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp (revision 722) @@ -97,7 +97,7 @@ * @param cycle * @param dropped - * @return true if this is a valid packet, false if not - */ -bool + * @return + */ +enum StreamProcessor::eChildReturnValue AmdtpReceiveStreamProcessor::processPacketHeader(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, @@ -106,10 +106,10 @@ struct iec61883_packet *packet = (struct iec61883_packet *) data; assert(packet); - bool retval = (packet->syt != 0xFFFF) && + bool ok = (packet->syt != 0xFFFF) && (packet->fdf != 0xFF) && (packet->fmt == 0x10) && (packet->dbs > 0) && (length >= 2*sizeof(quadlet_t)); - if(retval) { + if(ok) { uint64_t now = m_handler->getCycleTimer(); //=> convert the SYT to a full timestamp in ticks @@ -117,5 +117,5 @@ cycle, now); } - return retval; + return (ok ? eCRV_OK : eCRV_Invalid ); } @@ -130,7 +130,7 @@ * @param cycle * @param dropped - * @return true if successful, false if xrun - */ -bool + * @return + */ +enum StreamProcessor::eChildReturnValue AmdtpReceiveStreamProcessor::processPacketData(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, @@ -158,10 +158,12 @@ // process all ports that should be handled on a per-packet base // this is MIDI for AMDTP (due to the need of DBC) - if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { - debugWarning("Problem decoding Packet Ports\n"); + if(isRunning()) { + if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { + debugWarning("Problem decoding Packet Ports\n"); + } } - return true; + return eCRV_OK; } else { - return false; + return eCRV_XRun; } } Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h (revision 720) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h (revision 722) @@ -80,14 +80,14 @@ virtual ~AmdtpTransmitStreamProcessor() {}; - bool generatePacketHeader(unsigned char *data, unsigned int *length, + enum eChildReturnValue generatePacketHeader(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length); - bool generatePacketData(unsigned char *data, unsigned int *length, + enum eChildReturnValue generatePacketData(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length); - bool generateSilentPacketHeader(unsigned char *data, unsigned int *length, + enum eChildReturnValue generateSilentPacketHeader(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length); - bool generateSilentPacketData(unsigned char *data, unsigned int *length, + enum eChildReturnValue generateSilentPacketData(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length); Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h (revision 720) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h (revision 722) @@ -78,8 +78,8 @@ virtual ~AmdtpReceiveStreamProcessor() {}; - bool processPacketHeader(unsigned char *data, unsigned int length, + enum eChildReturnValue processPacketHeader(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped); - bool processPacketData(unsigned char *data, unsigned int length, + enum eChildReturnValue processPacketData(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped); Index: /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 721) +++ /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 722) @@ -158,4 +158,6 @@ } m_isoManager->setVerboseLevel(getDebugLevel()); + m_isoManager->setTransmitBufferNbPeriods(getNbBuffers() - 1); + if(!m_isoManager->init()) { debugFatal("Could not initialize IsoHandlerManager\n"); @@ -243,7 +245,9 @@ it != m_ReceiveProcessors.end(); ++it ) { - if(!(*it)->startDryRunning(-1)) { - debugError("Could not put SP %p into the dry-running state\n", *it); - return false; + if (!(*it)->isDryRunning()) { + if(!(*it)->startDryRunning(-1)) { + debugError("Could not put SP %p into the dry-running state\n", *it); + return false; + } } } @@ -251,7 +255,9 @@ it != m_TransmitProcessors.end(); ++it ) { - if(!(*it)->startDryRunning(-1)) { - debugError("Could not put SP %p into the dry-running state\n", *it); - return false; + if (!(*it)->isDryRunning()) { + if(!(*it)->startDryRunning(-1)) { + debugError("Could not put SP %p into the dry-running state\n", *it); + return false; + } } } @@ -621,5 +627,5 @@ bool StreamProcessorManager::waitForPeriod() { int time_till_next_period; - bool xrun_occurred=false; + bool xrun_occurred = false; debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); @@ -649,16 +655,9 @@ xrun_occurred |= (*it)->xrunOccurred(); } - if(xrun_occurred) break; // check if we were waked up too soon - time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); - } - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period); - - // this is to notify the client of the delay - // that we introduced - m_delayed_usecs = -time_till_next_period; + time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); + } // we save the 'ideal' time of the transfer at this point, @@ -672,4 +671,47 @@ m_time_of_transfer); + // normally we can transfer frames at this time, but in some cases this is not true + // e.g. when there are not enough frames in the receive buffer. + // however this doesn't have to be a problem, since we can wait some more until we + // have enough frames. There is only a problem once the ISO xmit doesn't have packets + // to transmit, or if the receive buffer overflows. These conditions are signaled by + // the iso threads + // check if xruns occurred on the Iso side. + // also check if xruns will occur should we transfer() now + #ifdef DEBUG + int waited = -1; + #endif + bool ready_for_transfer = false; + xrun_occurred = false; + while (!ready_for_transfer && !xrun_occurred) { + ready_for_transfer = true; + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + ready_for_transfer &= ((*it)->canClientTransferFrames(m_period)); + xrun_occurred |= (*it)->xrunOccurred(); + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + ready_for_transfer &= ((*it)->canClientTransferFrames(m_period)); + xrun_occurred |= (*it)->xrunOccurred(); + } + usleep(125); // MAGIC: one cycle sleep... + #ifdef DEBUG + waited++; + #endif + } // we are either ready or an xrun occurred + + #ifdef DEBUG + if(waited > 0) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited); + } + #endif + + // this is to notify the client of the delay that we introduced by waiting + m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs); + #ifdef DEBUG int rcv_bf=0, xmt_bf=0; @@ -687,53 +729,32 @@ m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf); -#endif - - xrun_occurred=false; - // check if xruns occurred on the Iso side. // also check if xruns will occur should we transfer() now - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); it != m_ReceiveProcessors.end(); ++it ) { - // a xrun has occurred on the Iso side - xrun_occurred |= (*it)->xrunOccurred(); - - // if this is true, a xrun will occur - xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); - -#ifdef DEBUG + if ((*it)->xrunOccurred()) { - debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it); + debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it); (*it)->dumpInfo(); } if (!((*it)->canClientTransferFrames(m_period))) { - debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); + debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it); (*it)->dumpInfo(); } + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + if ((*it)->xrunOccurred()) { + debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it); + } + if (!((*it)->canClientTransferFrames(m_period))) { + debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it); + } + } #endif - } - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - // a xrun has occurred on the Iso side - xrun_occurred |= (*it)->xrunOccurred(); - - // if this is true, a xrun will occur - xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); - -#ifdef DEBUG - if ((*it)->xrunOccurred()) { - debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it); - } - if (!((*it)->canClientTransferFrames(m_period))) { - debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it); - } -#endif - } - m_nbperiods++; - // now we can signal the client that we are (should be) ready return !xrun_occurred; Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 721) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 722) @@ -155,16 +155,24 @@ protected: // the helper receive/transmit functions + enum eChildReturnValue { + eCRV_OK, + eCRV_Invalid, + eCRV_Packet, + eCRV_EmptyPacket, + eCRV_XRun, + eCRV_Again, + }; // to be implemented by the children // the following methods are to be implemented by receive SP subclasses - virtual bool processPacketHeader(unsigned char *data, unsigned int length, + virtual enum eChildReturnValue processPacketHeader(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped) - {debugWarning("call not allowed\n"); return false;}; - virtual bool processPacketData(unsigned char *data, unsigned int length, + {debugWarning("call not allowed\n"); return eCRV_Invalid;}; + virtual enum eChildReturnValue processPacketData(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped) - {debugWarning("call not allowed\n"); return false;}; + {debugWarning("call not allowed\n"); return eCRV_Invalid;}; virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) {debugWarning("call not allowed\n"); return false;}; @@ -173,24 +181,24 @@ // the following methods are to be implemented by transmit SP subclasses - virtual bool generatePacketHeader(unsigned char *data, unsigned int *length, + virtual enum eChildReturnValue generatePacketHeader(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length) - {debugWarning("call not allowed\n"); return false;}; - virtual bool generatePacketData(unsigned char *data, unsigned int *length, + {debugWarning("call not allowed\n"); return eCRV_Invalid;}; + virtual enum eChildReturnValue generatePacketData(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length) - {debugWarning("call not allowed\n"); return false;}; - virtual bool generateSilentPacketHeader(unsigned char *data, unsigned int *length, + {debugWarning("call not allowed\n"); return eCRV_Invalid;}; + virtual enum eChildReturnValue generateSilentPacketHeader(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length) - {debugWarning("call not allowed\n"); return false;}; - virtual bool generateSilentPacketData(unsigned char *data, unsigned int *length, + {debugWarning("call not allowed\n"); return eCRV_Invalid;}; + virtual enum eChildReturnValue generateSilentPacketData(unsigned char *data, unsigned int *length, unsigned char *tag, unsigned char *sy, int cycle, unsigned int dropped, unsigned int max_length) - {debugWarning("call not allowed\n"); return false;}; + {debugWarning("call not allowed\n"); return eCRV_Invalid;}; virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) {debugWarning("call not allowed\n"); return false;}; @@ -207,5 +215,5 @@ // move to private? - bool xrunOccurred() { return (m_xruns>0); }; // FIXME: m_xruns not updated + bool xrunOccurred() { return m_in_xrun; }; protected: // FIXME: move to private @@ -232,6 +240,4 @@ protected: - unsigned int m_xruns; - StreamProcessorManager *m_manager; @@ -367,4 +373,6 @@ int m_last_cycle; int m_sync_delay; + private: + bool m_in_xrun; protected: // SPM related Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 721) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 722) @@ -41,9 +41,9 @@ , m_next_state( ePS_Invalid ) , m_cycle_to_switch_state( 0 ) - , m_xruns( 0 ) , m_manager( NULL ) , m_ticks_per_frame( 0 ) - , m_last_cycle( 0 ) + , m_last_cycle( -1 ) , m_sync_delay( 0 ) + , m_in_xrun( false ) , m_last_timestamp(0) , m_last_timestamp2(0) @@ -166,5 +166,5 @@ unsigned int fc = m_data_buffer->getFrameCounter(); if (getType() == ePT_Receive) { - can_transfer = fc >= (int) nbframes; + can_transfer = (fc >= nbframes); } else { // there has to be enough space to put the frames in @@ -193,10 +193,14 @@ unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped) { - - int dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; - if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); - else m_dropped += dropped_cycles; - if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); - m_last_cycle = cycle; + int dropped_cycles = 0; + if (m_last_cycle != (int)cycle && m_last_cycle != -1) { + dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; + if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); + if (dropped_cycles > 0) { + debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); + m_dropped += dropped_cycles; + } + m_last_cycle = cycle; + } // bypass based upon state @@ -208,7 +212,4 @@ return RAW1394_ISO_DEFER; } - - // normal processing - enum raw1394_iso_disposition retval = RAW1394_ISO_OK; // store the previous timestamp @@ -254,5 +255,6 @@ // check the packet header - if (processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles)) { + enum eChildReturnValue result = processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles); + if (result == eCRV_OK) { debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", cycle, m_last_timestamp); @@ -301,17 +303,10 @@ debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this); m_data_buffer->setBufferTailTimestamp(m_last_timestamp); - // we don't want this sample to be written - return RAW1394_ISO_OK; - } - - // for all states that reach this we are allowed to - // do protocol specific data reception - bool ok = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles); - - // if an xrun occured, switch to the dryRunning state and - // allow for the xrun to be picked up - if (!ok) { - debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to xrun\n"); - m_next_state = ePS_DryRunning; + + // this is an xrun situation + m_in_xrun = true; + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to dropped packet xrun\n"); + m_cycle_to_switch_state = cycle + 1; // switch in the next cycle + m_next_state = ePS_WaitingForStreamDisable; // execute the requested change if (!updateState()) { // we are allowed to change the state directly @@ -321,8 +316,38 @@ return RAW1394_ISO_DEFER; } + + // for all states that reach this we are allowed to + // do protocol specific data reception + enum eChildReturnValue result2 = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles); + + // if an xrun occured, switch to the dryRunning state and + // allow for the xrun to be picked up + if (result2 == eCRV_XRun) { + m_in_xrun = true; + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n"); + m_cycle_to_switch_state = cycle+1; // switch in the next cycle + m_next_state = ePS_WaitingForStreamDisable; + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + return RAW1394_ISO_DEFER; + } else if(result2 == eCRV_OK) { + // no problem here + return RAW1394_ISO_OK; + } else { + debugError("Invalid response\n"); + return RAW1394_ISO_ERROR; + } + } else if(result == eCRV_Invalid) { + // apparently we don't have to do anything when the packets are not valid + return RAW1394_ISO_OK; } else { - // apparently we don't have to do anything when the packets are not valid - } - return retval; + debugError("Invalid response\n"); + return RAW1394_ISO_ERROR; + } + debugError("reached the unreachable\n"); + return RAW1394_ISO_ERROR; } @@ -338,9 +363,14 @@ } - int dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; - if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); - else m_dropped += dropped_cycles; - if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); - m_last_cycle = cycle; + int dropped_cycles = 0; + if (m_last_cycle != cycle && m_last_cycle != -1) { + dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; + if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); + if (dropped_cycles > 0) { + debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); + m_dropped += dropped_cycles; + } + m_last_cycle = cycle; + } // bypass based upon state @@ -431,5 +461,6 @@ else if(m_state == ePS_Running) { // check the packet header - if (generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length)) { + enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); + if (result == eCRV_Packet) { debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n", cycle, m_last_timestamp); @@ -450,10 +481,12 @@ } - bool ok = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); + enum eChildReturnValue result2 = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); // if an xrun occured, switch to the dryRunning state and // allow for the xrun to be picked up - if (!ok) { - debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to xrun\n"); - m_next_state = ePS_DryRunning; + if (result2 == eCRV_XRun) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to data xrun\n"); + m_in_xrun = true; + m_cycle_to_switch_state = cycle+1; // switch in the next cycle + m_next_state = ePS_WaitingForStreamDisable; // execute the requested change if (!updateState()) { // we are allowed to change the state directly @@ -464,6 +497,42 @@ } return RAW1394_ISO_OK; - } else { // pick up the possible xruns - + } else if (result == eCRV_XRun) { // pick up the possible xruns + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to WaitingForStreamDisable due to header xrun\n"); + m_in_xrun = true; + m_cycle_to_switch_state = cycle+1; // switch in the next cycle + m_next_state = ePS_WaitingForStreamDisable; + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } else if (result == eCRV_EmptyPacket) { + if(m_state != m_next_state) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", + ePSToString(m_state), ePSToString(m_next_state)); + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } + goto send_empty_packet; + } else if (result == eCRV_Again) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "have to retry cycle %d\n", cycle); + if(m_state != m_next_state) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", + ePSToString(m_state), ePSToString(m_next_state)); + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } + // force some delay + usleep(125); + return RAW1394_ISO_AGAIN; + } else { + debugError("Invalid return value: %d\n", result); + return RAW1394_ISO_ERROR; } } @@ -968,6 +1037,5 @@ m_data_buffer->setBufferTailTimestamp(m_last_timestamp); } else { - // FIXME - debugError("Implement\n"); + // FIXME: PC=master mode will have to do something here I guess... } break; @@ -1058,5 +1126,5 @@ debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n", this, m_last_cycle); - m_xruns = 0; + m_in_xrun = false; m_data_buffer->setTransparent(false); break; @@ -1306,5 +1374,5 @@ (unsigned int)TICKS_TO_OFFSET(now)); } - debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %s\n", (m_in_xrun ? "True":"False")); debugOutputShort( DEBUG_LEVEL_NORMAL, " State : %s\n", ePSToString(m_state)); debugOutputShort( DEBUG_LEVEL_NORMAL, " Next state : %s\n", ePSToString(m_next_state)); Index: /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp (revision 722) @@ -41,5 +41,5 @@ m_State(E_Created), m_poll_timeout(100), m_poll_fds(0), m_poll_nfds(0), - m_realtime(false), m_priority(0) + m_realtime(false), m_priority(0), m_xmit_nb_periods( 1 ) { @@ -49,5 +49,5 @@ m_State(E_Created), m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0), - m_realtime(run_rt), m_priority(rt_prio) + m_realtime(run_rt), m_priority(rt_prio), m_xmit_nb_periods( 1 ) { @@ -400,5 +400,5 @@ if (stream->getStreamType()==IsoStream::eST_Transmit) { // setup the optimal parameters for the raw1394 ISO buffering - unsigned int packets_per_period=stream->getPacketsPerPeriod(); + unsigned int packets_per_period = stream->getPacketsPerPeriod(); #if 1 @@ -410,5 +410,5 @@ unsigned int max_packet_size=MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize() / packets_per_period; if (max_packet_size < stream->getMaxPacketSize()) { - max_packet_size=stream->getMaxPacketSize(); + max_packet_size = stream->getMaxPacketSize(); } @@ -418,5 +418,5 @@ max_packet_size = getpagesize(); - unsigned int irq_interval=packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD; + unsigned int irq_interval = packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD; if(irq_interval <= 0) irq_interval=1; #else @@ -427,19 +427,19 @@ // configure it such that we have an irq for every PACKETS_PER_INTERRUPT packets - unsigned int irq_interval=PACKETS_PER_INTERRUPT; + unsigned int irq_interval = PACKETS_PER_INTERRUPT; // unless the period size doesn't allow this if ((packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD) < irq_interval) { - irq_interval=1; + irq_interval = 1; } // FIXME: test - irq_interval=1; + irq_interval = 1; #warning Using fixed irq_interval - unsigned int max_packet_size=getpagesize() / irq_interval; + unsigned int max_packet_size = getpagesize() / irq_interval; if (max_packet_size < stream->getMaxPacketSize()) { - max_packet_size=stream->getMaxPacketSize(); + max_packet_size = stream->getMaxPacketSize(); } @@ -458,11 +458,8 @@ // buffers get transfered, meaning that we should have at least some // margin here - int buffers=irq_interval * 2; - - // half a period. the xmit handler will take care of this -// int buffers=packets_per_period/4; - - // NOTE: this is dangerous: what if there is not enough prefill? -// if (buffers<10) buffers=10; +// int buffers=irq_interval * 2; + + // we should queue up as much as possible + int buffers = packets_per_period * m_xmit_nb_periods; // create the actual handler Index: /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.h (revision 705) +++ /branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.h (revision 722) @@ -43,5 +43,4 @@ namespace Streaming { - class IsoHandler; class IsoStream; @@ -78,4 +77,7 @@ void setPollTimeout(int t) {m_poll_timeout=t;}; ///< set the timeout used for poll() int getPollTimeout() {return m_poll_timeout;}; ///< get the timeout used for poll() + + void setTransmitBufferNbPeriods(unsigned int t) {m_xmit_nb_periods = t;}; + int getTransmitBufferNbPeriods() {return m_xmit_nb_periods;}; void setVerboseLevel(int l); ///< set the verbose level @@ -151,4 +153,7 @@ Util::PosixThread *m_isoManagerThread; + // the preferred number of periods to buffer on xmit + unsigned int m_xmit_nb_periods; + // debug stuff DECLARE_DEBUG_MODULE; Index: /branches/ppalmers-streaming/src/dice/dice_avdevice.cpp =================================================================== --- /branches/ppalmers-streaming/src/dice/dice_avdevice.cpp (revision 715) +++ /branches/ppalmers-streaming/src/dice/dice_avdevice.cpp (revision 722) @@ -448,5 +448,6 @@ DiceAvDevice::prepare() { // prepare receive SP's - for (unsigned int i=0;i