Changeset 938

Show
Ignore:
Timestamp:
03/12/08 04:48:37 (13 years ago)
Author:
ppalmers
Message:

implement static iso handler scheduling

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/config.h.in

    r936 r938  
    6767#define ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT         16 
    6868#define ISOHANDLERMANAGER_MAX_STREAMS_PER_ISOTHREAD         16 
    69 // the transmit thread should run at elevated priority since it 
    70 // should push packets into the kernel ASAP 
    71 #define ISOHANDLERMANAGER_TRANSMIT_PRIO_INCREASE            1 
    72 // the receive thread should have lower priority than the transmit one, since 
    73 // it should be interrupted once it has flagged the semaphore that indicates 
    74 // one period of frames. 
    75 #define ISOHANDLERMANAGER_RECEIVE_PRIO_INCREASE             -6 
     69 
     70// Ideally the audio processing will be driven by this thread 
     71#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE            1 
    7672 
    7773// allows to add some processing margin. This shifts the time 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

    r937 r938  
    4343// --- ISO Thread --- // 
    4444 
    45 IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoTask::eTaskType t
     45IsoTask::IsoTask(IsoHandlerManager& manager
    4646    : m_manager( manager ) 
    47     , m_type( t
     47    , m_SyncIsoHandler ( NULL
    4848{ 
    4949} 
     
    6666IsoTask::requestShadowMapUpdate() 
    6767{ 
    68     debugOutput(DEBUG_LEVEL_VERBOSE, "enter\n"); 
     68    debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) enter\n", this); 
    6969    INC_ATOMIC(&request_update); 
    7070    return true; 
     
    7373// updates the internal stream map 
    7474// note that this should be executed with the guarantee that 
    75 // nobody will modify  
     75// nobody will modify the parent data structures 
    7676void 
    7777IsoTask::updateShadowMapHelper() 
    7878{ 
    79     debugOutput( DEBUG_LEVEL_VERBOSE, "updating shadow vars...\n"); 
     79    debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updating shadow vars...\n", this); 
    8080    unsigned int i, cnt, max; 
    8181    max = m_manager.m_IsoHandlers.size(); 
     82    m_SyncIsoHandler = NULL; 
    8283    for (i = 0, cnt = 0; i < max; i++) { 
    8384        IsoHandler *h = m_manager.m_IsoHandlers.at(i); 
    8485        assert(h); 
    85  
    86         // skip handlers of the wrong type 
    87         if (h->getType() == IsoHandler::eHT_Receive  && m_type == eTT_Transmit) continue; 
    88         if (h->getType() == IsoHandler::eHT_Transmit && m_type == eTT_Receive) continue; 
    8986 
    9087        if (h->isEnabled()) { 
     
    9491            m_poll_fds_shadow[cnt].events = POLLIN; 
    9592            cnt++; 
    96             debugOutput( DEBUG_LEVEL_VERBOSE, "%s handler %p added\n", h->getTypeString(), h); 
     93            // FIXME: need a more generic approach here 
     94            if(   m_SyncIsoHandler == NULL 
     95               && h->getType() == IsoHandler::eHT_Transmit) { 
     96                m_SyncIsoHandler = h; 
     97            } 
     98 
     99            debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p added\n", 
     100                                              this, h->getTypeString(), h); 
    97101        } else { 
    98             debugOutput( DEBUG_LEVEL_VERBOSE, "%s handler %p skipped (disabled)\n", h->getTypeString(), h); 
     102            debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p skipped (disabled)\n", 
     103                                              this, h->getTypeString(), h); 
    99104        } 
    100105        if(cnt > ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT) { 
     
    103108        } 
    104109    } 
     110 
     111    // FIXME: need a more generic approach here 
     112    // if there are no active transmit handlers, 
     113    // use the first receive handler 
     114    if(   m_SyncIsoHandler == NULL 
     115       && m_poll_nfds_shadow) { 
     116        m_SyncIsoHandler = m_IsoHandler_map_shadow[0]; 
     117    } 
    105118    m_poll_nfds_shadow = cnt; 
    106     debugOutput( DEBUG_LEVEL_VERBOSE, " updated shadow vars...\n"); 
     119    debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updated shadow vars...\n", this); 
    107120} 
    108121 
     
    111124{ 
    112125    debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    113                        "(%p, %s) Execute\n", 
    114                        this, m_type==eTT_Transmit?"Transmit":"Receive"); 
     126                       "(%p) Execute\n", this); 
    115127    int err; 
    116128    unsigned int i; 
     
    126138    if (m_poll_nfds_shadow == 0) { 
    127139        debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    128                            "(%p, %8s) bypass iterate since no handlers to poll\n", 
    129                            this, m_type==eTT_Transmit?"Transmit":"Receive"); 
     140                           "(%p) bypass iterate since no handlers to poll\n", 
     141                           this); 
    130142        usleep(m_poll_timeout * 1000); 
    131143        return true; 
     
    139151    bool no_one_to_poll = true; 
    140152    while(no_one_to_poll) { 
    141         if (m_type==eTT_Transmit) { 
    142             // if we are a transmit thread, we should only poll on 
    143             // those handlers that have a client that is ready to send 
    144             // something. poll'ing the others will only cause busy-wait 
    145             // looping. 
    146             for (i = 0; i < m_poll_nfds_shadow; i++) { 
    147                 short events = 0; 
    148                 if (m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 
     153        for (i = 0; i < m_poll_nfds_shadow; i++) { 
     154            short events = 0; 
     155            IsoHandler *h = m_IsoHandler_map_shadow[i]; 
     156            if(h->getType() == IsoHandler::eHT_Transmit) { 
     157                // we should only poll on a transmit handler 
     158                // that has a client that is ready to send 
     159                // something. Otherwise it will end up in 
     160                // busy wait looping since the packet function 
     161                // will defer processing (also avoids the 
     162                // AGAIN problem) 
     163                if (h->tryWaitForClient()) { 
    149164                    events = POLLIN | POLLPRI; 
    150165                    no_one_to_poll = false; 
    151166                } 
    152                 m_poll_fds_shadow[i].events = events; 
    153             } 
    154         } else { 
    155             // for receive handlers, we can do the same. we might not have to though 
    156             for (i = 0; i < m_poll_nfds_shadow; i++) { 
    157                 short events = 0; 
    158                 if (m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 
    159                     events = POLLIN | POLLERR | POLLHUP; 
     167            } else { 
     168                // a receive handler should only be polled if 
     169                // it's client doesn't already have enough data 
     170                // and if it can still accept data. 
     171                if (h->tryWaitForClient()) { // FIXME 
     172                    events = POLLIN | POLLPRI; 
    160173                    no_one_to_poll = false; 
    161174                } 
    162                 m_poll_fds_shadow[i].events = events; 
    163             } 
    164         } 
     175            } 
     176            m_poll_fds_shadow[i].events = events; 
     177        } 
     178 
    165179        if(no_one_to_poll) { 
    166180            debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    167                                "(%p, %8s) No one to poll, waiting on the first handler to become ready\n", 
    168                                this, m_type==eTT_Transmit?"Transmit":"Receive"); 
    169  
    170             m_IsoHandler_map_shadow[0]->waitForClient(); 
     181                               "(%p) No one to poll, waiting on the sync handler to become ready\n", 
     182                               this); 
     183 
     184            if(!m_SyncIsoHandler->waitForClient()) { 
     185                debugError("Failed to wait for client\n"); 
     186                return false; 
     187            } 
     188 
     189            #ifdef DEBUG 
     190            // if this happens we end up in a deadlock! 
     191            if(!m_SyncIsoHandler->tryWaitForClient()) { 
     192                debugFatal("inconsistency in wait functions!\n"); 
     193                return false; 
     194            } 
     195            #endif 
    171196 
    172197            debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    173                                "(%p, %8s) handler ready\n", 
    174                                this, m_type==eTT_Transmit?"Transmit":"Receive"); 
    175         } 
    176     } 
    177  
    178     // Use a shadow map of the fd's such that the poll call is not in a critical section 
     198                               "(%p) sync handler ready\n", 
     199                               this); 
     200        } 
     201    } 
     202 
     203    // Use a shadow map of the fd's such that we don't have to update 
     204    // the fd map everytime we run poll(). It doesn't change that much 
     205    // anyway 
    179206    err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 
    180207 
     
    201228        // if we get here, it means two things: 
    202229        // 1) the kernel can accept or provide packets (poll returned POLLIN) 
    203         // 2) the client can provide or accept packets (we enabled polling) 
     230        // 2) the client can provide or accept packets (since we enabled polling) 
    204231        if(m_poll_fds_shadow[i].revents & (POLLIN)) { 
    205232            m_IsoHandler_map_shadow[i]->iterate(); 
     
    238265   , m_service( service ) 
    239266   , m_realtime(false), m_priority(0) 
    240    , m_ReceiveThread ( NULL ) 
    241    , m_TransmitThread ( NULL ) 
    242    , m_ReceiveTask ( NULL ) 
    243    , m_TransmitTask ( NULL ) 
     267   , m_IsoThread ( NULL ) 
     268   , m_IsoTask ( NULL ) 
    244269{} 
    245270 
     
    248273   , m_service( service ) 
    249274   , m_realtime(run_rt), m_priority(rt_prio) 
    250    , m_ReceiveThread ( NULL ) 
    251    , m_TransmitThread ( NULL ) 
    252    , m_ReceiveTask ( NULL ) 
    253    , m_TransmitTask ( NULL ) 
     275   , m_IsoThread ( NULL ) 
     276   , m_IsoTask ( NULL ) 
    254277{} 
    255278 
     
    261284        debugError("Still some handlers in use\n"); 
    262285    } 
    263     if (m_ReceiveThread) { 
    264         m_ReceiveThread->Stop(); 
    265         delete m_ReceiveThread; 
    266     } 
    267     if (m_ReceiveTask) { 
    268         delete m_ReceiveTask; 
    269     } 
    270  
    271     if (m_TransmitThread) { 
    272         m_TransmitThread->Stop(); 
    273         delete m_TransmitThread; 
    274     } 
    275     if (m_TransmitTask) { 
    276         delete m_TransmitTask; 
     286    if (m_IsoThread) { 
     287        m_IsoThread->Stop(); 
     288        delete m_IsoThread; 
     289    } 
     290    if (m_IsoTask) { 
     291        delete m_IsoTask; 
    277292    } 
    278293} 
     
    284299    m_realtime = rt; 
    285300    m_priority = priority; 
    286     bool result = true; 
    287  
    288     if (m_ReceiveThread) { 
     301 
     302    if (m_IsoThread) { 
    289303        if (m_realtime) { 
    290             m_ReceiveThread->AcquireRealTime(m_priority); 
     304            m_IsoThread->AcquireRealTime(m_priority); 
    291305        } else { 
    292             m_ReceiveThread->DropRealTime(); 
    293         } 
    294     } 
    295     if (m_TransmitThread) { 
    296         if (m_realtime) { 
    297             m_TransmitThread->AcquireRealTime(m_priority); 
    298         } else { 
    299             m_TransmitThread->DropRealTime(); 
    300         } 
    301     } 
    302  
    303     return result; 
     306            m_IsoThread->DropRealTime(); 
     307        } 
     308    } 
     309 
     310    return true; 
    304311} 
    305312 
     
    313320    } 
    314321 
    315     // create a thread to iterate our receive handlers 
    316     debugOutput( DEBUG_LEVEL_VERBOSE, "Create receive thread for %p...\n", this); 
    317     m_ReceiveTask = new IsoTask( *this, IsoTask::eTT_Receive ); 
    318     if(!m_ReceiveTask) { 
     322    // create a thread to iterate our ISO handlers 
     323    debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p...\n", this); 
     324    m_IsoTask = new IsoTask( *this ); 
     325    if(!m_IsoTask) { 
    319326        debugFatal("No task\n"); 
    320327        return false; 
    321328    } 
    322     m_ReceiveThread = new Util::PosixThread(m_ReceiveTask, m_realtime, 
    323                                             m_priority + ISOHANDLERMANAGER_RECEIVE_PRIO_INCREASE, 
    324                                             PTHREAD_CANCEL_DEFERRED); 
    325  
    326     if(!m_ReceiveThread) { 
     329    m_IsoThread = new Util::PosixThread(m_IsoTask, m_realtime, 
     330                                        m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE, 
     331                                        PTHREAD_CANCEL_DEFERRED); 
     332 
     333    if(!m_IsoThread) { 
    327334        debugFatal("No thread\n"); 
    328335        return false; 
    329336    } 
    330     if (m_ReceiveThread->Start() != 0) { 
    331         debugFatal("Could not start receive thread\n"); 
    332         return false; 
    333     } 
    334  
    335     // create a thread to iterate our transmit handlers 
    336     debugOutput( DEBUG_LEVEL_VERBOSE, "Create transmit thread for %p...\n", this); 
    337     m_TransmitTask = new IsoTask( *this, IsoTask::eTT_Transmit ); 
    338     if(!m_TransmitTask) { 
    339         debugFatal("No task\n"); 
    340         return false; 
    341     } 
    342     m_TransmitThread = new Util::PosixThread(m_TransmitTask, m_realtime, 
    343                                              m_priority + ISOHANDLERMANAGER_TRANSMIT_PRIO_INCREASE, 
    344                                              PTHREAD_CANCEL_DEFERRED); 
    345     if(!m_TransmitThread) { 
    346         debugFatal("No thread\n"); 
    347         return false; 
    348     } 
    349     if (m_TransmitThread->Start() != 0) { 
    350         debugFatal("Could not start transmit thread\n"); 
     337    if (m_IsoThread->Start() != 0) { 
     338        debugFatal("Could not start ISO thread\n"); 
    351339        return false; 
    352340    } 
    353341 
    354342    m_State=E_Running; 
    355     return true; 
    356 } 
    357  
    358  
    359 bool 
    360 IsoHandlerManager::updateShadowMapFor(IsoHandler *h) 
    361 { 
    362     // update the shadow map 
    363     if(h->getType() == IsoHandler::eHT_Receive) { 
    364         if(!m_ReceiveTask->requestShadowMapUpdate()) { 
    365             debugError("failed to update shadow map\n"); 
    366             return false; 
    367         } 
    368     } else { 
    369         if(!m_TransmitTask->requestShadowMapUpdate()) { 
    370             debugError("failed to update shadow map\n"); 
    371             return false; 
    372         } 
    373     } 
    374343    return true; 
    375344} 
     
    386355        if ((*it) == h) { 
    387356            result = h->disable(); 
    388             result &= updateShadowMapFor(h); 
     357            result &= m_IsoTask->requestShadowMapUpdate(); 
    389358            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); 
    390359            return result; 
     
    407376        if ((*it) == h) { 
    408377            result = h->enable(); 
    409             result &= updateShadowMapFor(h); 
     378            result &= m_IsoTask->requestShadowMapUpdate(); 
    410379            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); 
    411380            return result; 
     
    423392    handler->setVerboseLevel(getDebugLevel()); 
    424393    m_IsoHandlers.push_back(handler); 
    425     return updateShadowMapFor(handler); 
     394    return m_IsoTask->requestShadowMapUpdate(); 
    426395} 
    427396 
     
    437406        if ( *it == handler ) { 
    438407            m_IsoHandlers.erase(it); 
    439             return updateShadowMapFor(handler); 
     408            return m_IsoTask->requestShadowMapUpdate(); 
    440409        } 
    441410    } 
     
    675644                return false; 
    676645            } 
    677             if(!updateShadowMapFor(*it)) { 
     646            if(!m_IsoTask->requestShadowMapUpdate()) { 
    678647                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    679648                return false; 
     
    736705                return false; 
    737706            } 
    738             if(!updateShadowMapFor(*it)) { 
     707            if(!m_IsoTask->requestShadowMapUpdate()) { 
    739708                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    740709                return false; 
     
    767736            retval=false; 
    768737        } 
    769         if(!updateShadowMapFor(*it)) { 
     738        if(!m_IsoTask->requestShadowMapUpdate()) { 
    770739            debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    771740            retval=false; 
     
    801770        (*it)->setVerboseLevel(i); 
    802771    } 
    803     if(m_ReceiveThread) m_ReceiveThread->setVerboseLevel(i); 
    804     if(m_ReceiveTask) m_ReceiveTask->setVerboseLevel(i); 
    805     if(m_TransmitThread) m_TransmitThread->setVerboseLevel(i); 
    806     if(m_TransmitTask) m_TransmitTask->setVerboseLevel(i); 
     772    if(m_IsoThread) m_IsoThread->setVerboseLevel(i); 
     773    if(m_IsoTask)   m_IsoTask->setVerboseLevel(i); 
    807774} 
    808775 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.h

    r904 r938  
    5757{ 
    5858    public: 
    59         enum eTaskType { 
    60             eTT_Receive, 
    61             eTT_Transmit, 
    62         }; 
    63         IsoTask(IsoHandlerManager& manager, enum IsoTask::eTaskType t); 
     59        IsoTask(IsoHandlerManager& manager); 
    6460        virtual ~IsoTask() {}; 
    6561 
     
    7672    protected: 
    7773        IsoHandlerManager& m_manager; 
    78         enum eTaskType m_type; 
    7974 
    8075        // the event request structure 
     
    8479        // this is the map used by the actual thread 
    8580        // it is a shadow of the m_StreamProcessors vector 
    86         struct pollfd m_poll_fds_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
    87         IsoHandler *m_IsoHandler_map_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
    88         unsigned int m_poll_nfds_shadow; 
     81        struct pollfd   m_poll_fds_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
     82        IsoHandler *    m_IsoHandler_map_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
     83        unsigned int    m_poll_nfds_shadow; 
     84        IsoHandler *    m_SyncIsoHandler; 
    8985 
    9086        // updates the streams map 
     
    187183        Streaming::StreamProcessorVector m_StreamProcessors; 
    188184 
    189         // thread params for the handler threads 
    190         bool m_realtime; 
    191         int m_priority; 
    192         // handler threads 
    193         Util::Thread *  m_ReceiveThread; 
    194         Util::Thread *  m_TransmitThread; 
    195  
    196         // actual tasks 
    197         IsoTask *  m_ReceiveTask; 
    198         IsoTask *  m_TransmitTask; 
    199  
    200         bool updateShadowMapFor(IsoHandler *h); 
     185        // handler thread/task 
     186        bool            m_realtime; 
     187        int             m_priority; 
     188        Util::Thread *  m_IsoThread; 
     189        IsoTask *       m_IsoTask; 
    201190 
    202191        // debug stuff 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

    r937 r938  
    17691769    if(m_in_xrun) return true; 
    17701770    if(m_state == ePS_Running && m_next_state == ePS_Running) { 
    1771         // check whether we already fullfil the criterion 
    1772         unsigned int bufferspace = m_data_buffer->getBufferSpace(); 
    1773         if(bufferspace >= nframes) { 
    1774             return true; 
    1775         } else return false; 
     1771         
     1772        if(getType() == ePT_Transmit) { 
     1773            // can we put a certain amount of frames into the buffer? 
     1774            unsigned int bufferspace = m_data_buffer->getBufferSpace(); 
     1775            if(bufferspace >= nframes) { 
     1776                return true; 
     1777            } else return false; 
     1778        } else { 
     1779            // do we still have to put frames in the buffer? 
     1780            unsigned int bufferfill = m_data_buffer->getBufferFill(); 
     1781            unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); 
     1782            if (bufferfill > periodsize) return false; 
     1783            else return true; 
     1784        } 
     1785 
     1786 
    17761787    } else { 
    17771788        if(getType() == ePT_Transmit) {