Show
Ignore:
Timestamp:
03/05/08 14:24:49 (16 years ago)
Author:
ppalmers
Message:

simplify threading. Each port now gets two threads: one for transmit and one for receive.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/src/libieee1394/CycleTimerHelper.cpp

    r894 r904  
    233233            debugOutput(DEBUG_LEVEL_VERBOSE, "have to retry, diff = %f\n",diff_ticks); 
    234234        } 
    235          
     235 
    236236    } while(diff_ticks < -((double)TICKS_PER_HALFCYCLE) && --ntries && !m_first_run); 
    237237 
     
    471471     
    472472        if (diffTicks(cycle_timer_ticks, m_cycle_timer_ticks_prev) < 0) { 
    473             debugOutput( DEBUG_LEVEL_VERBOSE, 
     473            debugOutput( DEBUG_LEVEL_VERY_VERBOSE, 
    474474                        "non-monotonic CTR (try %02d): %llu -> %llu\n", 
    475475                        maxtries, m_cycle_timer_ticks_prev, cycle_timer_ticks); 
    476             debugOutput( DEBUG_LEVEL_VERBOSE, 
     476            debugOutput( DEBUG_LEVEL_VERY_VERBOSE, 
    477477                        "                            : %08X -> %08X\n", 
    478478                        m_cycle_timer_prev, *cycle_timer); 
    479             debugOutput( DEBUG_LEVEL_VERBOSE, 
     479            debugOutput( DEBUG_LEVEL_VERY_VERBOSE, 
    480480                        " current: %011llu (%03us %04ucy %04uticks)\n", 
    481481                        cycle_timer_ticks, 
     
    483483                        (unsigned int)TICKS_TO_CYCLES( cycle_timer_ticks ), 
    484484                        (unsigned int)TICKS_TO_OFFSET( cycle_timer_ticks ) ); 
    485             debugOutput( DEBUG_LEVEL_VERBOSE, 
     485            debugOutput( DEBUG_LEVEL_VERY_VERBOSE, 
    486486                        " prev   : %011llu (%03us %04ucy %04uticks)\n", 
    487487                        m_cycle_timer_ticks_prev, 
  • trunk/libffado/src/libieee1394/ieee1394service.cpp

    r864 r904  
    7676    , m_realtime ( rt ) 
    7777    , m_base_priority ( prio ) 
    78     , m_pIsoManager( new IsoHandlerManager( *this, rt, prio + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE ) ) 
     78    , m_pIsoManager( new IsoHandlerManager( *this, rt, prio ) ) 
    7979    , m_pCTRHelper ( new CycleTimerHelper( *this, IEEE1394SERVICE_CYCLETIMER_DLL_UPDATE_INTERVAL_USEC, 
    8080                                           rt, prio + IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE ) ) 
     
    277277    if (m_pIsoManager) { 
    278278        debugOutput(DEBUG_LEVEL_VERBOSE, "Switching IsoManager to (rt=%d, prio=%d)\n", 
    279                                          rt, priority + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE); 
    280         result &= m_pIsoManager->setThreadParameters(rt, priority + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE); 
     279                                         rt, priority); 
     280        result &= m_pIsoManager->setThreadParameters(rt, priority); 
    281281    } 
    282282    if (m_pCTRHelper) { 
  • trunk/libffado/src/libieee1394/IsoHandler.cpp

    r870 r904  
    8383   , m_max_packet_size( 1024 ) 
    8484   , m_irq_interval( -1 ) 
    85    , m_last_wakeup( -1 ) 
    8685   , m_Client( 0 ) 
    87    , m_poll_timeout( 100 ) 
    88    , m_realtime ( false ) 
    89    , m_priority ( 0 ) 
    90    , m_Thread ( NULL ) 
    9186   , m_speed( RAW1394_ISO_SPEED_400 ) 
    9287   , m_prebuffers( 0 ) 
     
    10398   , m_max_packet_size( max_packet_size ) 
    10499   , m_irq_interval( irq ) 
    105    , m_last_wakeup( -1 ) 
    106100   , m_Client( 0 ) 
    107    , m_poll_timeout( 100 ) 
    108    , m_realtime ( false ) 
    109    , m_priority ( 0 ) 
    110    , m_Thread ( NULL ) 
    111101   , m_speed( RAW1394_ISO_SPEED_400 ) 
    112102   , m_prebuffers( 0 ) 
     
    124114   , m_max_packet_size( max_packet_size ) 
    125115   , m_irq_interval( irq ) 
    126    , m_last_wakeup( -1 ) 
    127116   , m_Client( 0 ) 
    128    , m_poll_timeout( 100 ) 
    129    , m_realtime ( false ) 
    130    , m_priority ( 0 ) 
    131    , m_Thread ( NULL ) 
    132117   , m_speed( speed ) 
    133118   , m_prebuffers( 0 ) 
     
    137122 
    138123IsoHandler::~IsoHandler() { 
    139     if (m_Thread) { 
    140         m_Thread->Stop(); 
    141         delete m_Thread; 
    142     } 
    143124// Don't call until libraw1394's raw1394_new_handle() function has been 
    144125// fixed to correctly initialise the iso_packet_infos field.  Bug is 
     
    155136 
    156137bool 
    157 IsoHandler::Init() 
    158 { 
    159     debugOutput( DEBUG_LEVEL_VERBOSE, "%p: Init thread...\n", this); 
    160     m_poll_fd.fd = getFileDescriptor(); 
    161     m_poll_fd.revents = 0; 
    162     if (isEnabled()) { 
    163         m_poll_fd.events = POLLIN; 
    164     } else { 
    165         m_poll_fd.events = 0; 
    166     } 
    167     return true; 
    168 } 
    169  
    170 bool 
    171 IsoHandler::isDead() 
    172 { 
    173     if(m_last_wakeup < 0) return false; // startup artifacts 
    174     if(m_State != E_Running) return false; // not running can't be dead 
    175     int64_t now = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    176     int64_t last_call = m_last_wakeup + ISOHANDLER_DEATH_DETECT_TIMEOUT_USECS; 
    177     if(now > last_call) { 
    178         debugOutput(DEBUG_LEVEL_VERBOSE, 
    179                     "(%p, %s) Handler timed out: %lld usecs since last wakeup\n", 
    180                     this, getTypeString(), now-m_last_wakeup); 
    181         return true; 
    182     } else { 
    183         return false; 
    184     } 
    185 } 
    186  
    187 bool 
    188138IsoHandler::waitForClient() 
    189139{ 
     
    222172    return false; 
    223173} 
    224  
     174/* 
    225175bool 
    226176IsoHandler::Execute() 
     
    287237        return false; 
    288238    } 
    289 } 
     239}*/ 
    290240 
    291241bool 
     
    313263 
    314264bool 
    315 IsoHandler::setThreadParameters(bool rt, int priority) { 
    316     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority); 
    317     if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority 
    318     m_realtime = rt; 
    319     m_priority = priority; 
    320  
    321     if (m_Thread) { 
    322         if (m_realtime) { 
    323             m_Thread->AcquireRealTime(m_priority); 
    324         } else { 
    325             m_Thread->DropRealTime(); 
    326         } 
    327     } 
    328     return true; 
    329 } 
    330  
    331 bool 
    332265IsoHandler::init() 
    333266{ 
     
    362295    } 
    363296 
    364 #if ISOHANDLER_PER_HANDLER_THREAD 
    365     // create a thread to iterate ourselves 
    366     debugOutput( DEBUG_LEVEL_VERBOSE, "Start thread for %p...\n", this); 
    367     m_Thread = new Util::PosixThread(this, m_realtime, m_priority,  
    368                                      PTHREAD_CANCEL_DEFERRED); 
    369     if(!m_Thread) { 
    370         debugFatal("No thread\n"); 
    371         return false; 
    372     } 
    373     if (m_Thread->Start() != 0) { 
    374         debugFatal("Could not start update thread\n"); 
    375         return false; 
    376     } 
    377 #endif 
    378  
    379297    // update the internal state 
    380298    m_State=E_Initialized; 
     
    393311        return false; 
    394312    } 
    395  
    396     m_poll_fd.events = 0; 
    397313 
    398314    // this is put here to try and avoid the 
     
    446362{ 
    447363    setDebugLevel(l); 
    448     if(m_Thread) m_Thread->setVerboseLevel(l); 
    449364} 
    450365 
     
    494409                       length, channel, cycle); 
    495410    #ifdef DEBUG 
    496     m_last_wakeup = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    497411    if (length > m_max_packet_size) { 
    498412        debugWarning("(%p, %s) packet too large: len=%u max=%u\n", 
     
    516430                       "sending packet: length=%d, cycle=%d\n", 
    517431                       *length, cycle); 
    518     #ifdef DEBUG 
    519     m_last_wakeup = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    520     #endif 
    521  
    522432    if(m_Client) { 
    523433        enum raw1394_iso_disposition retval; 
     
    550460    //     raw1394_iso_shutdown(m_handle); 
    551461    m_State = E_Prepared; 
    552     m_last_wakeup = -1; 
    553462 
    554463    debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing iso handler (%p, client=%p)\n", this, m_Client); 
     
    622531    } 
    623532 
    624     m_poll_fd.events = POLLIN; 
    625533    m_State = E_Running; 
    626534    return true; 
  • trunk/libffado/src/libieee1394/IsoHandler.h

    r868 r904  
    4545*/ 
    4646 
    47 class IsoHandler : public Util::RunnableInterface 
     47class IsoHandler 
    4848{ 
    4949public: 
     
    8282public: 
    8383    // runnable interface 
    84     bool Init(); 
    85     bool Execute(); 
    8684    bool iterate(); 
    8785 
    8886    int getFileDescriptor() { return raw1394_get_fd(m_handle);}; 
    89     bool setThreadParameters(bool rt, int priority); 
    9087 
    9188    bool init(); 
     
    9794    bool enable(int cycle); 
    9895    bool disable(); 
    99  
    100     bool isDead(); 
    10196 
    10297    void flush(); 
     
    137132    int             m_irq_interval; 
    138133 
    139     int64_t         m_last_wakeup; 
    140  
    141134    Streaming::StreamProcessor *m_Client; 
    142135 
     
    144137 
    145138    static int busreset_handler(raw1394handle_t handle, unsigned int generation); 
    146  
    147     struct pollfd   m_poll_fd; 
    148     int             m_poll_timeout; 
    149     // threading 
    150     bool            m_realtime; 
    151     int             m_priority; 
    152     Util::Thread *  m_Thread; 
    153139 
    154140    enum raw1394_iso_speed m_speed; 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

    r870 r904  
    3232#include "libutil/PosixThread.h" 
    3333 
     34#include "libutil/SystemTimeSource.h" 
     35 
    3436#include <assert.h> 
    3537 
    3638IMPL_DEBUG_MODULE( IsoHandlerManager, IsoHandlerManager, DEBUG_LEVEL_NORMAL ); 
     39IMPL_DEBUG_MODULE( IsoTask, IsoTask, DEBUG_LEVEL_NORMAL ); 
    3740 
    3841using namespace Streaming; 
    3942 
     43// --- ISO Thread --- // 
     44 
     45IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoTask::eTaskType t) 
     46    : m_manager( manager ) 
     47    , m_type( t ) 
     48{ 
     49} 
     50 
     51bool 
     52IsoTask::Init() 
     53{ 
     54    request_update = 0; 
     55 
     56    int i; 
     57    for (i=0; i < ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT; i++) { 
     58        m_IsoHandler_map_shadow[i] = NULL; 
     59        m_poll_fds_shadow[i].events = 0; 
     60    } 
     61    m_poll_nfds_shadow = 0; 
     62    return true; 
     63} 
     64 
     65bool 
     66IsoTask::requestShadowMapUpdate() 
     67{ 
     68    debugOutput(DEBUG_LEVEL_VERBOSE, "enter\n"); 
     69    INC_ATOMIC(&request_update); 
     70    return true; 
     71} 
     72 
     73// updates the internal stream map 
     74// note that this should be executed with the guarantee that 
     75// nobody will modify  
     76void 
     77IsoTask::updateShadowMapHelper() 
     78{ 
     79    debugOutput( DEBUG_LEVEL_VERBOSE, "updating shadow vars...\n"); 
     80    unsigned int i, cnt, max; 
     81    max = m_manager.m_IsoHandlers.size(); 
     82    for (i = 0, cnt = 0; i < max; i++) { 
     83        IsoHandler *h = m_manager.m_IsoHandlers.at(i); 
     84        assert(h); 
     85 
     86        // skip handlers of the wrong type 
     87        if (h->getType() == IsoHandler::eHT_Receive  && m_type == eTT_Transmit) continue; 
     88        if (h->getType() == IsoHandler::eHT_Transmit && m_type == eTT_Receive) continue; 
     89 
     90        if (h->isEnabled()) { 
     91            m_IsoHandler_map_shadow[cnt] = h; 
     92            m_poll_fds_shadow[cnt].fd = h->getFileDescriptor(); 
     93            m_poll_fds_shadow[cnt].revents = 0; 
     94            m_poll_fds_shadow[cnt].events = POLLIN; 
     95            cnt++; 
     96            debugOutput( DEBUG_LEVEL_VERBOSE, "%s handler %p added\n", h->getTypeString(), h); 
     97        } else { 
     98            debugOutput( DEBUG_LEVEL_VERBOSE, "%s handler %p skipped (disabled)\n", h->getTypeString(), h); 
     99        } 
     100        if(cnt > ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT) { 
     101            debugWarning("Too much ISO Handlers in thread...\n"); 
     102            break; 
     103        } 
     104    } 
     105    m_poll_nfds_shadow = cnt; 
     106    debugOutput( DEBUG_LEVEL_VERBOSE, " updated shadow vars...\n"); 
     107} 
     108 
     109bool 
     110IsoTask::Execute() 
     111{ 
     112    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Execute\n"); 
     113    int err; 
     114    unsigned int i; 
     115    unsigned int m_poll_timeout = 10; 
     116 
     117    // if some other thread requested a shadow map update, do it 
     118    if(request_update) { 
     119        updateShadowMapHelper(); 
     120        DEC_ATOMIC(&request_update); // ack the update 
     121    } 
     122 
     123    // bypass if no handlers are registered 
     124    if (m_poll_nfds_shadow == 0) { 
     125        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "bypass iterate since no handlers to poll\n"); 
     126        usleep(m_poll_timeout * 1000); 
     127        return true; 
     128    } 
     129 
     130    // setup the poll here 
     131    if (m_type==eTT_Transmit) { 
     132        // if we are a transmit thread, we should only poll on 
     133        // those handlers that have a client that is ready to send 
     134        // something. poll'ing the others will only cause busy-wait 
     135        // looping. 
     136        for (i = 0; i < m_poll_nfds_shadow; i++) { 
     137            short events = 0; 
     138            if (m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 
     139                events = POLLIN | POLLPRI; 
     140            } 
     141            m_poll_fds_shadow[i].events = events; 
     142        } 
     143    } else { 
     144        // for receive handlers, we can do the same. we might not have to though 
     145        // FIXME: check whether this is necessary 
     146        for (i = 0; i < m_poll_nfds_shadow; i++) { 
     147            short events = 0; 
     148//             if (m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 
     149//                 events = POLLIN | POLLERR | POLLHUP; 
     150//             } 
     151            events = POLLIN | POLLPRI; 
     152            m_poll_fds_shadow[i].events = events; 
     153        } 
     154    } 
     155 
     156    // Use a shadow map of the fd's such that the poll call is not in a critical section 
     157    DEBUG_EXTREME( uint64_t poll_enter = m_service.getCurrentTimeAsUsecs() ); 
     158    err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 
     159    DEBUG_EXTREME( uint64_t poll_exit = m_service.getCurrentTimeAsUsecs() ); 
     160 
     161    if (err < 0) { 
     162        if (errno == EINTR) { 
     163            return true; 
     164        } 
     165        debugFatal("poll error: %s\n", strerror (errno)); 
     166        return false; 
     167    } 
     168 
     169    DEBUG_EXTREME( uint64_t iter_enter = m_service.getCurrentTimeAsUsecs() ); 
     170    for (i = 0; i < m_poll_nfds_shadow; i++) { 
     171        #ifdef DEBUG 
     172        if(m_poll_fds_shadow[i].revents) { 
     173            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
     174                        "received events: %08X for (%d/%d, %p, %s)\n", 
     175                        m_poll_fds_shadow[i].revents, 
     176                        i, m_poll_nfds_shadow, 
     177                        m_IsoHandler_map_shadow[i], 
     178                        m_IsoHandler_map_shadow[i]->getTypeString()); 
     179        } 
     180        #endif 
     181 
     182        // if we get here, it means two things: 
     183        // 1) the kernel can accept or provide packets (poll returned POLLIN) 
     184        // 2) the client can provide or accept packets (we enabled polling) 
     185        if(m_poll_fds_shadow[i].revents & (POLLIN)) { 
     186            m_IsoHandler_map_shadow[i]->iterate(); 
     187        } else { 
     188            // there might be some error condition 
     189            if (m_poll_fds_shadow[i].revents & POLLERR) { 
     190                debugWarning("error on fd for %d\n",i); 
     191            } 
     192            if (m_poll_fds_shadow[i].revents & POLLHUP) { 
     193                debugWarning("hangup on fd for %d\n",i); 
     194            } 
     195        } 
     196 
     197//         #ifdef DEBUG 
     198//         // check if the handler is still alive 
     199//         if(m_IsoHandler_map_shadow[i]->isDead()) { 
     200//             debugError("Iso handler (%p, %s) is dead!\n", 
     201//                        m_IsoHandler_map_shadow[i], 
     202//                        m_IsoHandler_map_shadow[i]->getTypeString()); 
     203//             return false; // shutdown the system 
     204//         } 
     205//         #endif 
     206 
     207    } 
     208    DEBUG_EXTREME( uint64_t iter_exit = m_service.getCurrentTimeAsUsecs() ); 
     209 
     210    debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, " poll took %6lldus, iterate took %6lldus, iterated (R: %2d, X: %2d) handlers\n", 
     211                       poll_exit-poll_enter, iter_exit-iter_enter, 
     212                       nb_rcv, nb_xmit); 
     213 
     214    return true; 
     215 
     216} 
     217 
     218void IsoTask::setVerboseLevel(int i) { 
     219    setDebugLevel(i); 
     220} 
     221 
     222// -- the ISO handler manager -- // 
    40223IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service) 
    41224   : m_State(E_Created) 
    42225   , m_service( service ) 
    43226   , m_realtime(false), m_priority(0) 
    44    , m_Thread ( NULL ) 
     227   , m_ReceiveThread ( NULL ) 
     228   , m_TransmitThread ( NULL ) 
     229   , m_ReceiveTask ( NULL ) 
     230   , m_TransmitTask ( NULL ) 
    45231{} 
    46232 
     
    49235   , m_service( service ) 
    50236   , m_realtime(run_rt), m_priority(rt_prio) 
    51    , m_Thread ( NULL ) 
     237   , m_ReceiveThread ( NULL ) 
     238   , m_TransmitThread ( NULL ) 
     239   , m_ReceiveTask ( NULL ) 
     240   , m_TransmitTask ( NULL ) 
    52241{} 
    53242 
     
    59248        debugError("Still some handlers in use\n"); 
    60249    } 
    61     if (m_Thread) { 
    62         m_Thread->Stop(); 
    63         delete m_Thread; 
     250    if (m_ReceiveThread) { 
     251        m_ReceiveThread->Stop(); 
     252        delete m_ReceiveThread; 
     253    } 
     254    if (m_ReceiveTask) { 
     255        delete m_ReceiveTask; 
     256    } 
     257 
     258    if (m_TransmitThread) { 
     259        m_TransmitThread->Stop(); 
     260        delete m_TransmitThread; 
     261    } 
     262    if (m_TransmitTask) { 
     263        delete m_TransmitTask; 
    64264    } 
    65265} 
     
    72272    m_priority = priority; 
    73273    bool result = true; 
    74     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); 
    75         it != m_IsoHandlers.end(); 
    76         ++it ) 
    77     { 
    78         result &= (*it)->setThreadParameters(m_realtime, m_priority); 
    79     } 
    80  
    81     if (m_Thread) { 
     274 
     275    if (m_ReceiveThread) { 
    82276        if (m_realtime) { 
    83             m_Thread->AcquireRealTime(m_priority); 
     277            m_ReceiveThread->AcquireRealTime(m_priority); 
    84278        } else { 
    85             m_Thread->DropRealTime(); 
     279            m_ReceiveThread->DropRealTime(); 
     280        } 
     281    } 
     282    if (m_TransmitThread) { 
     283        if (m_realtime) { 
     284            m_TransmitThread->AcquireRealTime(m_priority); 
     285        } else { 
     286            m_TransmitThread->DropRealTime(); 
    86287        } 
    87288    } 
    88289 
    89290    return result; 
    90 } 
    91  
    92 /** 
    93  * Update the shadow variables. Should only be called from 
    94  * the iso handler iteration thread 
    95  */ 
    96 void 
    97 IsoHandlerManager::updateShadowVars() 
    98 { 
    99     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "updating shadow vars...\n"); 
    100     unsigned int i, cnt, max; 
    101     max = m_IsoHandlers.size(); 
    102     for (i = 0, cnt = 0; i < max; i++) { 
    103         IsoHandler *h = m_IsoHandlers.at(i); 
    104         assert(h); 
    105         if (h->isEnabled()) { 
    106             // receive handlers are always poll'ed 
    107             // transmit handlers only when the client is ready 
    108             if (h->tryWaitForClient()) { 
    109                 m_IsoHandler_map_shadow[cnt] = h; 
    110                 m_poll_fds_shadow[cnt].fd = h->getFileDescriptor(); 
    111                 m_poll_fds_shadow[cnt].revents = 0; 
    112                 m_poll_fds_shadow[cnt].events = POLLIN; 
    113                 cnt++; 
    114             } else { 
    115                 debugOutput(DEBUG_LEVEL_VERBOSE, "skipped handler %p\n", h); 
    116             } 
    117         } 
    118         if(cnt > ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT) { 
    119             debugWarning("Too much ISO Handlers in manager...\n"); 
    120             break; 
    121         } 
    122     } 
    123     m_poll_nfds_shadow = cnt; 
    124     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " updated shadow vars...\n"); 
    125 } 
    126  
    127 bool 
    128 IsoHandlerManager::Init() { 
    129     debugOutput( DEBUG_LEVEL_VERBOSE, "%p: Init thread...\n", this); 
    130     bool result = true; 
    131     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); 
    132         it != m_IsoHandlers.end(); 
    133         ++it ) 
    134     { 
    135         result &= (*it)->Init(); 
    136     } 
    137     return result; 
    138 } 
    139  
    140 bool 
    141 IsoHandlerManager::Execute() { 
    142     int err; 
    143     unsigned int i; 
    144  
    145     unsigned int m_poll_timeout = 100; 
    146  
    147     updateShadowVars(); 
    148     // bypass if no handlers are registered 
    149     if (m_poll_nfds_shadow == 0) { 
    150         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "bypass iterate since no handlers to poll\n"); 
    151         usleep(m_poll_timeout * 1000); 
    152         return true; 
    153     } 
    154  
    155     // Use a shadow map of the fd's such that the poll call is not in a critical section 
    156     DEBUG_EXTREME( uint64_t poll_enter = m_service.getCurrentTimeAsUsecs() ); 
    157     err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 
    158     DEBUG_EXTREME( uint64_t poll_exit = m_service.getCurrentTimeAsUsecs() ); 
    159  
    160     if (err == -1) { 
    161         if (errno == EINTR) { 
    162             return true; 
    163         } 
    164         debugFatal("poll error: %s\n", strerror (errno)); 
    165         return false; 
    166     } 
    167  
    168     int nb_rcv = 0; 
    169     int nb_xmit = 0; 
    170     DEBUG_EXTREME( uint64_t iter_enter = m_service.getCurrentTimeAsUsecs() ); 
    171     for (i = 0; i < m_poll_nfds_shadow; i++) { 
    172         if(m_poll_fds_shadow[i].revents) { 
    173             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    174                         "received events: %08X for (%d/%d, %p, %s)\n", 
    175                         m_poll_fds_shadow[i].revents,  
    176                         i, m_poll_nfds_shadow, 
    177                         m_IsoHandler_map_shadow[i], 
    178                         m_IsoHandler_map_shadow[i]->getTypeString()); 
    179         } 
    180         if (m_poll_fds_shadow[i].revents & POLLERR) { 
    181             debugWarning("error on fd for %d\n",i); 
    182         } 
    183  
    184         if (m_poll_fds_shadow[i].revents & POLLHUP) { 
    185             debugWarning("hangup on fd for %d\n",i); 
    186         } 
    187  
    188         if(m_poll_fds_shadow[i].revents & (POLLIN)) { 
    189             if (m_IsoHandler_map_shadow[i]->getType() == IsoHandler::eHT_Receive) { 
    190                 m_IsoHandler_map_shadow[i]->iterate(); 
    191                 nb_rcv++; 
    192             } else { 
    193                 // only iterate the xmit handler if it makes sense 
    194                 if(m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 
    195                     m_IsoHandler_map_shadow[i]->iterate(); 
    196                     nb_xmit++; 
    197                 } 
    198             } 
    199         } 
    200  
    201         #ifdef DEBUG 
    202         // check if the handler is still alive 
    203         if(m_IsoHandler_map_shadow[i]->isDead()) { 
    204             debugError("Iso handler (%p, %s) is dead!\n", 
    205                        m_IsoHandler_map_shadow[i], 
    206                        m_IsoHandler_map_shadow[i]->getTypeString()); 
    207             return false; // shutdown the system 
    208         } 
    209         #endif 
    210          
    211     } 
    212     DEBUG_EXTREME( uint64_t iter_exit = m_service.getCurrentTimeAsUsecs() ); 
    213  
    214     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, " poll took %6lldus, iterate took %6lldus, iterated (R: %2d, X: %2d) handlers\n", 
    215                        poll_exit-poll_enter, iter_exit-iter_enter, 
    216                        nb_rcv, nb_xmit); 
    217  
    218     return true; 
    219291} 
    220292 
     
    228300    } 
    229301 
    230 #if ISOHANDLER_PER_HANDLER_THREAD 
    231     // the IsoHandlers will create their own thread. 
    232 #else 
    233     // create a thread to iterate our handlers 
    234     debugOutput( DEBUG_LEVEL_VERBOSE, "Start thread for %p...\n", this); 
    235     m_Thread = new Util::PosixThread(this, m_realtime, m_priority,  
    236                                      PTHREAD_CANCEL_DEFERRED); 
    237     if(!m_Thread) { 
     302    // create a thread to iterate our receive handlers 
     303    debugOutput( DEBUG_LEVEL_VERBOSE, "Create receive thread for %p...\n", this); 
     304    m_ReceiveTask = new IsoTask( *this, IsoTask::eTT_Receive ); 
     305    if(!m_ReceiveTask) { 
     306        debugFatal("No task\n"); 
     307        return false; 
     308    } 
     309    m_ReceiveThread = new Util::PosixThread(m_ReceiveTask, m_realtime, 
     310                                            m_priority + ISOHANDLERMANAGER_RECEIVE_PRIO_INCREASE, 
     311                                            PTHREAD_CANCEL_DEFERRED); 
     312 
     313    if(!m_ReceiveThread) { 
    238314        debugFatal("No thread\n"); 
    239315        return false; 
    240316    } 
    241     if (m_Thread->Start() != 0) { 
    242         debugFatal("Could not start update thread\n"); 
    243         return false; 
    244     } 
    245 #endif 
     317    if (m_ReceiveThread->Start() != 0) { 
     318        debugFatal("Could not start receive thread\n"); 
     319        return false; 
     320    } 
     321 
     322    // create a thread to iterate our transmit handlers 
     323    debugOutput( DEBUG_LEVEL_VERBOSE, "Create transmit thread for %p...\n", this); 
     324    m_TransmitTask = new IsoTask( *this, IsoTask::eTT_Transmit ); 
     325    if(!m_TransmitTask) { 
     326        debugFatal("No task\n"); 
     327        return false; 
     328    } 
     329    m_TransmitThread = new Util::PosixThread(m_TransmitTask, m_realtime, 
     330                                             m_priority + ISOHANDLERMANAGER_TRANSMIT_PRIO_INCREASE, 
     331                                             PTHREAD_CANCEL_DEFERRED); 
     332    if(!m_TransmitThread) { 
     333        debugFatal("No thread\n"); 
     334        return false; 
     335    } 
     336    if (m_TransmitThread->Start() != 0) { 
     337        debugFatal("Could not start transmit thread\n"); 
     338        return false; 
     339    } 
    246340 
    247341    m_State=E_Running; 
     342    return true; 
     343} 
     344 
     345 
     346bool 
     347IsoHandlerManager::updateShadowMapFor(IsoHandler *h) 
     348{ 
     349    // update the shadow map 
     350    if(h->getType() == IsoHandler::eHT_Receive) { 
     351        if(!m_ReceiveTask->requestShadowMapUpdate()) { 
     352            debugError("failed to update shadow map\n"); 
     353            return false; 
     354        } 
     355    } else { 
     356        if(!m_TransmitTask->requestShadowMapUpdate()) { 
     357            debugError("failed to update shadow map\n"); 
     358            return false; 
     359        } 
     360    } 
    248361    return true; 
    249362} 
     
    253366    bool result; 
    254367    int i=0; 
    255     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Disable on IsoHandler %p\n", h); 
     368    debugOutput(DEBUG_LEVEL_VERBOSE, "Disable on IsoHandler %p\n", h); 
    256369    for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); 
    257370        it != m_IsoHandlers.end(); 
     
    260373        if ((*it) == h) { 
    261374            result = h->disable(); 
     375            result &= updateShadowMapFor(h); 
    262376            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); 
    263377            return result; 
     
    273387    bool result; 
    274388    int i=0; 
    275     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Enable on IsoHandler %p\n", h); 
     389    debugOutput(DEBUG_LEVEL_VERBOSE, "Enable on IsoHandler %p\n", h); 
    276390    for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); 
    277391        it != m_IsoHandlers.end(); 
     
    280394        if ((*it) == h) { 
    281395            result = h->enable(); 
     396            result &= updateShadowMapFor(h); 
    282397            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); 
    283398            return result; 
     
    295410    handler->setVerboseLevel(getDebugLevel()); 
    296411    m_IsoHandlers.push_back(handler); 
    297     updateShadowVars(); 
    298     return true; 
     412    return updateShadowMapFor(handler); 
    299413} 
    300414 
     
    310424        if ( *it == handler ) { 
    311425            m_IsoHandlers.erase(it); 
    312             updateShadowVars(); 
    313             return true; 
     426            return updateShadowMapFor(handler); 
    314427        } 
    315428    } 
     
    452565    } 
    453566 
    454     if(!h->setThreadParameters(m_realtime, thread_prio)) { 
    455         debugFatal("Could not set handler thread parameters\n"); 
    456         return false; 
    457     } 
    458  
    459567    // register the stream with the handler 
    460568    if(!h->registerStream(stream)) { 
     
    563671    { 
    564672        if((*it)->isStreamRegistered(stream)) { 
    565             bool result; 
    566673            debugOutput( DEBUG_LEVEL_VERBOSE, " stopping handler %p for stream %p\n", *it, stream); 
    567             result = (*it)->disable(); 
    568             if(!result) { 
     674            if(!(*it)->disable()) { 
    569675                debugOutput( DEBUG_LEVEL_VERBOSE, " could not disable handler (%p)\n",*it); 
     676                return false; 
     677            } 
     678            if(!updateShadowMapFor(*it)) { 
     679                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    570680                return false; 
    571681            } 
     
    622732    { 
    623733        if((*it)->isStreamRegistered(stream)) { 
    624             bool result; 
    625734            debugOutput( DEBUG_LEVEL_VERBOSE, " starting handler %p for stream %p\n", *it, stream); 
    626             result = (*it)->enable(cycle); 
    627             if(!result) { 
     735            if(!(*it)->enable(cycle)) { 
    628736                debugOutput( DEBUG_LEVEL_VERBOSE, " could not enable handler (%p)\n",*it); 
    629737                return false; 
    630738            } 
     739            if(!updateShadowMapFor(*it)) { 
     740                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
     741                return false; 
     742            } 
    631743            return true; 
    632744        } 
     
    654766        if(!(*it)->disable()){ 
    655767            debugOutput( DEBUG_LEVEL_VERBOSE, " could not stop handler (%p)\n",*it); 
     768            retval=false; 
     769        } 
     770        if(!updateShadowMapFor(*it)) { 
     771            debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    656772            retval=false; 
    657773        } 
     
    686802        (*it)->setVerboseLevel(i); 
    687803    } 
     804    if(m_ReceiveThread) m_ReceiveThread->setVerboseLevel(i); 
     805    if(m_ReceiveTask) m_ReceiveTask->setVerboseLevel(i); 
     806    if(m_TransmitThread) m_TransmitThread->setVerboseLevel(i); 
     807    if(m_TransmitTask) m_TransmitTask->setVerboseLevel(i); 
    688808} 
    689809 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.h

    r864 r904  
    3737 
    3838class Ieee1394Service; 
    39  
    4039class IsoHandler; 
     40 
    4141namespace Streaming { 
    4242    class StreamProcessor; 
     
    4949typedef std::vector<IsoHandler *>::iterator IsoHandlerVectorIterator; 
    5050 
     51class IsoHandlerManager; 
     52 
     53// threads that will handle the packet framing 
     54// one thread per direction, as a compromise for one per 
     55// channel and one for all 
     56class IsoTask : public Util::RunnableInterface 
     57{ 
     58    public: 
     59        enum eTaskType { 
     60            eTT_Receive, 
     61            eTT_Transmit, 
     62        }; 
     63        IsoTask(IsoHandlerManager& manager, enum IsoTask::eTaskType t); 
     64        virtual ~IsoTask() {}; 
     65 
     66    public: 
     67        bool Init(); 
     68        bool Execute(); 
     69 
     70        /** 
     71         * requests the thread to sync it's stream map with the manager 
     72         */ 
     73        bool requestShadowMapUpdate(); 
     74 
     75        void setVerboseLevel(int i); 
     76    protected: 
     77        IsoHandlerManager& m_manager; 
     78        enum eTaskType m_type; 
     79 
     80        // the event request structure 
     81        SInt32 request_update; 
     82 
     83        // static allocation due to RT constraints 
     84        // this is the map used by the actual thread 
     85        // it is a shadow of the m_StreamProcessors vector 
     86        struct pollfd m_poll_fds_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
     87        IsoHandler *m_IsoHandler_map_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
     88        unsigned int m_poll_nfds_shadow; 
     89 
     90        // updates the streams map 
     91        void updateShadowMapHelper(); 
     92 
     93        // debug stuff 
     94        DECLARE_DEBUG_MODULE; 
     95}; 
     96 
    5197/*! 
    5298\brief The ISO Handler management class 
     
    60106 
    61107*/ 
    62 class IsoHandlerManager : public Util::RunnableInterface 
     108 
     109class IsoHandlerManager 
    63110{ 
    64111    friend class Streaming::StreamProcessorManager; 
    65     public: 
    66         bool Init(); 
    67         bool Execute(); 
    68         void updateShadowVars(); 
    69     private: 
    70         // shadow variables 
    71         struct pollfd m_poll_fds_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
    72         IsoHandler *m_IsoHandler_map_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 
    73         unsigned int m_poll_nfds_shadow; 
     112    friend class IsoTask; 
    74113 
    75114    public: 
     
    151190        bool m_realtime; 
    152191        int m_priority; 
    153         Util::Thread *  m_Thread; 
     192        // handler threads 
     193        Util::Thread *  m_ReceiveThread; 
     194        Util::Thread *  m_TransmitThread; 
     195 
     196        // actual tasks 
     197        IsoTask *  m_ReceiveTask; 
     198        IsoTask *  m_TransmitTask; 
     199 
     200        bool updateShadowMapFor(IsoHandler *h); 
    154201 
    155202        // debug stuff 
  • trunk/libffado/src/libstreaming/generic/Port.cpp

    r864 r904  
    125125void 
    126126Port::enable()  { 
    127     debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); 
     127    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); 
    128128    m_disabled=false; 
    129129} 
     
    132132void 
    133133Port::disable() { 
    134     debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); 
     134    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); 
    135135    m_disabled=false; 
    136136} 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

    r885 r904  
    132132StreamProcessor::getNbPacketsIsoXmitBuffer() 
    133133{ 
    134 #if ISOHANDLER_PER_HANDLER_THREAD 
    135134    // if we use one thread per packet, we can put every frame directly into the ISO buffer 
    136135    // the waitForClient in IsoHandler will take care of the fact that the frames are 
     
    139138    debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer); 
    140139    return packets_to_prebuffer; 
    141 #else 
    142     // the target is to have all of the transmit buffer (at period transfer) as ISO packets 
    143     // when one period is received, there will be approx (NbBuffers - 1) * period_size frames 
    144     // in the transmit buffer (the others are still to be put into the xmit frame buffer) 
    145     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1)); 
    146     debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer); 
    147      
    148     // however we have to take into account the fact that there is some sync delay 
    149     // we assume that the SPM has indicated 
    150     // HACK: this counts on the fact that the latency for this stream will be the same as the 
    151     //       latency for the receive sync source 
    152     unsigned int est_sync_delay = getPacketsPerPeriod() / MINIMUM_INTERRUPTS_PER_PERIOD; 
    153     est_sync_delay += STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS / TICKS_PER_CYCLE; 
    154     packets_to_prebuffer -= est_sync_delay; 
    155     debugOutput(DEBUG_LEVEL_VERBOSE, " correct for sync delay (%d): %u\n", 
    156                                      est_sync_delay, 
    157                                      packets_to_prebuffer); 
    158      
    159     // only queue a part of the theoretical max in order not to have too much 'not ready' cycles 
    160     packets_to_prebuffer = (packets_to_prebuffer * MAX_ISO_XMIT_BUFFER_FILL_PCT * 1000) / 100000; 
    161     debugOutput(DEBUG_LEVEL_VERBOSE, " reduce to %d%%: %u\n", 
    162                                      MAX_ISO_XMIT_BUFFER_FILL_PCT, packets_to_prebuffer); 
    163      
    164     return packets_to_prebuffer; 
    165 #endif 
    166140} 
    167141 
  • trunk/libffado/src/libutil/SystemTimeSource.h

    r864 r904  
    4444    inline ffado_microsecs_t unWrapTime(ffado_microsecs_t t) {return t;}; 
    4545    inline ffado_microsecs_t wrapTime(ffado_microsecs_t t) {return t;}; 
    46      
     46 
    4747    static void SleepUsecRelative(ffado_microsecs_t usecs); 
    4848    static void SleepUsecAbsolute(ffado_microsecs_t wake_time);