Index: /trunk/libffado/src/libieee1394/IsoHandlerManager.cpp =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandlerManager.cpp (revision 754) +++ /trunk/libffado/src/libieee1394/IsoHandlerManager.cpp (revision 759) @@ -29,7 +29,9 @@ #include "libutil/Atomic.h" +#include "libutil/PosixThread.h" + #include -#define MINIMUM_INTERRUPTS_PER_PERIOD 8U +#define MINIMUM_INTERRUPTS_PER_PERIOD 4U IMPL_DEBUG_MODULE( IsoHandlerManager, IsoHandlerManager, DEBUG_LEVEL_NORMAL ); @@ -41,4 +43,5 @@ , m_service( service ) , m_realtime(false), m_priority(0) + , m_Thread ( NULL ) {} @@ -47,4 +50,5 @@ , m_service( service ) , m_realtime(run_rt), m_priority(rt_prio) + , m_Thread ( NULL ) {} @@ -52,4 +56,8 @@ { stopHandlers(); + if (m_Thread) { + m_Thread->Stop(); + delete m_Thread; + } } @@ -67,5 +75,123 @@ result &= (*it)->setThreadParameters(m_realtime, m_priority); } + + if (m_Thread) { + if (m_realtime) { + m_Thread->AcquireRealTime(m_priority); + } else { + m_Thread->DropRealTime(); + } + } + return result; +} + +/** + * Update the shadow variables. Should only be called from + * the iso handler iteration thread + */ +void +IsoHandlerManager::updateShadowVars() +{ + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "updating shadow vars...\n"); + unsigned int i; + m_poll_nfds_shadow = m_IsoHandlers.size(); + if(m_poll_nfds_shadow > FFADO_MAX_ISO_HANDLERS_PER_PORT) { + debugWarning("Too much ISO Handlers in manager...\n"); + m_poll_nfds_shadow = FFADO_MAX_ISO_HANDLERS_PER_PORT; + } + for (i = 0; i < m_poll_nfds_shadow; i++) { + IsoHandler *h = m_IsoHandlers.at(i); + assert(h); + m_IsoHandler_map_shadow[i] = h; + + m_poll_fds_shadow[i].fd = h->getFileDescriptor(); + m_poll_fds_shadow[i].revents = 0; + if (h->isEnabled()) { + m_poll_fds_shadow[i].events = POLLIN; + } else { + m_poll_fds_shadow[i].events = 0; + } + } + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " updated shadow vars...\n"); +} + +bool +IsoHandlerManager::Init() { + debugOutput( DEBUG_LEVEL_VERBOSE, "%p: Init thread...\n", this); + bool result = true; + for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); + it != m_IsoHandlers.end(); + ++it ) + { + result &= (*it)->Init(); + } + return result; +} + +bool +IsoHandlerManager::Execute() { + int err; + unsigned int i; + + unsigned int m_poll_timeout = 100; + + // update the shadow variables if requested + // if(m_request_fdmap_update) { + updateShadowVars(); + // ZERO_ATOMIC((SInt32*)&m_request_fdmap_update); + //} + + // bypass if no handlers are registered + if (m_poll_nfds_shadow == 0) { + usleep(m_poll_timeout * 1000); + return true; + } + + // Use a shadow map of the fd's such that the poll call is not in a critical section + uint64_t poll_enter = m_service.getCurrentTimeAsUsecs(); + err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); + uint64_t poll_exit = m_service.getCurrentTimeAsUsecs(); + + if (err == -1) { + if (errno == EINTR) { + return true; + } + debugFatal("poll error: %s\n", strerror (errno)); + return false; + } + + int nb_rcv = 0; + int nb_xmit = 0; + uint64_t iter_enter = m_service.getCurrentTimeAsUsecs(); + for (i = 0; i < m_poll_nfds_shadow; i++) { + if(m_poll_fds_shadow[i].revents) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "received events: %08X for (%p)\n", + m_poll_fds_shadow[i].revents, m_IsoHandler_map_shadow[i]); + } + if (m_poll_fds_shadow[i].revents & POLLERR) { + debugWarning("error on fd for %d\n",i); + } + + if (m_poll_fds_shadow[i].revents & POLLHUP) { + debugWarning("hangup on fd for %d\n",i); + } + + if(m_poll_fds_shadow[i].revents & (POLLIN)) { + m_IsoHandler_map_shadow[i]->iterate(); + if (m_IsoHandler_map_shadow[i]->getType() == IsoHandler::eHT_Receive) { + nb_rcv++; + } else { + nb_xmit++; + } + } + } + uint64_t iter_exit = m_service.getCurrentTimeAsUsecs(); + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " poll took %6lldus, iterate took %6lldus, iterated (R: %2d, X: %2d) handlers\n", + poll_exit-poll_enter, iter_exit-iter_enter, + nb_rcv, nb_xmit); + + return true; } @@ -78,4 +204,19 @@ return false; } + +#ifndef THREAD_PER_ISOHANDLER + // create a thread to iterate our handlers + debugOutput( DEBUG_LEVEL_VERBOSE, "Start thread for %p...\n", this); + m_Thread = new Util::PosixThread(this, m_realtime, m_priority, + PTHREAD_CANCEL_DEFERRED); + if(!m_Thread) { + debugFatal("No thread\n"); + return false; + } + if (m_Thread->Start() != 0) { + debugFatal("Could not start update thread\n"); + return false; + } +#endif m_State=E_Running; @@ -197,5 +338,5 @@ // NOTE: try and use MINIMUM_INTERRUPTS_PER_PERIOD hardware interrupts // per period for better latency. - unsigned int max_packet_size=(MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize()) / packets_per_period; + unsigned int max_packet_size=(MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize()/2) / packets_per_period; if (max_packet_size < stream->getMaxPacketSize()) { @@ -207,7 +348,7 @@ // Ensure we don't request a packet size bigger than the // kernel-enforced maximum which is currently 1 page. - if (max_packet_size > (unsigned int)getpagesize()) { - debugWarning("max packet size (%u) > page size (%u)\n", max_packet_size ,(unsigned int)getpagesize()); - max_packet_size = getpagesize(); + if (max_packet_size > (unsigned int)getpagesize()/2) { + debugError("max packet size (%u) > page size (%u)\n", max_packet_size, (unsigned int)getpagesize()/2); + return false; } @@ -224,25 +365,13 @@ // configure it such that we have an irq for every PACKETS_PER_INTERRUPT packets - unsigned int irq_interval=PACKETS_PER_INTERRUPT; - - // unless the period size doesn't allow this - if ((packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD) < irq_interval) { - irq_interval=1; - } - - // FIXME: test - irq_interval=1; -#warning Using fixed irq_interval - - unsigned int max_packet_size=getpagesize() / irq_interval; + unsigned int irq_interval = packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD; + if(irq_interval <= 0) irq_interval = 1; + + unsigned int max_packet_size = getpagesize()/2; if (max_packet_size < stream->getMaxPacketSize()) { - max_packet_size=stream->getMaxPacketSize(); - } - - // Ensure we don't request a packet size bigger than the - // kernel-enforced maximum which is currently 1 page. - if (max_packet_size > (unsigned int)getpagesize()) - max_packet_size = getpagesize(); + debugError("Stream max packet size too large: %d\n", stream->getMaxPacketSize()); + return false; + } #endif @@ -255,9 +384,11 @@ h = new IsoHandler(*this, IsoHandler::eHT_Receive, buffers, max_packet_size, irq_interval); + + debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoRecvHandler\n"); + if(!h) { debugFatal("Could not create IsoRecvHandler\n"); return false; } - debugOutput( DEBUG_LEVEL_VERBOSE, " registering IsoRecvHandler\n"); } else if (stream->getType()==StreamProcessor::ePT_Transmit) { @@ -265,4 +396,5 @@ unsigned int packets_per_period = stream->getPacketsPerPeriod(); +#if 0 // hardware interrupts occur when one DMA block is full, and the size of one DMA // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq @@ -295,13 +427,27 @@ //int buffers=30; //max_packet_size = getpagesize(); // HACK - +#else + // configure it such that we have an irq for every PACKETS_PER_INTERRUPT packets + unsigned int irq_interval = packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD; + if(irq_interval <= 0) irq_interval = 1; + + unsigned int max_packet_size=MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize() / packets_per_period; + if (max_packet_size < stream->getMaxPacketSize()) { + max_packet_size = stream->getMaxPacketSize(); + } + + if (max_packet_size < stream->getMaxPacketSize()) { + debugError("Max packet size too large! (%d)\n", stream->getMaxPacketSize()); + } + +#endif // the SP specifies how many packets to buffer int buffers = stream->getNbPacketsIsoXmitBuffer(); + + debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoXmitHandler\n"); // create the actual handler h = new IsoHandler(*this, IsoHandler::eHT_Transmit, buffers, max_packet_size, irq_interval); - - debugOutput( DEBUG_LEVEL_VERBOSE, " registering IsoXmitHandler\n"); if(!h) { Index: /trunk/libffado/src/libieee1394/IsoHandlerManager.h =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandlerManager.h (revision 753) +++ /trunk/libffado/src/libieee1394/IsoHandlerManager.h (revision 759) @@ -27,8 +27,12 @@ #include "debugmodule/debugmodule.h" +#include "libutil/Thread.h" + #include #include #include + +#define THREAD_PER_ISOHANDLER #define FFADO_MAX_ISO_HANDLERS_PER_PORT 16 @@ -61,7 +65,16 @@ */ -class IsoHandlerManager +class IsoHandlerManager : public Util::RunnableInterface { friend class Streaming::StreamProcessorManager; + public: + bool Init(); + bool Execute(); + void updateShadowVars(); + private: + // shadow variables + struct pollfd m_poll_fds_shadow[FFADO_MAX_ISO_HANDLERS_PER_PORT]; + IsoHandler *m_IsoHandler_map_shadow[FFADO_MAX_ISO_HANDLERS_PER_PORT]; + unsigned int m_poll_nfds_shadow; public: @@ -143,4 +156,5 @@ bool m_realtime; int m_priority; + Util::Thread * m_Thread; // debug stuff Index: /trunk/libffado/src/libieee1394/IsoHandler.cpp =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandler.cpp (revision 754) +++ /trunk/libffado/src/libieee1394/IsoHandler.cpp (revision 759) @@ -21,4 +21,6 @@ * */ + +//#define PER_HANDLER_THREAD #include "IsoHandler.h" @@ -181,5 +183,7 @@ } + uint64_t poll_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); err = poll(&m_poll_fd, 1, m_poll_timeout); + uint64_t poll_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); if (err == -1) { if (errno == EINTR) { @@ -189,11 +193,15 @@ return false; } + uint64_t iter_enter=0; + uint64_t iter_exit=0; if(m_poll_fd.revents & (POLLIN)) { - if(raw1394_loop_iterate(m_handle)) { + iter_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); + if(!iterate()) { debugOutput( DEBUG_LEVEL_VERBOSE, - "IsoHandler (%p): Failed to iterate handler: %s\n", - this,strerror(errno)); - return false; - } + "IsoHandler (%p): Failed to iterate handler\n", + this); + return false; + } + iter_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); } else { if (m_poll_fd.revents & POLLERR) { @@ -203,4 +211,18 @@ debugWarning("hangup on fd for %p\n",this); } + } + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%c %p) poll took %lldus, iterate took %lldus\n", + (this->getType()==eHT_Receive?'R':'X'), this, + poll_exit-poll_enter, iter_exit-iter_enter); + return true; +} + +bool +IsoHandler::iterate() { + if(raw1394_loop_iterate(m_handle)) { + debugOutput( DEBUG_LEVEL_VERBOSE, + "IsoHandler (%p): Failed to iterate handler: %s\n", + this,strerror(errno)); + return false; } return true; @@ -257,4 +279,5 @@ } +#ifdef THREAD_PER_ISOHANDLER // create a thread to iterate ourselves debugOutput( DEBUG_LEVEL_VERBOSE, "Start thread for %p...\n", this); @@ -269,4 +292,5 @@ return false; } +#endif // update the internal state Index: /trunk/libffado/src/libieee1394/IsoHandler.h =================================================================== --- /trunk/libffado/src/libieee1394/IsoHandler.h (revision 754) +++ /trunk/libffado/src/libieee1394/IsoHandler.h (revision 759) @@ -84,4 +84,6 @@ bool Init(); bool Execute(); + bool iterate(); + int getFileDescriptor() { return raw1394_get_fd(m_handle);}; bool setThreadParameters(bool rt, int priority); @@ -90,5 +92,4 @@ bool prepare(); - bool iterate(); void setVerboseLevel(int l); Index: /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (revision 754) +++ /trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (revision 759) @@ -733,9 +733,7 @@ usleep(125); // MAGIC: one cycle sleep... - #if 0 // in order to avoid this in the future, we increase the sync delay of the sync source SP int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE; m_SyncSource->setSyncDelay(d); - #endif #ifdef DEBUG @@ -744,4 +742,11 @@ } } // we are either ready or an xrun occurred + + // in order to avoid a runaway value of the sync delay, we gradually decrease + // it. It will be increased by a 'too early' event (cfr some lines higher) + // hence we'll be at a good point on average. + int d = m_SyncSource->getSyncDelay() - 1; + if (d >= 0) m_SyncSource->setSyncDelay(d); + #ifdef DEBUG Index: /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp =================================================================== --- /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (revision 753) +++ /trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (revision 759) @@ -145,5 +145,5 @@ void StreamProcessor::setSyncDelay(int d) { - debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d); m_sync_delay = d; } @@ -234,9 +234,13 @@ if (m_last_cycle != (int)cycle && m_last_cycle != -1) { dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; - if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); + if (dropped_cycles < 0) { + debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n", + this, dropped_cycles, cycle, m_last_cycle, dropped); + } if (dropped_cycles > 0) { debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n", this, dropped_cycles, cycle, dropped, cycle, m_last_cycle); m_dropped += dropped_cycles; + m_in_xrun = true; } } @@ -410,5 +414,8 @@ if (m_last_cycle != cycle && m_last_cycle != -1) { dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; - if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); + if (dropped_cycles < 0) { + debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n", + this, dropped_cycles, cycle, m_last_cycle, dropped); + } if (dropped_cycles > 0) { debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped); @@ -508,5 +515,5 @@ // check the packet header enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); - if (result == eCRV_Packet) { + if (result == eCRV_Packet || result == eCRV_Defer) { debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n", cycle, m_last_timestamp); @@ -544,5 +551,5 @@ // skip queueing packets if we detect that there are not enough frames // available - if(result2 == eCRV_Defer) + if(result2 == eCRV_Defer || result == eCRV_Defer) return RAW1394_ISO_DEFER; else