Index: /trunk/libffado/src/libieee1394/IsoHandler.cpp =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandler.cpp (revision 841) +++ /trunk/libffado/src/libieee1394/IsoHandler.cpp (revision 860) @@ -174,8 +174,13 @@ IsoHandler::waitForClient() { - //debugOutput(DEBUG_LEVEL_VERBOSE, "waiting...\n"); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); if(m_Client) { - bool result = m_Client->waitForSignal(); - //debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); + bool result; + if (m_type == eHT_Receive) { + result = m_Client->waitForProducePacket(); + } else { + result = m_Client->waitForConsumePacket(); + } + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); return result; } else { @@ -188,8 +193,13 @@ IsoHandler::tryWaitForClient() { - //debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); if(m_Client) { - bool result = m_Client->tryWaitForSignal(); - //debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); + bool result; + if (m_type == eHT_Receive) { + result = m_Client->canProducePacket(); + } else { + result = m_Client->canConsumePacket(); + } + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); return result; } else { @@ -214,14 +224,9 @@ // wait for the availability of frames in the client // (blocking for transmit handlers) -#if 0 //#ifdef DEBUG - if (getType() == eHT_Transmit) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) Waiting for Client to signal frame availability...\n", this); - } -#endif - if (getType() == eHT_Receive || waitForClient()) { - + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Waiting for Client activity...\n", this, getTypeString()); + if (waitForClient()) { #if ISOHANDLER_USE_POLL bool result = true; - while(result && m_Client && m_Client->canProcessPackets()) { + while(result && m_Client && tryWaitForClient()) { int err = poll(&m_poll_fd, 1, m_poll_timeout); if (err == -1) { @@ -255,5 +260,5 @@ // so poll'ing is not really necessary bool result = true; - while(result && m_Client && m_Client->canProcessPackets()) { + while(result && m_Client && tryWaitForClient()) { result = iterate(); // if (getType() == eHT_Receive) { @@ -272,8 +277,6 @@ bool IsoHandler::iterate() { -// if(m_type==eHT_Receive) { -// debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Iterating ISO handler\n", -// this, (m_type==eHT_Receive?"Receive":"Transmit")); -// } + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Iterating ISO handler...\n", + this, getTypeString()); if(m_State == E_Running) { #if ISOHANDLER_FLUSH_BEFORE_ITERATE @@ -285,10 +288,10 @@ return false; } -// debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) done iterating ISO handler\n", -// this, (m_type==eHT_Receive?"Receive":"Transmit")); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) done interating ISO handler...\n", + this, getTypeString()); return true; } else { debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Not iterating a non-running handler...\n", - this, (m_type==eHT_Receive?"Receive":"Transmit")); + this, getTypeString()); return false; } @@ -415,5 +418,5 @@ debugOutputShort( DEBUG_LEVEL_NORMAL, " Handler type................: %s\n", - (this->getType() == eHT_Receive ? "Receive" : "Transmit")); + getTypeString()); debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel...............: %2d, %2d\n", m_manager.get1394Service().getPort(), channel); @@ -475,7 +478,7 @@ unsigned int cycle, unsigned int dropped) { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, +/* debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "received packet: length=%d, channel=%d, cycle=%d\n", - length, channel, cycle ); + length, channel, cycle );*/ m_packetcount++; m_dropped += dropped; @@ -494,7 +497,7 @@ int cycle, unsigned int dropped) { - debugOutput( DEBUG_LEVEL_ULTRA_VERBOSE, +/* debugOutput( DEBUG_LEVEL_ULTRA_VERBOSE, "sending packet: length=%d, cycle=%d\n", - *length, cycle ); + *length, cycle );*/ m_packetcount++; m_dropped += dropped; @@ -503,4 +506,7 @@ return m_Client->getPacket(data, length, tag, sy, cycle, dropped, m_max_packet_size); } + *tag = 0; + *sy = 0; + *length = 0; return RAW1394_ISO_OK; } @@ -594,2 +600,16 @@ return true; } + +/** + * @brief convert a EHandlerType to a string + * @param t the type + * @return a char * describing the state + */ +const char * +IsoHandler::eHTToString(enum EHandlerType t) { + switch (t) { + case eHT_Receive: return "Receive"; + case eHT_Transmit: return "Transmit"; + default: return "error: unknown type"; + } +} Index: /trunk/libffado/src/libieee1394/IsoHandler.h =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandler.h (revision 803) +++ /trunk/libffado/src/libieee1394/IsoHandler.h (revision 860) @@ -100,4 +100,8 @@ void flush(); enum EHandlerType getType() {return m_type;}; + const char *getTypeString() {return eHTToString(m_type); }; + + // pretty printing + const char *eHTToString(enum EHandlerType); bool isEnabled() Index: /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (revision 857) +++ /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (revision 860) @@ -712,8 +712,14 @@ while(period_not_ready) { debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill()); - if(!m_SyncSource->waitForSignal()) { - debugError("Error waiting for signal\n"); - return false; - } + bool result; + if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) { + result = m_SyncSource->waitForConsumePeriod(); + } else { + result = m_SyncSource->waitForProducePeriod(); + } +// if(!result) { +// debugError("Error waiting for signal\n"); +// return false; +// } // HACK: this should be solved more elegantly @@ -722,5 +728,13 @@ it != m_ReceiveProcessors.end(); ++it ) { - bool this_sp_period_ready = (*it)->canClientTransferFrames(m_period); + bool this_sp_period_ready = (*it)->canConsumePeriod(); + if (!this_sp_period_ready) { + period_not_ready = true; + } + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + bool this_sp_period_ready = (*it)->canProducePeriod(); if (!this_sp_period_ready) { period_not_ready = true; Index: /trunk/libffado/src/libstreaming/generic/StreamProcessor.h =================================================================== --- /trunk/libffado/src/libstreaming/generic/StreamProcessor.h (revision 857) +++ /trunk/libffado/src/libstreaming/generic/StreamProcessor.h (revision 860) @@ -34,5 +34,6 @@ #include "debugmodule/debugmodule.h" -#include + +#include class Ieee1394Service; @@ -161,48 +162,22 @@ bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer - unsigned int getSignalPeriod() {return m_signal_period;}; - bool setSignalPeriod(unsigned int p) {m_signal_period=p; return true;}; - /** - * @brief waits for a 'signal' (blocking) - * - * a 'signal' is: - * when type==Receive: - * - one signal_period of frames is present in the buffer - * (received by the iso side) - * - an error has occurred (xrun, iso error, ...) - * when type==Transmit: - * - at least one signal_period of frames are present in the buffer - * (have been written into it by the client) - * - an error occurred - * - * @return true if the 'signal' is available, false if error - */ - bool waitForSignal(); - - /** - * @brief checks for a 'signal' (non-blocking) - * - * a 'signal' is: - * when type==Receive: - * - one signal_period of frames is present in the buffer - * (received by the iso side) - * - an error has occurred (xrun, iso error, ...) - * when type==Transmit: - * - at least one signal_period of frames are present in the buffer - * (have been written into it by the client) - * - an error occurred - * - * @return true if the 'signal' is available, false if not (or error) - */ - bool tryWaitForSignal(); - - /** - * @brief can a SP process (queue, dequeue) packets at this moment? - * - * - * @return true if packet processing makes sense - */ - bool canProcessPackets(); - + //FIXME: document wait functions + bool waitForProducePacket(); + bool waitForProducePeriod(); + bool waitForProduce(unsigned int nframes); + + bool waitForConsumePacket(); + bool waitForConsumePeriod(); + bool waitForConsume(unsigned int nframes); + + bool canProducePacket(); + bool canProducePeriod(); + bool canProduce(unsigned int nframes); + + bool canConsumePacket(); + bool canConsumePeriod(); + bool canConsume(unsigned int nframes); + +public: /** * @brief drop nframes from the internal buffer as if they were transferred to the client side @@ -503,7 +478,6 @@ private: bool m_in_xrun; - sem_t m_signal_semaphore; - unsigned int m_signal_period; - unsigned int m_signal_offset; + pthread_mutex_t m_activity_cond_lock; + pthread_cond_t m_activity_cond; public: Index: /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp =================================================================== --- /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (revision 857) +++ /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (revision 860) @@ -40,15 +40,8 @@ #include -/* -#define POST_SEMAPHORE { \ - int tmp; \ - sem_getvalue(&m_signal_semaphore, &tmp); \ - debugWarning("posting semaphore from value %d\n", tmp); \ - sem_post(&m_signal_semaphore); \ -} -*/ - -#define POST_SEMAPHORE { \ - sem_post(&m_signal_semaphore); \ +#define SIGNAL_ACTIVITY { \ + pthread_mutex_lock(&m_activity_cond_lock); \ + pthread_cond_broadcast(&m_activity_cond); \ + pthread_mutex_unlock(&m_activity_cond_lock); \ } @@ -78,9 +71,9 @@ , m_sync_delay( 0 ) , m_in_xrun( false ) - , m_signal_period( 0 ) - , m_signal_offset( 0 ) { // create the timestamped buffer and register ourselves as its client m_data_buffer = new Util::TimestampedBuffer(this); + pthread_mutex_init(&m_activity_cond_lock, NULL); + pthread_cond_init(&m_activity_cond, NULL); } @@ -90,8 +83,17 @@ debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n"); } - // make the threads leave the wait condition - POST_SEMAPHORE; - sem_destroy(&m_signal_semaphore); - + + // lock the condition mutex to keep threads from blocking on + // the condition var while we destroy it + pthread_mutex_lock(&m_activity_cond_lock); + // now signal activity, releasing threads that + // are already blocking on the condition variable + pthread_cond_broadcast(&m_activity_cond); + // then destroy it + pthread_cond_destroy(&m_activity_cond); + pthread_mutex_unlock(&m_activity_cond_lock); + + // destroy the mutexes + pthread_mutex_destroy(&m_activity_cond_lock); if (m_data_buffer) delete m_data_buffer; if (m_scratch_buffer) delete[] m_scratch_buffer; @@ -316,5 +318,4 @@ if (m_state == ePS_Invalid) { debugError("Should not have state %s\n", ePSToString(m_state) ); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -338,5 +339,4 @@ if (!updateState()) { // we are allowed to change the state directly debugError("Could not update state!\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -358,5 +358,4 @@ if (!updateState()) { // we are allowed to change the state directly debugError("Could not update state!\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -383,5 +382,4 @@ if (!updateState()) { // we are allowed to change the state directly debugError("Could not update state!\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -417,5 +415,4 @@ if (!updateState()) { // we are allowed to change the state directly debugError("Could not update state!\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -435,5 +432,4 @@ if (!updateState()) { // we are allowed to change the state directly debugError("Could not update state!\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -455,38 +451,13 @@ if (!updateState()) { // we are allowed to change the state directly debugError("Could not update state!\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } - POST_SEMAPHORE; return RAW1394_ISO_DEFER; } else if(result2 == eCRV_OK) { // no problem here - // if we have enough samples, we can post the semaphore and - // defer further processing until later. this will allow us to - // run the client and process the frames such that we can put them - // into the xmit buffers ASAP - if (m_state == ePS_Running) { - unsigned int bufferfill = m_data_buffer->getBufferFill(); - if(bufferfill >= m_signal_period + m_signal_offset) { - // this to avoid multiple signals for the same period - int semval; - sem_getvalue(&m_signal_semaphore, &semval); - // NOTE: this can cause 2 posts to be done when the receiving thread - // decreases the semaphore but hasn't processed the frames yet - unsigned int signal_period = m_signal_period * (semval + 1) + m_signal_offset; - if(bufferfill >= signal_period) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) buffer fill (%d) > signal period (%d), sem_val=%d\n", - this, m_data_buffer->getBufferFill(), signal_period, semval); - POST_SEMAPHORE; - } - // the process thread should have higher prio such that we are blocked until - // the samples are processed. - return RAW1394_ISO_DEFER; - } - } + SIGNAL_ACTIVITY; return RAW1394_ISO_OK; } else { debugError("Invalid response\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -496,9 +467,7 @@ } else { debugError("Invalid response\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } debugError("reached the unreachable\n"); - POST_SEMAPHORE; return RAW1394_ISO_ERROR; } @@ -633,5 +602,5 @@ // assumed not to xrun - enum eChildReturnValue result2 = generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); + generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); return RAW1394_ISO_OK; } else { @@ -789,8 +758,11 @@ */ bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { + bool result; debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); assert( getType() == ePT_Receive ); - if(isDryRunning()) return getFramesDry(nbframes, ts); - else return getFramesWet(nbframes, ts); + if(isDryRunning()) result = getFramesDry(nbframes, ts); + else result = getFramesWet(nbframes, ts); + SIGNAL_ACTIVITY; + return result; } @@ -847,15 +819,20 @@ StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts) { + bool result; debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); - return m_data_buffer->dropFrames(nbframes); + result = m_data_buffer->dropFrames(nbframes); + SIGNAL_ACTIVITY; + return result; } bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { + bool result; debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); assert( getType() == ePT_Transmit ); - - if(isDryRunning()) return putFramesDry(nbframes, ts); - else return putFramesWet(nbframes, ts); + if(isDryRunning()) result = putFramesDry(nbframes, ts); + else result = putFramesWet(nbframes, ts); + SIGNAL_ACTIVITY; + return result; } @@ -867,14 +844,4 @@ m_data_buffer->blockProcessWriteFrames(nbframes, ts); debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); - - unsigned int bufferfill = m_data_buffer->getBufferFill(); - if (bufferfill >= m_signal_period + m_signal_offset) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", - this, bufferfill, m_signal_period + m_signal_offset); - POST_SEMAPHORE; - } else { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", - this, bufferfill, m_signal_period + m_signal_offset); - } return true; // FIXME: what about failure? } @@ -911,86 +878,6 @@ } - unsigned int bufferfill = m_data_buffer->getBufferFill(); - if (bufferfill >= m_signal_period + m_signal_offset) { - debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", - this, bufferfill, m_signal_period + m_signal_offset); - POST_SEMAPHORE; - } else { - debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", - this, bufferfill, m_signal_period + m_signal_offset); - } - + SIGNAL_ACTIVITY; return true; -} - -bool -StreamProcessor::waitForSignal() -{ - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) wait ...\n", this, getTypeString()); - int result; - if(m_state == ePS_Running && m_next_state == ePS_Running) { - // check whether we already fullfil the criterion - unsigned int bufferfill = m_data_buffer->getBufferFill(); - if(bufferfill >= m_signal_period + m_signal_offset) { - return true; - } - - result = sem_wait(&m_signal_semaphore); -#ifdef DEBUG - int tmp; - sem_getvalue(&m_signal_semaphore, &tmp); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp); -#endif - return result == 0; - } else { - // when we're not running, we can always provide frames - // when we're in a state transition, keep iterating too - debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n"); - return true; - } -} - -bool -StreamProcessor::tryWaitForSignal() -{ - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) trywait ...\n", this, getTypeString()); - int result; - if(m_state == ePS_Running && m_next_state == ePS_Running) { - // check whether we already fullfil the criterion - unsigned int bufferfill = m_data_buffer->getBufferFill(); - if(bufferfill >= m_signal_period + m_signal_offset) { - return true; - } - - result = sem_trywait(&m_signal_semaphore); -#ifdef DEBUG - int tmp; - sem_getvalue(&m_signal_semaphore, &tmp); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp); -#endif - return result == 0; - } else { - // when we're not running, we can always provide frames - // when we're in a state transition, keep iterating too - debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n"); - return true; - } -} - -bool -StreamProcessor::canProcessPackets() -{ - if(m_state != ePS_Running || m_next_state != ePS_Running) return true; - bool result; - unsigned int bufferfill; - if(getType() == ePT_Receive) { - bufferfill = m_data_buffer->getBufferSpace(); - } else { - bufferfill = m_data_buffer->getBufferFill(); - } - result = bufferfill > getNominalFramesPerPacket(); -// debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) for a bufferfill of %d, we return %d\n", -// this, ePTToString(getType()), bufferfill, result); - return result; } @@ -998,12 +885,16 @@ StreamProcessor::shiftStream(int nbframes) { + bool result; if(nbframes == 0) return true; if(nbframes > 0) { - return m_data_buffer->dropFrames(nbframes); + result = m_data_buffer->dropFrames(nbframes); + SIGNAL_ACTIVITY; + return result; } else { - bool result = true; + result = true; while(nbframes++) { result &= m_data_buffer->writeDummyFrame(); } + SIGNAL_ACTIVITY; return result; } @@ -1089,9 +980,4 @@ debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); - if (sem_init(&m_signal_semaphore, 0, 0) == -1) { - debugError("Could not init signal semaphore"); - return false; - } - if(!m_IsoHandlerManager.registerStream(this)) { debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n"); @@ -1170,5 +1056,7 @@ m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant); m_next_state = state; - POST_SEMAPHORE; // needed to ensure that things don't get deadlocked + // wake up any threads that might be waiting on data in the buffers + // since a state transition can cause data to become available + SIGNAL_ACTIVITY; return true; } @@ -1417,4 +1305,5 @@ } #endif + SIGNAL_ACTIVITY; return result; } @@ -1450,4 +1339,5 @@ } #endif + SIGNAL_ACTIVITY; return true; } @@ -1499,4 +1389,5 @@ } #endif + SIGNAL_ACTIVITY; return result; } @@ -1521,8 +1412,4 @@ // this basically means nothing, the state change will // be picked up by the packet iterator - - sem_init(&m_signal_semaphore, 0, 0); - m_signal_period = m_StreamProcessorManager.getPeriodSize(); - m_signal_offset = 0; // FIXME: we have to ensure that everyone is ready if(!m_data_buffer->clearBuffer()) { @@ -1538,9 +1425,5 @@ return false; } - if (m_data_buffer->getBufferFill() >= m_signal_period + m_signal_offset) { - POST_SEMAPHORE; - } - } - + } break; default: @@ -1555,4 +1438,5 @@ } #endif + SIGNAL_ACTIVITY; return true; } @@ -1592,4 +1476,5 @@ } #endif + SIGNAL_ACTIVITY; return result; } @@ -1611,7 +1496,4 @@ case ePS_Running: // the thread will do the transition - - // we have to wake the iterator if it's asleep - POST_SEMAPHORE; break; default: @@ -1626,4 +1508,5 @@ } #endif + SIGNAL_ACTIVITY; return true; } @@ -1648,5 +1531,4 @@ return true; } - // after creation, only initialization is allowed if (m_state == ePS_Created) { @@ -1656,5 +1538,5 @@ // do init here result = doStop(); - if (result) {POST_SEMAPHORE; return true;} + if (result) {return true;} else goto updateState_exit_change_failed; } @@ -1666,5 +1548,5 @@ } result = doWaitForRunningStream(); - if (result) {POST_SEMAPHORE; return true;} + if (result) {return true;} else goto updateState_exit_change_failed; } @@ -1677,5 +1559,5 @@ } result = doDryRunning(); - if (result) {POST_SEMAPHORE; return true;} + if (result) {return true;} else goto updateState_exit_change_failed; } @@ -1694,5 +1576,5 @@ result = doWaitForStreamEnable(); } - if (result) {POST_SEMAPHORE; return true;} + if (result) {return true;} else goto updateState_exit_change_failed; } @@ -1711,5 +1593,5 @@ result = doRunning(); } - if (result) {POST_SEMAPHORE; return true;} + if (result) {return true;} else goto updateState_exit_change_failed; } @@ -1721,5 +1603,5 @@ } result = doWaitForStreamDisable(); - if (result) {POST_SEMAPHORE; return true;} + if (result) {return true;} else goto updateState_exit_change_failed; } @@ -1731,5 +1613,5 @@ } result = doDryRunning(); - if (result) {POST_SEMAPHORE; return true;} + if (result) {return true;} else goto updateState_exit_change_failed; } @@ -1739,11 +1621,163 @@ debugError("Invalid state transition: %s => %s\n", ePSToString(m_state), ePSToString(next_state)); - POST_SEMAPHORE; + SIGNAL_ACTIVITY; return false; updateState_exit_change_failed: debugError("State transition failed: %s => %s\n", ePSToString(m_state), ePSToString(next_state)); - POST_SEMAPHORE; + SIGNAL_ACTIVITY; return false; +} + +bool StreamProcessor::waitForProducePacket() +{ + return waitForProduce(getNominalFramesPerPacket()); +} +bool StreamProcessor::waitForProducePeriod() +{ + return waitForProduce(m_StreamProcessorManager.getPeriodSize()); +} +bool StreamProcessor::waitForProduce(unsigned int nframes) +{ + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this); + struct timespec ts; + int result; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + debugError("clock_gettime failed\n"); + return false; + } + + // FIXME: hardcoded timeout of 0.1 sec + ts.tv_nsec += 100 * 1000000LL; + if (ts.tv_nsec > 1000000000LL) { + ts.tv_sec += 1; + ts.tv_nsec -= 1000000000LL; + } + + pthread_mutex_lock(&m_activity_cond_lock); + while(!canProduce(nframes)) { + result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); + + if (result == -1) { + if (errno == ETIMEDOUT) { + debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this); + pthread_mutex_unlock(&m_activity_cond_lock); + return false; + } else { + debugError("(%p) pthread_cond_timedwait error\n", this); + pthread_mutex_unlock(&m_activity_cond_lock); + return false; + } + } + } + pthread_mutex_unlock(&m_activity_cond_lock); + return true; +} + +bool StreamProcessor::waitForConsumePacket() +{ + return waitForConsume(getNominalFramesPerPacket()); +} +bool StreamProcessor::waitForConsumePeriod() +{ + return waitForConsume(m_StreamProcessorManager.getPeriodSize()); +} +bool StreamProcessor::waitForConsume(unsigned int nframes) +{ + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this); + struct timespec ts; + int result; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + debugError("clock_gettime failed\n"); + return false; + } + + // FIXME: hardcoded timeout of 0.1 sec + ts.tv_nsec += 100 * 1000000LL; + if (ts.tv_nsec > 1000000000LL) { + ts.tv_sec += 1; + ts.tv_nsec -= 1000000000LL; + } + + pthread_mutex_lock(&m_activity_cond_lock); + while(!canConsume(nframes)) { + result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); + + if (result == -1) { + if (errno == ETIMEDOUT) { + debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this); + pthread_mutex_unlock(&m_activity_cond_lock); + return false; + } else { + debugError("(%p) pthread_cond_timedwait error\n", this); + pthread_mutex_unlock(&m_activity_cond_lock); + return false; + } + } + } + pthread_mutex_unlock(&m_activity_cond_lock); + return true; +} + +bool StreamProcessor::canProducePacket() +{ + return canProduce(getNominalFramesPerPacket()); +} +bool StreamProcessor::canProducePeriod() +{ + return canProduce(m_StreamProcessorManager.getPeriodSize()); +} +bool StreamProcessor::canProduce(unsigned int nframes) +{ + if(m_state == ePS_Running && m_next_state == ePS_Running) { + // check whether we already fullfil the criterion + unsigned int bufferspace = m_data_buffer->getBufferSpace(); + if(bufferspace >= nframes) { +// debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough space (%u)...\n", bufferspace); + return true; + } else return false; + } else { + if(getType() == ePT_Transmit) { + // if we are an xmit SP, we cannot accept frames + // when not running + return false; + } else { + // if we are a receive SP, we can always accept frames + // when not running + return true; + } + } +} + +bool StreamProcessor::canConsumePacket() +{ + return canConsume(getNominalFramesPerPacket()); +} +bool StreamProcessor::canConsumePeriod() +{ + return canConsume(m_StreamProcessorManager.getPeriodSize()); +} +bool StreamProcessor::canConsume(unsigned int nframes) +{ + if(m_state == ePS_Running && m_next_state == ePS_Running) { + // check whether we already fullfil the criterion + unsigned int bufferfill = m_data_buffer->getBufferFill(); + if(bufferfill >= nframes) { +// debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough items (%u)...\n", bufferfill); + return true; + } else return false; + } else { + if(getType() == ePT_Transmit) { + // if we are an xmit SP, and we're not running, + // we can always provide frames + return true; + } else { + // if we are a receive SP, we can't provide frames + // when not running + return false; + } + } }