Index: /trunk/libffado/src/libieee1394/IsoHandlerManager.cpp =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandlerManager.cpp (revision 993) +++ /trunk/libffado/src/libieee1394/IsoHandlerManager.cpp (revision 1005) @@ -25,5 +25,4 @@ #include "IsoHandlerManager.h" #include "ieee1394service.h" -#include "IsoHandler.h" #include "libstreaming/generic/StreamProcessor.h" @@ -42,8 +41,14 @@ // --- ISO Thread --- // -IsoTask::IsoTask(IsoHandlerManager& manager) +IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType t) : m_manager( manager ) , m_SyncIsoHandler ( NULL ) -{ + , m_handlerType( t ) +{ +} + +IsoTask::~IsoTask() +{ + sem_destroy(&m_activity_semaphore); } @@ -65,4 +70,5 @@ #endif + sem_init(&m_activity_semaphore, 0, 0); return true; } @@ -89,4 +95,7 @@ IsoHandler *h = m_manager.m_IsoHandlers.at(i); assert(h); + + // skip the handlers not intended for us + if(h->getType() != m_handlerType) continue; if (h->isEnabled()) { @@ -129,5 +138,6 @@ { debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p) Execute\n", this); + "(%p, %s) Execute\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); int err; unsigned int i; @@ -139,8 +149,9 @@ if(diff < 100) { debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p) short loop detected (%d usec), cnt: %d\n", - this, diff, m_successive_short_loops); + "(%p, %s) short loop detected (%d usec), cnt: %d\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), + diff, m_successive_short_loops); m_successive_short_loops++; - if(m_successive_short_loops > 100) { + if(m_successive_short_loops > 10000) { debugError("Shutting down runaway thread\n"); return false; @@ -163,6 +174,6 @@ if (m_poll_nfds_shadow == 0) { debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p) bypass iterate since no handlers to poll\n", - this); + "(%p, %s) bypass iterate since no handlers to poll\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); usleep(m_poll_timeout * 1000); return true; @@ -179,23 +190,16 @@ short events = 0; IsoHandler *h = m_IsoHandler_map_shadow[i]; - if(h->getType() == IsoHandler::eHT_Transmit) { - // we should only poll on a transmit handler - // that has a client that is ready to send - // something. Otherwise it will end up in - // busy wait looping since the packet function - // will defer processing (also avoids the - // AGAIN problem) - if (h->tryWaitForClient()) { - events = POLLIN | POLLPRI; - no_one_to_poll = false; - } - } else { - // a receive handler should only be polled if - // it's client doesn't already have enough data - // and if it can still accept data. - if (h->tryWaitForClient()) { // FIXME - events = POLLIN | POLLPRI; - no_one_to_poll = false; - } + // we should only poll on a transmit handler + // that has a client that is ready to send + // something. Otherwise it will end up in + // busy wait looping since the packet function + // will defer processing (also avoids the + // AGAIN problem) + if (h->canIterateClient()) { + events = POLLIN | POLLPRI; + no_one_to_poll = false; + // if we are going to poll() it, let's ensure + // it can run until someone wants it to exit + h->allowIterateLoop(); } m_poll_fds_shadow[i].events = events; @@ -203,39 +207,37 @@ if(no_one_to_poll) { - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p) No one to poll, waiting on the sync handler to become ready\n", - this); - - if(!m_SyncIsoHandler->waitForClient()) { - debugError("Failed to wait for client\n"); - // This can be due to error or due to timeout - - // sleep for a while - usleep(m_poll_timeout * 1000); // FIXME - // exit this iteration loop - return true; - } - - #ifdef DEBUG - // if this happens we end up in a deadlock! - if(!m_SyncIsoHandler->tryWaitForClient()) { - debugFatal("inconsistency in wait functions!\n"); - return false; - } - #endif - - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p) sync handler ready\n", - this); + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, + "(%p, %s) No one to poll, waiting for something to happen\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); + // wait for something to happen + switch(waitForActivity()) { + case IsoTask::eAR_Error: + debugError("Error while waiting for activity\n"); + return false; + case IsoTask::eAR_Interrupted: + // FIXME: what to do here? + debugWarning("Interrupted while waiting for activity\n"); + break; + case IsoTask::eAR_Timeout: + // FIXME: what to do here? + debugWarning("Timeout while waiting for activity\n"); + break; + case IsoTask::eAR_Activity: + // do nothing + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, + "(%p, %s) something happened\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); + break; + } } } // Use a shadow map of the fd's such that we don't have to update - // the fd map everytime we run poll(). It doesn't change that much - // anyway + // the fd map everytime we run poll(). err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); if (err < 0) { if (errno == EINTR) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n"); return true; } @@ -247,7 +249,8 @@ #ifdef DEBUG if(m_poll_fds_shadow[i].revents) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, - "(%p) received events: %08X for (%d/%d, %p, %s)\n", - this, m_poll_fds_shadow[i].revents, + debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, + "(%p, %s) received events: %08X for (%d/%d, %p, %s)\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), + m_poll_fds_shadow[i].revents, i, m_poll_nfds_shadow, m_IsoHandler_map_shadow[i], @@ -283,5 +286,67 @@ } return true; - +} + +enum IsoTask::eActivityResult +IsoTask::waitForActivity() +{ + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, + "(%p, %s) waiting for activity\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); + struct timespec ts; + int result; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + debugError("clock_gettime failed\n"); + return eAR_Error; + } + long long int timeout_nsec=0; + int timeout_sec = 0; + + timeout_nsec = ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS * 1000LL; + timeout_sec = 0; + while(timeout_nsec >= 1000000000LL) { + timeout_sec += 1; + timeout_nsec -= 1000000000LL; + } + ts.tv_nsec += timeout_nsec; + ts.tv_sec += timeout_sec; + + result = sem_timedwait(&m_activity_semaphore, &ts); + + if(result != 0) { + if (result == ETIMEDOUT) { + debugOutput(DEBUG_LEVEL_VERBOSE, + "(%p) pthread_cond_timedwait() timed out (result=%d)\n", + this, result); + return eAR_Timeout; + } else if (result == EINTR) { + debugOutput(DEBUG_LEVEL_VERBOSE, + "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", + this, result); + return eAR_Interrupted; + } else { + debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n", + this, result); + debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n", + this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); + return eAR_Error; + } + } + + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, + "(%p, %s) got activity\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); + return eAR_Activity; +} + +void +IsoTask::signalActivity() +{ + // signal the activity cond var + sem_post(&m_activity_semaphore); + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, + "(%p, %s) activity\n", + this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); } @@ -295,7 +360,10 @@ , m_service( service ) , m_realtime(false), m_priority(0) - , m_IsoThread ( NULL ) - , m_IsoTask ( NULL ) -{} + , m_IsoThreadTransmit ( NULL ) + , m_IsoTaskTransmit ( NULL ) + , m_IsoThreadReceive ( NULL ) + , m_IsoTaskReceive ( NULL ) +{ +} IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio) @@ -303,7 +371,10 @@ , m_service( service ) , m_realtime(run_rt), m_priority(rt_prio) - , m_IsoThread ( NULL ) - , m_IsoTask ( NULL ) -{} + , m_IsoThreadTransmit ( NULL ) + , m_IsoTaskTransmit ( NULL ) + , m_IsoThreadReceive ( NULL ) + , m_IsoTaskReceive ( NULL ) +{ +} IsoHandlerManager::~IsoHandlerManager() @@ -314,10 +385,17 @@ debugError("Still some handlers in use\n"); } - if (m_IsoThread) { - m_IsoThread->Stop(); - delete m_IsoThread; - } - if (m_IsoTask) { - delete m_IsoTask; + if (m_IsoThreadTransmit) { + m_IsoThreadTransmit->Stop(); + delete m_IsoThreadTransmit; + } + if (m_IsoThreadReceive) { + m_IsoThreadReceive->Stop(); + delete m_IsoThreadReceive; + } + if (m_IsoTaskTransmit) { + delete m_IsoTaskTransmit; + } + if (m_IsoTaskReceive) { + delete m_IsoTaskReceive; } } @@ -326,5 +404,6 @@ IsoHandlerManager::requestShadowMapUpdate() { - if(m_IsoTask) m_IsoTask->requestShadowMapUpdate(); + if(m_IsoTaskTransmit) m_IsoTaskTransmit->requestShadowMapUpdate(); + if(m_IsoTaskReceive) m_IsoTaskReceive->requestShadowMapUpdate(); } @@ -336,9 +415,20 @@ m_priority = priority; - if (m_IsoThread) { + if (m_IsoThreadTransmit) { if (m_realtime) { - m_IsoThread->AcquireRealTime(m_priority); + m_IsoThreadTransmit->AcquireRealTime(m_priority + + ISOHANDLERMANAGER_ISO_PRIO_INCREASE + + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT); } else { - m_IsoThread->DropRealTime(); + m_IsoThreadTransmit->DropRealTime(); + } + } + if (m_IsoThreadReceive) { + if (m_realtime) { + m_IsoThreadReceive->AcquireRealTime(m_priority + + ISOHANDLERMANAGER_ISO_PRIO_INCREASE + + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV); + } else { + m_IsoThreadReceive->DropRealTime(); } } @@ -356,16 +446,33 @@ } - // create a thread to iterate our ISO handlers - debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p...\n", this); - m_IsoTask = new IsoTask( *this ); - if(!m_IsoTask) { + // create threads to iterate our ISO handlers + debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p transmit...\n", this); + m_IsoTaskTransmit = new IsoTask( *this, IsoHandler::eHT_Transmit ); + if(!m_IsoTaskTransmit) { debugFatal("No task\n"); return false; } - m_IsoThread = new Util::PosixThread(m_IsoTask, m_realtime, - m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE, - PTHREAD_CANCEL_DEFERRED); - - if(!m_IsoThread) { + m_IsoThreadTransmit = new Util::PosixThread(m_IsoTaskTransmit, m_realtime, + m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE + + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT, + PTHREAD_CANCEL_DEFERRED); + + if(!m_IsoThreadTransmit) { + debugFatal("No thread\n"); + return false; + } + + debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p receive...\n", this); + m_IsoTaskReceive = new IsoTask( *this, IsoHandler::eHT_Receive ); + if(!m_IsoTaskReceive) { + debugFatal("No task\n"); + return false; + } + m_IsoThreadReceive = new Util::PosixThread(m_IsoTaskReceive, m_realtime, + m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE + + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV, + PTHREAD_CANCEL_DEFERRED); + + if(!m_IsoThreadReceive) { debugFatal("No thread\n"); return false; @@ -374,6 +481,9 @@ Util::Watchdog *watchdog = m_service.getWatchdog(); if(watchdog) { - if(!watchdog->registerThread(m_IsoThread)) { - debugWarning("could not register iso thread with watchdog\n"); + if(!watchdog->registerThread(m_IsoThreadTransmit)) { + debugWarning("could not register iso transmit thread with watchdog\n"); + } + if(!watchdog->registerThread(m_IsoThreadReceive)) { + debugWarning("could not register iso receive thread with watchdog\n"); } } else { @@ -381,6 +491,10 @@ } - if (m_IsoThread->Start() != 0) { - debugFatal("Could not start ISO thread\n"); + if (m_IsoThreadTransmit->Start() != 0) { + debugFatal("Could not start ISO Transmit thread\n"); + return false; + } + if (m_IsoThreadReceive->Start() != 0) { + debugFatal("Could not start ISO Receive thread\n"); return false; } @@ -401,5 +515,9 @@ if ((*it) == h) { result = h->disable(); - result &= m_IsoTask->requestShadowMapUpdate(); + if(h->getType() == IsoHandler::eHT_Transmit) { + result &= m_IsoTaskTransmit->requestShadowMapUpdate(); + } else { + result &= m_IsoTaskReceive->requestShadowMapUpdate(); + } debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); return result; @@ -422,5 +540,9 @@ if ((*it) == h) { result = h->enable(); - result &= m_IsoTask->requestShadowMapUpdate(); + if(h->getType() == IsoHandler::eHT_Transmit) { + result &= m_IsoTaskTransmit->requestShadowMapUpdate(); + } else { + result &= m_IsoTaskReceive->requestShadowMapUpdate(); + } debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); return result; @@ -430,4 +552,18 @@ debugError("Handler not found\n"); return false; +} + +void +IsoHandlerManager::signalActivityTransmit() +{ + assert(m_IsoTaskTransmit); + m_IsoTaskTransmit->signalActivity(); +} + +void +IsoHandlerManager::signalActivityReceive() +{ + assert(m_IsoTaskReceive); + m_IsoTaskReceive->signalActivity(); } @@ -438,5 +574,6 @@ handler->setVerboseLevel(getDebugLevel()); m_IsoHandlers.push_back(handler); - return m_IsoTask->requestShadowMapUpdate(); + requestShadowMapUpdate(); + return true; } @@ -452,5 +589,6 @@ if ( *it == handler ) { m_IsoHandlers.erase(it); - return m_IsoTask->requestShadowMapUpdate(); + requestShadowMapUpdate(); + return true; } } @@ -690,5 +828,11 @@ return false; } - if(!m_IsoTask->requestShadowMapUpdate()) { + bool result; + if((*it)->getType() == IsoHandler::eHT_Transmit) { + result = m_IsoTaskTransmit->requestShadowMapUpdate(); + } else { + result = m_IsoTaskReceive->requestShadowMapUpdate(); + } + if(!result) { debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); return false; @@ -751,5 +895,11 @@ return false; } - if(!m_IsoTask->requestShadowMapUpdate()) { + bool result; + if((*it)->getType() == IsoHandler::eHT_Transmit) { + result = m_IsoTaskTransmit->requestShadowMapUpdate(); + } else { + result = m_IsoTaskReceive->requestShadowMapUpdate(); + } + if(!result) { debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); return false; @@ -782,7 +932,13 @@ retval=false; } - if(!m_IsoTask->requestShadowMapUpdate()) { + bool result; + if((*it)->getType() == IsoHandler::eHT_Transmit) { + result = m_IsoTaskTransmit->requestShadowMapUpdate(); + } else { + result = m_IsoTaskReceive->requestShadowMapUpdate(); + } + if(!result) { debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); - retval=false; + return false; } } @@ -816,6 +972,8 @@ (*it)->setVerboseLevel(i); } - if(m_IsoThread) m_IsoThread->setVerboseLevel(i); - if(m_IsoTask) m_IsoTask->setVerboseLevel(i); + if(m_IsoThreadTransmit) m_IsoThreadTransmit->setVerboseLevel(i); + if(m_IsoTaskTransmit) m_IsoTaskTransmit->setVerboseLevel(i); + if(m_IsoThreadReceive) m_IsoThreadReceive->setVerboseLevel(i); + if(m_IsoTaskReceive) m_IsoTaskReceive->setVerboseLevel(i); } Index: /trunk/libffado/src/libieee1394/IsoHandlerManager.h =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandlerManager.h (revision 978) +++ /trunk/libffado/src/libieee1394/IsoHandlerManager.h (revision 1005) @@ -26,16 +26,18 @@ #include "config.h" - #include "debugmodule/debugmodule.h" #include "libutil/Thread.h" + +#include "IsoHandler.h" #include #include - #include +#include class Ieee1394Service; -class IsoHandler; +//class IsoHandler; +//enum IsoHandler::EHandlerType; namespace Streaming { @@ -57,6 +59,6 @@ { public: - IsoTask(IsoHandlerManager& manager); - virtual ~IsoTask() {}; + IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType); + virtual ~IsoTask(); public: @@ -65,7 +67,22 @@ /** - * requests the thread to sync it's stream map with the manager + * @brief requests the thread to sync it's stream map with the manager */ bool requestShadowMapUpdate(); + enum eActivityResult { + eAR_Activity, + eAR_Timeout, + eAR_Interrupted, + eAR_Error + }; + + /** + * @brief signals that something happened in one of the clients of this task + */ + void signalActivity(); + /** + * @brief wait until something happened in one of the clients of this task + */ + enum eActivityResult waitForActivity(); void setVerboseLevel(int i); @@ -92,4 +109,8 @@ #endif + // activity signaling + sem_t m_activity_semaphore; + + enum IsoHandler::EHandlerType m_handlerType; // debug stuff DECLARE_DEBUG_MODULE; @@ -110,7 +131,5 @@ class IsoHandlerManager { - friend class Streaming::StreamProcessorManager; friend class IsoTask; - friend class IsoHandler; public: @@ -138,4 +157,11 @@ bool disable(IsoHandler *); ///< disables a handler bool enable(IsoHandler *); ///< enables a handler + + /** + * @brief signals that something happened in one of the clients + */ + void signalActivityTransmit(); + void signalActivityReceive(); + ///> disables the handler attached to the stream bool stopHandlerForStream(Streaming::StreamProcessor *); @@ -157,5 +183,4 @@ Ieee1394Service& get1394Service() {return m_service;}; - protected: void requestShadowMapUpdate(); @@ -195,6 +220,8 @@ bool m_realtime; int m_priority; - Util::Thread * m_IsoThread; - IsoTask * m_IsoTask; + Util::Thread * m_IsoThreadTransmit; + IsoTask * m_IsoTaskTransmit; + Util::Thread * m_IsoThreadReceive; + IsoTask * m_IsoTaskReceive; // debug stuff Index: /trunk/libffado/src/libieee1394/IsoHandler.cpp =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandler.cpp (revision 977) +++ /trunk/libffado/src/libieee1394/IsoHandler.cpp (revision 1005) @@ -26,4 +26,5 @@ #include "IsoHandler.h" #include "ieee1394service.h" +#include "IsoHandlerManager.h" #include "libstreaming/generic/StreamProcessor.h" @@ -91,4 +92,5 @@ , m_speed( RAW1394_ISO_SPEED_400 ) , m_prebuffers( 0 ) + , m_dont_exit_iterate_loop( true ) , m_State( E_Created ) #ifdef DEBUG @@ -150,26 +152,7 @@ bool -IsoHandler::waitForClient() -{ - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); - if(m_Client) { - bool result; - if (m_type == eHT_Receive) { - result = m_Client->waitForProducePacket(); - } else { - result = m_Client->waitForConsumePacket(); - } - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); - return result; - } else { - debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " no client\n"); - } - return false; -} - -bool -IsoHandler::tryWaitForClient() -{ - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); +IsoHandler::canIterateClient() +{ + debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "checking...\n"); if(m_Client) { bool result; @@ -186,70 +169,4 @@ return false; } -/* -bool -IsoHandler::Execute() -{ - debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "%p: Execute thread...\n", this); - - // bypass if not running - if (m_State != E_Running) { - debugOutput( DEBUG_LEVEL_VERBOSE, "%p: not polling since not running...\n", this); - usleep(m_poll_timeout * 1000); - debugOutput( DEBUG_LEVEL_VERBOSE, "%p: done sleeping...\n", this); - return true; - } - - // wait for the availability of frames in the client - // (blocking for transmit handlers) - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Waiting for Client activity...\n", this, getTypeString()); - if (waitForClient()) { -#if ISOHANDLER_USE_POLL - bool result = true; - while(result && m_Client && tryWaitForClient()) { - int err = poll(&m_poll_fd, 1, m_poll_timeout); - if (err == -1) { - if (errno == EINTR) { - return true; - } - debugFatal("%p, poll error: %s\n", this, strerror (errno)); - return false; - } - - if(m_poll_fd.revents & (POLLIN)) { - result=iterate(); - if(!result) { - debugOutput( DEBUG_LEVEL_VERBOSE, - "IsoHandler (%p): Failed to iterate handler\n", - this); - } - } else { - if (m_poll_fd.revents & POLLERR) { - debugWarning("error on fd for %p\n", this); - } - if (m_poll_fd.revents & POLLHUP) { - debugWarning("hangup on fd for %p\n",this); - } - break; - } - } - return result; -#else - // iterate() is blocking if no 1394 data is available - // so poll'ing is not really necessary - bool result = true; - while(result && m_Client && tryWaitForClient()) { - result = iterate(); -// if (getType() == eHT_Receive) { -// debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Iterate returned: %d\n", -// this, (m_type==eHT_Receive?"Receive":"Transmit"), result); -// } - } - return result; -#endif - } else { - debugError("waitForClient() failed.\n"); - return false; - } -}*/ bool @@ -440,5 +357,18 @@ #endif if(m_Client) { - return m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); + enum raw1394_iso_disposition retval = m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); + if (retval == RAW1394_ISO_OK) { + if (m_dont_exit_iterate_loop) { + return RAW1394_ISO_OK; + } else { + m_dont_exit_iterate_loop = true; + debugOutput(DEBUG_LEVEL_VERBOSE, + "(%p) loop exit requested\n", + this); + return RAW1394_ISO_DEFER; + } + } else { + return retval; + } } @@ -467,5 +397,17 @@ } #endif - return retval; + if (retval == RAW1394_ISO_OK) { + if (m_dont_exit_iterate_loop) { + return RAW1394_ISO_OK; + } else { + m_dont_exit_iterate_loop = true; + debugOutput(DEBUG_LEVEL_VERBOSE, + "(%p) loop exit requested\n", + this); + return RAW1394_ISO_DEFER; + } + } else { + return retval; + } } *tag = 0; Index: /trunk/libffado/src/libieee1394/IsoHandler.h =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandler.h (revision 930) +++ /trunk/libffado/src/libieee1394/IsoHandler.h (revision 1005) @@ -26,5 +26,4 @@ #include "debugmodule/debugmodule.h" -#include "IsoHandlerManager.h" #include "libutil/Thread.h" @@ -32,4 +31,5 @@ enum raw1394_iso_disposition; +class IsoHandlerManager; namespace Streaming { class StreamProcessor; @@ -121,6 +121,20 @@ bool unregisterStream(Streaming::StreamProcessor *); - bool waitForClient(); - bool tryWaitForClient(); + bool canIterateClient(); // FIXME: implement with functor + + /** + * @brief request that the handler exits the packet processing loop ASAP + * + * The raw1394 lib doesn't provide a means to stop the packet iteration loop + * except when the iterate callback returns a DEFER value. Calling this function + * will make the callback return DEFER ASAP. + */ + void requestIterateLoopExit() {m_dont_exit_iterate_loop = false;}; + /** + * @brief allow the handler to stay in the packet processing loop + * + * This resets the state set by requestIterateLoopExit() + */ + void allowIterateLoop() {m_dont_exit_iterate_loop = true;}; private: @@ -132,5 +146,5 @@ int m_irq_interval; - Streaming::StreamProcessor *m_Client; + Streaming::StreamProcessor *m_Client; // FIXME: implement with functors int handleBusReset(unsigned int generation); @@ -140,4 +154,5 @@ enum raw1394_iso_speed m_speed; unsigned int m_prebuffers; + bool m_dont_exit_iterate_loop; // the state machine Index: /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (revision 1001) +++ /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (revision 1005) @@ -42,4 +42,5 @@ , m_SyncSource(NULL) , m_xrun_happened( false ) + , m_activity_wait_timeout_usec( 1000*1000 ) , m_nb_buffers( 0 ) , m_period( 0 ) @@ -51,4 +52,5 @@ { addOption(Util::OptionContainer::Option("slaveMode",false)); + sem_init(&m_activity_semaphore, 0, 0); } @@ -57,4 +59,5 @@ , m_SyncSource(NULL) , m_xrun_happened( false ) + , m_activity_wait_timeout_usec( 1000*1000 ) , m_nb_buffers(nb_buffers) , m_period(period) @@ -66,7 +69,10 @@ { addOption(Util::OptionContainer::Option("slaveMode",false)); + sem_init(&m_activity_semaphore, 0, 0); } StreamProcessorManager::~StreamProcessorManager() { + sem_post(&m_activity_semaphore); + sem_destroy(&m_activity_semaphore); } @@ -100,4 +106,65 @@ m_WaitLock.Unlock(); +} + +void +StreamProcessorManager::signalActivity() +{ + sem_post(&m_activity_semaphore); + debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this); +} + +enum StreamProcessorManager::eActivityResult +StreamProcessorManager::waitForActivity() +{ + debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this); + struct timespec ts; + int result; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + debugError("clock_gettime failed\n"); + return eAR_Error; + } + long long int timeout_nsec=0; + int timeout_sec = 0; + if (m_activity_wait_timeout_usec >= 0) { + timeout_nsec = m_activity_wait_timeout_usec * 1000LL; + timeout_sec = 0; + while(timeout_nsec >= 1000000000LL) { + timeout_sec += 1; + timeout_nsec -= 1000000000LL; + } + ts.tv_nsec += timeout_nsec; + ts.tv_sec += timeout_sec; + } + + if (m_activity_wait_timeout_usec >= 0) { + result = sem_timedwait(&m_activity_semaphore, &ts); + } else { + result = sem_wait(&m_activity_semaphore); + } + + if(result != 0) { + if (result == ETIMEDOUT) { + debugOutput(DEBUG_LEVEL_VERBOSE, + "(%p) pthread_cond_timedwait() timed out (result=%d)\n", + this, result); + return eAR_Timeout; + } else if (result == EINTR) { + debugOutput(DEBUG_LEVEL_VERBOSE, + "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", + this, result); + return eAR_Interrupted; + } else { + debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n", + this, result); + debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n", + this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); + return eAR_Error; + } + } + + debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this); + return eAR_Activity; } @@ -545,4 +612,11 @@ }; + // before we do anything else, transfer + if(!transferSilence()) { + debugError("Could not transfer silence\n"); + return false; + } + + // now calculate the stream offset i = 0; for ( i = 0; i < nb_rcv_sp; i++) { @@ -558,8 +632,4 @@ } - if(!transferSilence()) { - debugError("Could not transfer silence\n"); - return false; - } nb_sync_runs--; } @@ -806,17 +876,26 @@ while(period_not_ready) { - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill()); - bool result; - if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) { - result = m_SyncSource->waitForConsumePeriod(); - } else { - result = m_SyncSource->waitForProducePeriod(); - } -// if(!result) { -// debugError("Error waiting for signal\n"); -// return false; -// } + + // wait for something to happen + switch(waitForActivity()) { + case eAR_Error: + debugError("Error while waiting for activity\n"); + return false; + case eAR_Interrupted: + // FIXME: what to do here? + debugWarning("Interrupted while waiting for activity\n"); + break; + case eAR_Timeout: + // FIXME: what to do here? + debugWarning("Timeout while waiting for activity\n"); + break; + case eAR_Activity: + // do nothing + break; + } + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "got activity...\n"); // HACK: this should be solved more elegantly @@ -838,4 +917,6 @@ } } + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " period not ready? %d...\n", period_not_ready); + // check for underruns on the ISO side, // those should make us bail out of the wait loop @@ -885,6 +966,6 @@ m_time_of_transfer2 = m_time_of_transfer; #endif - - debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, + + debugOutputExtreme( DEBUG_LEVEL_VERBOSE, "transfer at %llu ticks...\n", m_time_of_transfer); @@ -1027,6 +1108,10 @@ debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); bool retval=true; + // NOTE: the order here is opposite from the order in + // normal operation (transmit is before receive), because + // we can do that here (data=silence=available) and + // it increases reliability (esp. on startup) + retval &= transferSilence(StreamProcessor::ePT_Transmit); retval &= transferSilence(StreamProcessor::ePT_Receive); - retval &= transferSilence(StreamProcessor::ePT_Transmit); return retval; } Index: /trunk/libffado/src/libstreaming/StreamProcessorManager.h =================================================================== --- /trunk/libffado/src/libstreaming/StreamProcessorManager.h (revision 967) +++ /trunk/libffado/src/libstreaming/StreamProcessorManager.h (revision 1005) @@ -69,4 +69,13 @@ bool startDryRunning(); bool syncStartAll(); + // activity signaling + enum eActivityResult { + eAR_Activity, + eAR_Timeout, + eAR_Interrupted, + eAR_Error + }; + void signalActivity(); + enum eActivityResult waitForActivity(); // this is the setup API @@ -139,10 +148,14 @@ {return *m_SyncSource;}; -protected: +protected: // FIXME: private? - // thread sync primitives + // thread related vars bool m_xrun_happened; + int m_activity_wait_timeout_usec; bool m_thread_realtime; int m_thread_priority; + + // activity signaling + sem_t m_activity_semaphore; // processor list Index: /trunk/libffado/src/libstreaming/generic/StreamProcessor.h =================================================================== --- /trunk/libffado/src/libstreaming/generic/StreamProcessor.h (revision 1001) +++ /trunk/libffado/src/libstreaming/generic/StreamProcessor.h (revision 1005) @@ -163,13 +163,4 @@ bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer - //FIXME: document wait functions - bool waitForProducePacket(); - bool waitForProducePeriod(); - bool waitForProduce(unsigned int nframes); - - bool waitForConsumePacket(); - bool waitForConsumePeriod(); - bool waitForConsume(unsigned int nframes); - bool canProducePacket(); bool canProducePeriod(); @@ -476,6 +467,4 @@ private: bool m_in_xrun; - pthread_mutex_t m_activity_cond_lock; - pthread_cond_t m_activity_cond; public: @@ -488,7 +477,7 @@ const char *getTypeString() {return ePTToString(getType());}; - StreamStatistics m_PacketStat; - StreamStatistics m_PeriodStat; - StreamStatistics m_WakeupStat; + + int m_min_ahead; // DEBUG + DECLARE_DEBUG_MODULE; }; Index: /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp =================================================================== --- /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (revision 1001) +++ /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (revision 1005) @@ -40,8 +40,17 @@ #include -#define SIGNAL_ACTIVITY { \ - pthread_mutex_lock(&m_activity_cond_lock); \ - pthread_cond_broadcast(&m_activity_cond); \ - pthread_mutex_unlock(&m_activity_cond_lock); \ +#define SIGNAL_ACTIVITY_SPM { \ + m_StreamProcessorManager.signalActivity(); \ +} +#define SIGNAL_ACTIVITY_ISO_XMIT { \ + m_IsoHandlerManager.signalActivityTransmit(); \ +} +#define SIGNAL_ACTIVITY_ISO_RECV { \ + m_IsoHandlerManager.signalActivityReceive(); \ +} +#define SIGNAL_ACTIVITY_ALL { \ + m_StreamProcessorManager.signalActivity(); \ + m_IsoHandlerManager.signalActivityTransmit(); \ + m_IsoHandlerManager.signalActivityReceive(); \ } @@ -71,9 +80,8 @@ , m_sync_delay( 0 ) , m_in_xrun( false ) + , m_min_ahead( 7999 ) { // create the timestamped buffer and register ourselves as its client m_data_buffer = new Util::TimestampedBuffer(this); - pthread_mutex_init(&m_activity_cond_lock, NULL); - pthread_cond_init(&m_activity_cond, NULL); } @@ -84,16 +92,4 @@ } - // lock the condition mutex to keep threads from blocking on - // the condition var while we destroy it - pthread_mutex_lock(&m_activity_cond_lock); - // now signal activity, releasing threads that - // are already blocking on the condition variable - pthread_cond_broadcast(&m_activity_cond); - // then destroy it - pthread_cond_destroy(&m_activity_cond); - pthread_mutex_unlock(&m_activity_cond_lock); - - // destroy the mutexes - pthread_mutex_destroy(&m_activity_cond_lock); if (m_data_buffer) delete m_data_buffer; if (m_scratch_buffer) delete[] m_scratch_buffer; @@ -108,5 +104,5 @@ debugError("Failed to stop SP\n"); } - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; } @@ -313,7 +309,5 @@ m_dropped += dropped_cycles; m_last_cycle = cycle; - m_Parent.showDevice(); -// flushDebugOutput(); -// assert(0); + dumpInfo(); } } @@ -481,5 +475,12 @@ } else if(result2 == eCRV_OK) { // no problem here - SIGNAL_ACTIVITY; + // FIXME: cache the period size? + unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); + unsigned int bufferfill = m_data_buffer->getBufferFill(); + if(bufferfill >= periodsize) { + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "signal activity, %d>%d\n", bufferfill, periodsize); + SIGNAL_ACTIVITY_SPM; + return RAW1394_ISO_DEFER; + } return RAW1394_ISO_OK; } else { @@ -589,6 +590,7 @@ if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) { - debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", - cycle, now_cycles); + unsigned int fc = m_data_buffer->getBufferFill(); + debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy, fill=%u)\n", + cycle, now_cycles, fc); if(m_state == ePS_Running) { debugShowBackLogLines(200); @@ -642,5 +644,5 @@ generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); return RAW1394_ISO_OK; - // FIXME: PP: I think this should be possible too + // FIXME: PP: I think this should also be a possibility //} else if (result == eCRV_EmptyPacket) { // goto send_empty_packet; @@ -687,7 +689,9 @@ enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); if (result == eCRV_Packet || result == eCRV_Defer) { - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "XMIT: CY=%04u TS=%011llu\n", - cycle, m_last_timestamp); + int ahead = diffCycles(cycle, now_cycles); + if (ahead < m_min_ahead) m_min_ahead = ahead; + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, + "XMIT: CY=%04u TS=%011llu NOW_CY=%04u AHEAD=%04d\n", + cycle, m_last_timestamp, now_cycles, ahead); // update some accounting m_last_good_cycle = cycle; @@ -804,13 +808,17 @@ } } - - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "XMIT EMPTY: CY=%04u\n", - cycle); + + { // context to avoid ahead var clash + int ahead = diffCycles(cycle, now_cycles); + if (ahead < m_min_ahead) m_min_ahead = ahead; + debugOutputExtreme(DEBUG_LEVEL_VERBOSE, + "XMIT EMPTY: CY=%04u, NOW_CY=%04u, AHEAD=%04d\n", + cycle, now_cycles, ahead); + } + generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); return RAW1394_ISO_OK; } - // Frame Transfer API @@ -823,11 +831,11 @@ bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { bool result; - debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, - "%p.getFrames(%d, %11llu)", - nbframes, ts); + debugOutputExtreme( DEBUG_LEVEL_VERBOSE, + "(%p, %s) getFrames(%d, %11llu)\n", + this, getTypeString(), nbframes, ts); assert( getType() == ePT_Receive ); if(isDryRunning()) result = getFramesDry(nbframes, ts); else result = getFramesWet(nbframes, ts); - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ISO_RECV; return result; } @@ -890,5 +898,5 @@ debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); result = m_data_buffer->dropFrames(nbframes); - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ISO_RECV; return result; } @@ -897,11 +905,11 @@ { bool result; - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "%p.putFrames(%d, %11llu)", - nbframes, ts); + debugOutputExtreme( DEBUG_LEVEL_VERBOSE, + "(%p, %s) putFrames(%d, %11llu)\n", + this, getTypeString(), nbframes, ts); assert( getType() == ePT_Transmit ); if(isDryRunning()) result = putFramesDry(nbframes, ts); else result = putFramesWet(nbframes, ts); - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ISO_XMIT; return result; } @@ -933,5 +941,5 @@ StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts) { - debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE, + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts); @@ -955,5 +963,5 @@ } - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ISO_XMIT; return true; } @@ -966,5 +974,5 @@ if(nbframes > 0) { result = m_data_buffer->dropFrames(nbframes); - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return result; } else { @@ -973,5 +981,5 @@ result &= m_data_buffer->writeDummyFrame(); } - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return result; } @@ -1135,5 +1143,5 @@ // wake up any threads that might be waiting on data in the buffers // since a state transition can cause data to become available - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return true; } @@ -1412,5 +1420,5 @@ } #endif - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return result; } @@ -1446,5 +1454,5 @@ } #endif - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return true; } @@ -1496,5 +1504,5 @@ } #endif - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return result; } @@ -1545,5 +1553,5 @@ } #endif - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return true; } @@ -1569,4 +1577,5 @@ this, m_last_cycle); m_in_xrun = false; + m_min_ahead = 7999; m_local_node_id = m_1394service.getLocalNodeId() & 0x3f; m_data_buffer->setTransparent(false); @@ -1583,5 +1592,5 @@ } #endif - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return result; } @@ -1615,5 +1624,5 @@ } #endif - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return true; } @@ -1728,147 +1737,11 @@ debugError("Invalid state transition: %s => %s\n", ePSToString(m_state), ePSToString(next_state)); - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return false; updateState_exit_change_failed: debugError("State transition failed: %s => %s\n", ePSToString(m_state), ePSToString(next_state)); - SIGNAL_ACTIVITY; + SIGNAL_ACTIVITY_ALL; return false; -} - -bool StreamProcessor::waitForProducePacket() -{ - return waitForProduce(getNominalFramesPerPacket()); -} -bool StreamProcessor::waitForProducePeriod() -{ - return waitForProduce(m_StreamProcessorManager.getPeriodSize()); -} -bool StreamProcessor::waitForProduce(unsigned int nframes) -{ - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p, %s) wait ...\n", - this, getTypeString()); - struct timespec ts; - int result; - int max_runs = 1000; - - if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { - debugError("clock_gettime failed\n"); - return false; - } - - // FIXME: hardcoded timeout of 10 sec -// ts.tv_nsec += 1000 * 1000000LL; -// while (ts.tv_nsec > 1000000000LL) { -// ts.tv_sec += 1; -// ts.tv_nsec -= 1000000000LL; -// } - ts.tv_sec += 2; - - pthread_mutex_lock(&m_activity_cond_lock); - while(!canProduce(nframes) && max_runs) { - result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); - - if(result != 0) { - if (result == ETIMEDOUT) { - debugOutput(DEBUG_LEVEL_VERBOSE, - "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n", - this, getTypeString(), result); - pthread_mutex_unlock(&m_activity_cond_lock); - dumpInfo(); - return false; - } else if (result == EINTR) { - debugOutput(DEBUG_LEVEL_VERBOSE, - "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n", - this, getTypeString(), result); - pthread_mutex_unlock(&m_activity_cond_lock); - dumpInfo(); - return false; - } else { - debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n", - this, getTypeString(), result); - pthread_mutex_unlock(&m_activity_cond_lock); - dumpInfo(); - return false; - } - } - } - pthread_mutex_unlock(&m_activity_cond_lock); - if(max_runs == 0) { - debugWarning("(%p) runaway loop\n"); - } - return true; -} - -bool StreamProcessor::waitForConsumePacket() -{ - return waitForConsume(getNominalFramesPerPacket()); -} -bool StreamProcessor::waitForConsumePeriod() -{ - return waitForConsume(m_StreamProcessorManager.getPeriodSize()); -} -bool StreamProcessor::waitForConsume(unsigned int nframes) -{ - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p, %s) wait ...\n", - this, getTypeString()); - struct timespec ts; - int result; - - int max_runs = 1000; - - if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { - debugError("clock_gettime failed\n"); - return false; - } - - // FIXME: hardcoded timeout of 10 sec -// ts.tv_nsec += 1000 * 1000000LL; -// while (ts.tv_nsec > 1000000000LL) { -// ts.tv_sec += 1; -// ts.tv_nsec -= 1000000000LL; -// } - ts.tv_sec += 2; - - pthread_mutex_lock(&m_activity_cond_lock); - while(!canConsume(nframes) && max_runs) { - result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); - if(result != 0) { - if (result == ETIMEDOUT) { - debugOutput(DEBUG_LEVEL_VERBOSE, - "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n", - this, getTypeString(), result); - pthread_mutex_unlock(&m_activity_cond_lock); - dumpInfo(); - return false; - } else if (result == EINTR) { - debugOutput(DEBUG_LEVEL_VERBOSE, - "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n", - this, getTypeString(), result); - pthread_mutex_unlock(&m_activity_cond_lock); - dumpInfo(); - return false; - } else { - debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n", - this, getTypeString(), result); - pthread_mutex_unlock(&m_activity_cond_lock); - dumpInfo(); - return false; - } - } - max_runs--; - } - pthread_mutex_unlock(&m_activity_cond_lock); - - if(max_runs == 0) { - debugWarning("(%p) runaway loop\n"); - } - - debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, - "(%p, %s) leave ...\n", - this, getTypeString()); - return true; } @@ -1885,20 +1758,9 @@ if(m_in_xrun) return true; if(m_state == ePS_Running && m_next_state == ePS_Running) { - - if(getType() == ePT_Transmit) { - // can we put a certain amount of frames into the buffer? - unsigned int bufferspace = m_data_buffer->getBufferSpace(); - if(bufferspace >= nframes) { - return true; - } else return false; - } else { - // do we still have to put frames in the buffer? - unsigned int bufferfill = m_data_buffer->getBufferFill(); - unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); - if (bufferfill > periodsize) return false; - else return true; - } - - + // can we put a certain amount of frames into the buffer? + unsigned int bufferspace = m_data_buffer->getBufferSpace(); + if(bufferspace >= nframes) { + return true; + } else return false; } else { if(getType() == ePT_Transmit) { @@ -1947,19 +1809,22 @@ * Helper routines * ***********************************************/ +// FIXME: I think this can be removed and replaced by putSilenceFrames bool StreamProcessor::transferSilence(unsigned int nframes) { bool retval; + + #ifdef DEBUG signed int fc; ffado_timestamp_t ts_tail_tmp; + m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); + if (fc != 0) { + debugWarning("Prefilling a buffer that already contains %d frames\n", fc); + } + #endif // prepare a buffer of silence char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame()); transmitSilenceBlock(dummybuffer, nframes, 0); - - m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); - if (fc != 0) { - debugWarning("Prefilling a buffer that already contains %d frames\n", fc); - } // add the silence data to the ringbuffer @@ -1970,4 +1835,5 @@ retval = false; } + free(dummybuffer); return retval; @@ -2023,4 +1889,7 @@ (unsigned int)TICKS_TO_CYCLES(now), (unsigned int)TICKS_TO_OFFSET(now)); + if(getType() == ePT_Transmit) { + debugOutputShort( DEBUG_LEVEL_NORMAL, " Min ISOXMT bufferfill : %04d\n", m_min_ahead); + } debugOutputShort( DEBUG_LEVEL_NORMAL, " Xrun? : %s\n", (m_in_xrun ? "True":"False")); if (m_state == m_next_state) { Index: /trunk/libffado/config.h.in =================================================================== --- /trunk/libffado/config.h.in (revision 981) +++ /trunk/libffado/config.h.in (revision 1005) @@ -80,6 +80,19 @@ #define ISOHANDLERMANAGER_MAX_STREAMS_PER_ISOTHREAD 16 -// Ideally the audio processing will be driven by this thread -#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE 1 +// The best setup is if the receive handlers have lower priority +// than the client thread since that ensures that as soon as we +// received sufficient frames, the client thread runs. +// The transmit thread should have higher priority to ensure that +// all available data is flushed to the ISO kernel buffers as +// soon as possible +// At this moment, the jack backend uses base+5 to init ffado +// prio +#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE 0 +#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV -6 +#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT 1 + +// the timeout for ISO activity on any thread +// NOTE: don't make this 0 +#define ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS 1000000LL // allows to add some processing margin. This shifts the time @@ -97,5 +110,5 @@ #define STREAMPROCESSORMANAGER_SYNCSTART_TRIES 10 #define STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC 200 -#define STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC 200 +#define STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC 400 #define STREAMPROCESSORMANAGER_NB_ALIGN_TRIES 40