Changeset 759

Show
Ignore:
Timestamp:
12/15/07 11:36:27 (13 years ago)
Author:
ppalmers
Message:

fix streaming problem

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/src/libieee1394/IsoHandler.cpp

    r754 r759  
    2121 * 
    2222 */ 
     23 
     24//#define PER_HANDLER_THREAD 
    2325 
    2426#include "IsoHandler.h" 
     
    181183    } 
    182184 
     185    uint64_t poll_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    183186    err = poll(&m_poll_fd, 1, m_poll_timeout); 
     187    uint64_t poll_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    184188    if (err == -1) { 
    185189        if (errno == EINTR) { 
     
    189193        return false; 
    190194    } 
     195    uint64_t iter_enter=0; 
     196    uint64_t iter_exit=0; 
    191197    if(m_poll_fd.revents & (POLLIN)) { 
    192         if(raw1394_loop_iterate(m_handle)) { 
     198        iter_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
     199        if(!iterate()) { 
    193200            debugOutput( DEBUG_LEVEL_VERBOSE, 
    194                         "IsoHandler (%p): Failed to iterate handler: %s\n", 
    195                         this,strerror(errno)); 
    196             return false; 
    197         } 
     201                        "IsoHandler (%p): Failed to iterate handler\n", 
     202                        this); 
     203            return false; 
     204        } 
     205        iter_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    198206    } else { 
    199207        if (m_poll_fd.revents & POLLERR) { 
     
    203211            debugWarning("hangup on fd for %p\n",this); 
    204212        } 
     213    } 
     214    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%c %p) poll took %lldus, iterate took %lldus\n",  
     215                (this->getType()==eHT_Receive?'R':'X'), this,  
     216                poll_exit-poll_enter, iter_exit-iter_enter); 
     217    return true; 
     218} 
     219 
     220bool 
     221IsoHandler::iterate() { 
     222    if(raw1394_loop_iterate(m_handle)) { 
     223        debugOutput( DEBUG_LEVEL_VERBOSE, 
     224                    "IsoHandler (%p): Failed to iterate handler: %s\n", 
     225                    this,strerror(errno)); 
     226        return false; 
    205227    } 
    206228    return true; 
     
    257279    } 
    258280 
     281#ifdef THREAD_PER_ISOHANDLER 
    259282    // create a thread to iterate ourselves 
    260283    debugOutput( DEBUG_LEVEL_VERBOSE, "Start thread for %p...\n", this); 
     
    269292        return false; 
    270293    } 
     294#endif 
    271295 
    272296    // update the internal state 
  • trunk/libffado/src/libieee1394/IsoHandler.h

    r754 r759  
    8484    bool Init(); 
    8585    bool Execute(); 
     86    bool iterate(); 
     87 
    8688    int getFileDescriptor() { return raw1394_get_fd(m_handle);}; 
    8789    bool setThreadParameters(bool rt, int priority); 
     
    9092    bool prepare(); 
    9193 
    92     bool iterate(); 
    9394    void setVerboseLevel(int l); 
    9495 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

    r754 r759  
    2929#include "libutil/Atomic.h" 
    3030 
     31#include "libutil/PosixThread.h" 
     32 
    3133#include <assert.h> 
    3234 
    33 #define MINIMUM_INTERRUPTS_PER_PERIOD  8
     35#define MINIMUM_INTERRUPTS_PER_PERIOD  4
    3436 
    3537IMPL_DEBUG_MODULE( IsoHandlerManager, IsoHandlerManager, DEBUG_LEVEL_NORMAL ); 
     
    4143   , m_service( service ) 
    4244   , m_realtime(false), m_priority(0) 
     45   , m_Thread ( NULL ) 
    4346{} 
    4447 
     
    4750   , m_service( service ) 
    4851   , m_realtime(run_rt), m_priority(rt_prio) 
     52   , m_Thread ( NULL ) 
    4953{} 
    5054 
     
    5256{ 
    5357    stopHandlers(); 
     58    if (m_Thread) { 
     59        m_Thread->Stop(); 
     60        delete m_Thread; 
     61    } 
    5462} 
    5563 
     
    6775        result &= (*it)->setThreadParameters(m_realtime, m_priority); 
    6876    } 
     77 
     78    if (m_Thread) { 
     79        if (m_realtime) { 
     80            m_Thread->AcquireRealTime(m_priority); 
     81        } else { 
     82            m_Thread->DropRealTime(); 
     83        } 
     84    } 
     85 
    6986    return result; 
     87} 
     88 
     89/** 
     90 * Update the shadow variables. Should only be called from 
     91 * the iso handler iteration thread 
     92 */ 
     93void 
     94IsoHandlerManager::updateShadowVars() 
     95{ 
     96    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "updating shadow vars...\n"); 
     97    unsigned int i; 
     98    m_poll_nfds_shadow = m_IsoHandlers.size(); 
     99    if(m_poll_nfds_shadow > FFADO_MAX_ISO_HANDLERS_PER_PORT) { 
     100        debugWarning("Too much ISO Handlers in manager...\n"); 
     101        m_poll_nfds_shadow = FFADO_MAX_ISO_HANDLERS_PER_PORT; 
     102    } 
     103    for (i = 0; i < m_poll_nfds_shadow; i++) { 
     104        IsoHandler *h = m_IsoHandlers.at(i); 
     105        assert(h); 
     106        m_IsoHandler_map_shadow[i] = h; 
     107 
     108        m_poll_fds_shadow[i].fd = h->getFileDescriptor(); 
     109        m_poll_fds_shadow[i].revents = 0; 
     110        if (h->isEnabled()) { 
     111            m_poll_fds_shadow[i].events = POLLIN; 
     112        } else { 
     113            m_poll_fds_shadow[i].events = 0; 
     114        } 
     115    } 
     116    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " updated shadow vars...\n"); 
     117} 
     118 
     119bool 
     120IsoHandlerManager::Init() { 
     121    debugOutput( DEBUG_LEVEL_VERBOSE, "%p: Init thread...\n", this); 
     122    bool result = true; 
     123    for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); 
     124        it != m_IsoHandlers.end(); 
     125        ++it ) 
     126    { 
     127        result &= (*it)->Init(); 
     128    } 
     129    return result; 
     130} 
     131 
     132bool 
     133IsoHandlerManager::Execute() { 
     134    int err; 
     135    unsigned int i; 
     136 
     137    unsigned int m_poll_timeout = 100; 
     138 
     139    // update the shadow variables if requested 
     140   // if(m_request_fdmap_update) { 
     141        updateShadowVars(); 
     142    //    ZERO_ATOMIC((SInt32*)&m_request_fdmap_update); 
     143    //} 
     144 
     145    // bypass if no handlers are registered 
     146    if (m_poll_nfds_shadow == 0) { 
     147        usleep(m_poll_timeout * 1000); 
     148        return true; 
     149    } 
     150 
     151    // Use a shadow map of the fd's such that the poll call is not in a critical section 
     152    uint64_t poll_enter = m_service.getCurrentTimeAsUsecs(); 
     153    err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 
     154    uint64_t poll_exit = m_service.getCurrentTimeAsUsecs(); 
     155 
     156    if (err == -1) { 
     157        if (errno == EINTR) { 
     158            return true; 
     159        } 
     160        debugFatal("poll error: %s\n", strerror (errno)); 
     161        return false; 
     162    } 
     163 
     164    int nb_rcv = 0; 
     165    int nb_xmit = 0; 
     166    uint64_t iter_enter = m_service.getCurrentTimeAsUsecs(); 
     167    for (i = 0; i < m_poll_nfds_shadow; i++) { 
     168        if(m_poll_fds_shadow[i].revents) { 
     169            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "received events: %08X for (%p)\n", 
     170                m_poll_fds_shadow[i].revents, m_IsoHandler_map_shadow[i]); 
     171        } 
     172        if (m_poll_fds_shadow[i].revents & POLLERR) { 
     173            debugWarning("error on fd for %d\n",i); 
     174        } 
     175 
     176        if (m_poll_fds_shadow[i].revents & POLLHUP) { 
     177            debugWarning("hangup on fd for %d\n",i); 
     178        } 
     179 
     180        if(m_poll_fds_shadow[i].revents & (POLLIN)) { 
     181            m_IsoHandler_map_shadow[i]->iterate(); 
     182            if (m_IsoHandler_map_shadow[i]->getType() == IsoHandler::eHT_Receive) { 
     183                nb_rcv++; 
     184            } else { 
     185                nb_xmit++; 
     186            } 
     187        } 
     188    } 
     189    uint64_t iter_exit = m_service.getCurrentTimeAsUsecs(); 
     190     
     191    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " poll took %6lldus, iterate took %6lldus, iterated (R: %2d, X: %2d) handlers\n", 
     192                poll_exit-poll_enter, iter_exit-iter_enter, 
     193                nb_rcv, nb_xmit); 
     194 
     195    return true; 
    70196} 
    71197 
     
    78204        return false; 
    79205    } 
     206 
     207#ifndef THREAD_PER_ISOHANDLER 
     208    // create a thread to iterate our handlers 
     209    debugOutput( DEBUG_LEVEL_VERBOSE, "Start thread for %p...\n", this); 
     210    m_Thread = new Util::PosixThread(this, m_realtime, m_priority,  
     211                                     PTHREAD_CANCEL_DEFERRED); 
     212    if(!m_Thread) { 
     213        debugFatal("No thread\n"); 
     214        return false; 
     215    } 
     216    if (m_Thread->Start() != 0) { 
     217        debugFatal("Could not start update thread\n"); 
     218        return false; 
     219    } 
     220#endif 
    80221 
    81222    m_State=E_Running; 
     
    197338        // NOTE: try and use MINIMUM_INTERRUPTS_PER_PERIOD hardware interrupts 
    198339        //       per period for better latency. 
    199         unsigned int max_packet_size=(MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize()) / packets_per_period; 
     340        unsigned int max_packet_size=(MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize()/2) / packets_per_period; 
    200341 
    201342        if (max_packet_size < stream->getMaxPacketSize()) { 
     
    207348        // Ensure we don't request a packet size bigger than the 
    208349        // kernel-enforced maximum which is currently 1 page. 
    209         if (max_packet_size > (unsigned int)getpagesize()) { 
    210             debugWarning("max packet size (%u) > page size (%u)\n", max_packet_size ,(unsigned int)getpagesize()); 
    211             max_packet_size = getpagesize()
     350        if (max_packet_size > (unsigned int)getpagesize()/2) { 
     351            debugError("max packet size (%u) > page size (%u)\n", max_packet_size, (unsigned int)getpagesize()/2); 
     352            return false
    212353        } 
    213354 
     
    224365 
    225366        // configure it such that we have an irq for every PACKETS_PER_INTERRUPT packets 
    226         unsigned int irq_interval=PACKETS_PER_INTERRUPT; 
    227  
    228         // unless the period size doesn't allow this 
    229         if ((packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD) < irq_interval) { 
    230             irq_interval=1; 
    231         } 
    232  
    233         // FIXME: test 
    234         irq_interval=1; 
    235 #warning Using fixed irq_interval 
    236  
    237         unsigned int max_packet_size=getpagesize() / irq_interval; 
     367        unsigned int irq_interval = packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD; 
     368        if(irq_interval <= 0) irq_interval = 1; 
     369 
     370        unsigned int max_packet_size = getpagesize()/2; 
    238371 
    239372        if (max_packet_size < stream->getMaxPacketSize()) { 
    240             max_packet_size=stream->getMaxPacketSize(); 
    241         } 
    242  
    243         // Ensure we don't request a packet size bigger than the 
    244         // kernel-enforced maximum which is currently 1 page. 
    245         if (max_packet_size > (unsigned int)getpagesize()) 
    246                     max_packet_size = getpagesize(); 
     373            debugError("Stream max packet size too large: %d\n", stream->getMaxPacketSize()); 
     374            return false; 
     375        } 
    247376 
    248377#endif 
     
    255384        h = new IsoHandler(*this, IsoHandler::eHT_Receive, 
    256385                           buffers, max_packet_size, irq_interval); 
     386 
     387        debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoRecvHandler\n"); 
     388 
    257389        if(!h) { 
    258390            debugFatal("Could not create IsoRecvHandler\n"); 
    259391            return false; 
    260392        } 
    261         debugOutput( DEBUG_LEVEL_VERBOSE, " registering IsoRecvHandler\n"); 
    262393 
    263394    } else if (stream->getType()==StreamProcessor::ePT_Transmit) { 
     
    265396        unsigned int packets_per_period = stream->getPacketsPerPeriod(); 
    266397 
     398#if 0 
    267399        // hardware interrupts occur when one DMA block is full, and the size of one DMA 
    268400        // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq 
     
    295427        //int buffers=30; 
    296428        //max_packet_size = getpagesize(); // HACK 
    297  
     429#else 
     430        // configure it such that we have an irq for every PACKETS_PER_INTERRUPT packets 
     431        unsigned int irq_interval = packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD; 
     432        if(irq_interval <= 0) irq_interval = 1; 
     433 
     434        unsigned int max_packet_size=MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize() / packets_per_period; 
     435        if (max_packet_size < stream->getMaxPacketSize()) { 
     436            max_packet_size = stream->getMaxPacketSize(); 
     437        } 
     438 
     439        if (max_packet_size < stream->getMaxPacketSize()) { 
     440            debugError("Max packet size too large! (%d)\n", stream->getMaxPacketSize()); 
     441        } 
     442 
     443#endif 
    298444        // the SP specifies how many packets to buffer 
    299445        int buffers = stream->getNbPacketsIsoXmitBuffer(); 
     446 
     447        debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoXmitHandler\n"); 
    300448 
    301449        // create the actual handler 
    302450        h = new IsoHandler(*this, IsoHandler::eHT_Transmit, 
    303451                           buffers, max_packet_size, irq_interval); 
    304  
    305         debugOutput( DEBUG_LEVEL_VERBOSE, " registering IsoXmitHandler\n"); 
    306452 
    307453        if(!h) { 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.h

    r753 r759  
    2727#include "debugmodule/debugmodule.h" 
    2828 
     29#include "libutil/Thread.h" 
     30 
    2931#include <sys/poll.h> 
    3032#include <errno.h> 
    3133 
    3234#include <vector> 
     35 
     36#define THREAD_PER_ISOHANDLER 
    3337 
    3438#define FFADO_MAX_ISO_HANDLERS_PER_PORT 16 
     
    6165 
    6266*/ 
    63 class IsoHandlerManager 
     67class IsoHandlerManager : public Util::RunnableInterface 
    6468{ 
    6569    friend class Streaming::StreamProcessorManager; 
     70    public: 
     71        bool Init(); 
     72        bool Execute(); 
     73        void updateShadowVars(); 
     74    private: 
     75        // shadow variables 
     76        struct pollfd m_poll_fds_shadow[FFADO_MAX_ISO_HANDLERS_PER_PORT]; 
     77        IsoHandler *m_IsoHandler_map_shadow[FFADO_MAX_ISO_HANDLERS_PER_PORT]; 
     78        unsigned int m_poll_nfds_shadow; 
    6679 
    6780    public: 
     
    143156        bool m_realtime; 
    144157        int m_priority; 
     158        Util::Thread *  m_Thread; 
    145159 
    146160        // debug stuff 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

    r753 r759  
    145145void 
    146146StreamProcessor::setSyncDelay(int d) { 
    147     debugOutput(DEBUG_LEVEL_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d); 
     147    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting SP %p SyncDelay to %d ticks\n", this, d); 
    148148    m_sync_delay = d; 
    149149} 
     
    234234    if (m_last_cycle != (int)cycle && m_last_cycle != -1) { 
    235235        dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; 
    236         if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 
     236        if (dropped_cycles < 0) { 
     237            debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",  
     238                         this, dropped_cycles, cycle, m_last_cycle, dropped); 
     239        } 
    237240        if (dropped_cycles > 0) { 
    238241            debugWarning("(%p) dropped %d packets on cycle %u, 'dropped'=%u, cycle=%d, m_last_cycle=%d\n", 
    239242                this, dropped_cycles, cycle, dropped, cycle, m_last_cycle); 
    240243            m_dropped += dropped_cycles; 
     244            m_in_xrun = true; 
    241245        } 
    242246    } 
     
    410414    if (m_last_cycle != cycle && m_last_cycle != -1) { 
    411415        dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; 
    412         if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 
     416        if (dropped_cycles < 0) {  
     417            debugWarning("(%p) dropped < 1 (%d), cycle: %d, last_cycle: %d, dropped: %d\n",  
     418                         this, dropped_cycles, cycle, m_last_cycle, dropped); 
     419        } 
    413420        if (dropped_cycles > 0) { 
    414421            debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped); 
     
    508515        // check the packet header 
    509516        enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); 
    510         if (result == eCRV_Packet) { 
     517        if (result == eCRV_Packet || result == eCRV_Defer) { 
    511518            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n", 
    512519                    cycle, m_last_timestamp); 
     
    544551            // skip queueing packets if we detect that there are not enough frames 
    545552            // available 
    546             if(result2 == eCRV_Defer
     553            if(result2 == eCRV_Defer || result == eCRV_Defer
    547554                return RAW1394_ISO_DEFER; 
    548555            else 
  • trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

    r754 r759  
    733733            usleep(125); // MAGIC: one cycle sleep... 
    734734 
    735             #if 0 
    736735            // in order to avoid this in the future, we increase the sync delay of the sync source SP 
    737736            int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE; 
    738737            m_SyncSource->setSyncDelay(d); 
    739             #endif 
    740738 
    741739            #ifdef DEBUG 
     
    744742        } 
    745743    } // we are either ready or an xrun occurred 
     744 
     745    // in order to avoid a runaway value of the sync delay, we gradually decrease 
     746    // it. It will be increased by a 'too early' event (cfr some lines higher) 
     747    // hence we'll be at a good point on average. 
     748    int d = m_SyncSource->getSyncDelay() - 1; 
     749    if (d >= 0) m_SyncSource->setSyncDelay(d); 
     750 
    746751 
    747752    #ifdef DEBUG