Show
Ignore:
Timestamp:
04/21/08 01:27:47 (16 years ago)
Author:
ppalmers
Message:

Improve thread synchronisation. Switch back to separate threads for transmit and
receive since it is not possible to statically schedule things properly. One
of the threads (i.e. the client thread) is out of our control, hence it's
execution can't be controlled. Using separate threads and correct priorities
will shift this problem to the OS. Note that the priority of the packet
receive thread should be lower than the client thread (such that the client
thread is woken ASAP), and the priority of the transmit thread should be
higher than the client thread (such that packets are queued ASAP).
Extra benefit: multi-cores are used.

Some other startup improvements.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/src/libieee1394/IsoHandler.cpp

    r977 r1005  
    2626#include "IsoHandler.h" 
    2727#include "ieee1394service.h"  
     28#include "IsoHandlerManager.h" 
    2829 
    2930#include "libstreaming/generic/StreamProcessor.h" 
     
    9192   , m_speed( RAW1394_ISO_SPEED_400 ) 
    9293   , m_prebuffers( 0 ) 
     94   , m_dont_exit_iterate_loop( true ) 
    9395   , m_State( E_Created ) 
    9496#ifdef DEBUG 
     
    150152 
    151153bool 
    152 IsoHandler::waitForClient() 
    153 
    154     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
    155     if(m_Client) { 
    156         bool result; 
    157         if (m_type == eHT_Receive) { 
    158             result = m_Client->waitForProducePacket(); 
    159         } else { 
    160             result = m_Client->waitForConsumePacket(); 
    161         } 
    162         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
    163         return result; 
    164     } else { 
    165         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " no client\n"); 
    166     } 
    167     return false; 
    168 
    169  
    170 bool 
    171 IsoHandler::tryWaitForClient() 
    172 
    173     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
     154IsoHandler::canIterateClient() 
     155
     156    debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "checking...\n"); 
    174157    if(m_Client) { 
    175158        bool result; 
     
    186169    return false; 
    187170} 
    188 /* 
    189 bool 
    190 IsoHandler::Execute() 
    191 { 
    192     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "%p: Execute thread...\n", this); 
    193  
    194     // bypass if not running 
    195     if (m_State != E_Running) { 
    196         debugOutput( DEBUG_LEVEL_VERBOSE, "%p: not polling since not running...\n", this); 
    197         usleep(m_poll_timeout * 1000); 
    198         debugOutput( DEBUG_LEVEL_VERBOSE, "%p: done sleeping...\n", this); 
    199         return true; 
    200     } 
    201  
    202     // wait for the availability of frames in the client 
    203     // (blocking for transmit handlers) 
    204     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Waiting for Client activity...\n", this, getTypeString()); 
    205     if (waitForClient()) { 
    206 #if ISOHANDLER_USE_POLL 
    207         bool result = true; 
    208         while(result && m_Client && tryWaitForClient()) { 
    209             int err = poll(&m_poll_fd, 1, m_poll_timeout); 
    210             if (err == -1) { 
    211                 if (errno == EINTR) { 
    212                     return true; 
    213                 } 
    214                 debugFatal("%p, poll error: %s\n", this, strerror (errno)); 
    215                 return false; 
    216             } 
    217  
    218             if(m_poll_fd.revents & (POLLIN)) { 
    219                 result=iterate(); 
    220                 if(!result) { 
    221                     debugOutput( DEBUG_LEVEL_VERBOSE, 
    222                                 "IsoHandler (%p): Failed to iterate handler\n", 
    223                                 this); 
    224                 } 
    225             } else { 
    226                 if (m_poll_fd.revents & POLLERR) { 
    227                     debugWarning("error on fd for %p\n", this); 
    228                 } 
    229                 if (m_poll_fd.revents & POLLHUP) { 
    230                     debugWarning("hangup on fd for %p\n",this); 
    231                 } 
    232                 break; 
    233             } 
    234         } 
    235         return result; 
    236 #else 
    237         // iterate() is blocking if no 1394 data is available 
    238         // so poll'ing is not really necessary 
    239         bool result = true; 
    240         while(result && m_Client && tryWaitForClient()) { 
    241             result = iterate(); 
    242 //             if (getType() == eHT_Receive) { 
    243 //                 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Iterate returned: %d\n", 
    244 //                             this, (m_type==eHT_Receive?"Receive":"Transmit"), result); 
    245 //             } 
    246         } 
    247         return result; 
    248 #endif 
    249     } else { 
    250         debugError("waitForClient() failed.\n"); 
    251         return false; 
    252     } 
    253 }*/ 
    254171 
    255172bool 
     
    440357    #endif 
    441358    if(m_Client) { 
    442         return m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); 
     359        enum raw1394_iso_disposition retval = m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); 
     360        if (retval == RAW1394_ISO_OK) { 
     361            if (m_dont_exit_iterate_loop) { 
     362                return RAW1394_ISO_OK; 
     363            } else { 
     364                m_dont_exit_iterate_loop = true; 
     365                debugOutput(DEBUG_LEVEL_VERBOSE, 
     366                                "(%p) loop exit requested\n", 
     367                                this); 
     368                return RAW1394_ISO_DEFER; 
     369            } 
     370        } else { 
     371            return retval; 
     372        } 
    443373    } 
    444374 
     
    467397        } 
    468398        #endif 
    469         return retval; 
     399        if (retval == RAW1394_ISO_OK) { 
     400            if (m_dont_exit_iterate_loop) { 
     401                return RAW1394_ISO_OK; 
     402            } else { 
     403                m_dont_exit_iterate_loop = true; 
     404                debugOutput(DEBUG_LEVEL_VERBOSE, 
     405                                "(%p) loop exit requested\n", 
     406                                this); 
     407                return RAW1394_ISO_DEFER; 
     408            } 
     409        } else { 
     410            return retval; 
     411        } 
    470412    } 
    471413    *tag = 0; 
  • trunk/libffado/src/libieee1394/IsoHandler.h

    r930 r1005  
    2626 
    2727#include "debugmodule/debugmodule.h" 
    28 #include "IsoHandlerManager.h" 
    2928 
    3029#include "libutil/Thread.h" 
     
    3231enum raw1394_iso_disposition; 
    3332 
     33class IsoHandlerManager; 
    3434namespace Streaming { 
    3535    class StreamProcessor; 
     
    121121    bool unregisterStream(Streaming::StreamProcessor *); 
    122122 
    123     bool waitForClient(); 
    124     bool tryWaitForClient(); 
     123    bool canIterateClient(); // FIXME: implement with functor 
     124 
     125    /** 
     126     * @brief request that the handler exits the packet processing loop ASAP 
     127     * 
     128     * The raw1394 lib doesn't provide a means to stop the packet iteration loop 
     129     * except when the iterate callback returns a DEFER value. Calling this function 
     130     * will make the callback return DEFER ASAP. 
     131     */ 
     132    void requestIterateLoopExit() {m_dont_exit_iterate_loop = false;}; 
     133    /** 
     134     * @brief allow the handler to stay in the packet processing loop 
     135     * 
     136     * This resets the state set by requestIterateLoopExit() 
     137     */ 
     138    void allowIterateLoop() {m_dont_exit_iterate_loop = true;}; 
    125139 
    126140private: 
     
    132146    int             m_irq_interval; 
    133147 
    134     Streaming::StreamProcessor *m_Client; 
     148    Streaming::StreamProcessor *m_Client; // FIXME: implement with functors 
    135149 
    136150    int handleBusReset(unsigned int generation); 
     
    140154    enum raw1394_iso_speed m_speed; 
    141155    unsigned int m_prebuffers; 
     156    bool m_dont_exit_iterate_loop; 
    142157 
    143158    // the state machine 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

    r993 r1005  
    2525#include "IsoHandlerManager.h" 
    2626#include "ieee1394service.h"  
    27 #include "IsoHandler.h" 
    2827#include "libstreaming/generic/StreamProcessor.h" 
    2928 
     
    4241// --- ISO Thread --- // 
    4342 
    44 IsoTask::IsoTask(IsoHandlerManager& manager
     43IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType t
    4544    : m_manager( manager ) 
    4645    , m_SyncIsoHandler ( NULL ) 
    47 
     46    , m_handlerType( t ) 
     47
     48
     49 
     50IsoTask::~IsoTask() 
     51
     52    sem_destroy(&m_activity_semaphore); 
    4853} 
    4954 
     
    6570    #endif 
    6671 
     72    sem_init(&m_activity_semaphore, 0, 0); 
    6773    return true; 
    6874} 
     
    8995        IsoHandler *h = m_manager.m_IsoHandlers.at(i); 
    9096        assert(h); 
     97 
     98        // skip the handlers not intended for us 
     99        if(h->getType() != m_handlerType) continue; 
    91100 
    92101        if (h->isEnabled()) { 
     
    129138{ 
    130139    debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    131                        "(%p) Execute\n", this); 
     140                       "(%p, %s) Execute\n", 
     141                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
    132142    int err; 
    133143    unsigned int i; 
     
    139149    if(diff < 100) { 
    140150        debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    141                            "(%p) short loop detected (%d usec), cnt: %d\n", 
    142                            this, diff, m_successive_short_loops); 
     151                           "(%p, %s) short loop detected (%d usec), cnt: %d\n", 
     152                           this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 
     153                           diff, m_successive_short_loops); 
    143154        m_successive_short_loops++; 
    144         if(m_successive_short_loops > 100) { 
     155        if(m_successive_short_loops > 10000) { 
    145156            debugError("Shutting down runaway thread\n"); 
    146157            return false; 
     
    163174    if (m_poll_nfds_shadow == 0) { 
    164175        debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    165                            "(%p) bypass iterate since no handlers to poll\n", 
    166                            this); 
     176                           "(%p, %s) bypass iterate since no handlers to poll\n", 
     177                           this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
    167178        usleep(m_poll_timeout * 1000); 
    168179        return true; 
     
    179190            short events = 0; 
    180191            IsoHandler *h = m_IsoHandler_map_shadow[i]; 
    181             if(h->getType() == IsoHandler::eHT_Transmit) { 
    182                 // we should only poll on a transmit handler 
    183                 // that has a client that is ready to send 
    184                 // something. Otherwise it will end up in 
    185                 // busy wait looping since the packet function 
    186                 // will defer processing (also avoids the 
    187                 // AGAIN problem) 
    188                 if (h->tryWaitForClient()) { 
    189                     events = POLLIN | POLLPRI; 
    190                     no_one_to_poll = false; 
    191                 } 
    192             } else { 
    193                 // a receive handler should only be polled if 
    194                 // it's client doesn't already have enough data 
    195                 // and if it can still accept data. 
    196                 if (h->tryWaitForClient()) { // FIXME 
    197                     events = POLLIN | POLLPRI; 
    198                     no_one_to_poll = false; 
    199                 } 
     192            // we should only poll on a transmit handler 
     193            // that has a client that is ready to send 
     194            // something. Otherwise it will end up in 
     195            // busy wait looping since the packet function 
     196            // will defer processing (also avoids the 
     197            // AGAIN problem) 
     198            if (h->canIterateClient()) { 
     199                events = POLLIN | POLLPRI; 
     200                no_one_to_poll = false; 
     201                // if we are going to poll() it, let's ensure 
     202                // it can run until someone wants it to exit 
     203                h->allowIterateLoop(); 
    200204            } 
    201205            m_poll_fds_shadow[i].events = events; 
     
    203207 
    204208        if(no_one_to_poll) { 
    205             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    206                                "(%p) No one to poll, waiting on the sync handler to become ready\n", 
    207                                this); 
    208  
    209             if(!m_SyncIsoHandler->waitForClient()) { 
    210                 debugError("Failed to wait for client\n"); 
    211                 // This can be due to error or due to timeout 
    212                  
    213                 // sleep for a while 
    214                 usleep(m_poll_timeout * 1000); // FIXME 
    215                 // exit this iteration loop 
    216                 return true; 
    217             } 
    218  
    219             #ifdef DEBUG 
    220             // if this happens we end up in a deadlock! 
    221             if(!m_SyncIsoHandler->tryWaitForClient()) { 
    222                 debugFatal("inconsistency in wait functions!\n"); 
    223                 return false; 
    224             } 
    225             #endif 
    226  
    227             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    228                                "(%p) sync handler ready\n", 
    229                                this); 
     209            debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     210                               "(%p, %s) No one to poll, waiting for something to happen\n", 
     211                               this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     212            // wait for something to happen 
     213            switch(waitForActivity()) { 
     214                case IsoTask::eAR_Error: 
     215                    debugError("Error while waiting for activity\n"); 
     216                    return false; 
     217                case IsoTask::eAR_Interrupted: 
     218                    // FIXME: what to do here? 
     219                    debugWarning("Interrupted while waiting for activity\n"); 
     220                    break; 
     221                case IsoTask::eAR_Timeout: 
     222                    // FIXME: what to do here? 
     223                    debugWarning("Timeout while waiting for activity\n"); 
     224                    break; 
     225                case IsoTask::eAR_Activity: 
     226                    // do nothing 
     227                    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     228                                       "(%p, %s) something happened\n", 
     229                                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     230                    break; 
     231            } 
    230232        } 
    231233    } 
    232234 
    233235    // Use a shadow map of the fd's such that we don't have to update 
    234     // the fd map everytime we run poll(). It doesn't change that much 
    235     // anyway 
     236    // the fd map everytime we run poll(). 
    236237    err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 
    237238 
    238239    if (err < 0) { 
    239240        if (errno == EINTR) { 
     241            debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n"); 
    240242            return true; 
    241243        } 
     
    247249        #ifdef DEBUG 
    248250        if(m_poll_fds_shadow[i].revents) { 
    249             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    250                         "(%p) received events: %08X for (%d/%d, %p, %s)\n", 
    251                         this, m_poll_fds_shadow[i].revents, 
     251            debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, 
     252                        "(%p, %s) received events: %08X for (%d/%d, %p, %s)\n", 
     253                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 
     254                        m_poll_fds_shadow[i].revents, 
    252255                        i, m_poll_nfds_shadow, 
    253256                        m_IsoHandler_map_shadow[i], 
     
    283286    } 
    284287    return true; 
    285  
     288
     289 
     290enum IsoTask::eActivityResult 
     291IsoTask::waitForActivity() 
     292
     293    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     294                       "(%p, %s) waiting for activity\n", 
     295                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     296    struct timespec ts; 
     297    int result; 
     298 
     299    if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 
     300        debugError("clock_gettime failed\n"); 
     301        return eAR_Error; 
     302    } 
     303    long long int timeout_nsec=0; 
     304    int timeout_sec = 0; 
     305 
     306    timeout_nsec = ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS * 1000LL; 
     307    timeout_sec = 0; 
     308    while(timeout_nsec >= 1000000000LL) { 
     309        timeout_sec += 1; 
     310        timeout_nsec -= 1000000000LL; 
     311    } 
     312    ts.tv_nsec += timeout_nsec; 
     313    ts.tv_sec += timeout_sec; 
     314 
     315    result = sem_timedwait(&m_activity_semaphore, &ts); 
     316 
     317    if(result != 0) { 
     318        if (result == ETIMEDOUT) { 
     319            debugOutput(DEBUG_LEVEL_VERBOSE, 
     320                        "(%p) pthread_cond_timedwait() timed out (result=%d)\n", 
     321                        this, result); 
     322            return eAR_Timeout; 
     323        } else if (result == EINTR) { 
     324            debugOutput(DEBUG_LEVEL_VERBOSE, 
     325                        "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", 
     326                        this, result); 
     327            return eAR_Interrupted; 
     328        } else { 
     329            debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n",  
     330                        this, result); 
     331            debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",  
     332                       this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); 
     333            return eAR_Error; 
     334        } 
     335    } 
     336 
     337    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     338                       "(%p, %s) got activity\n", 
     339                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     340    return eAR_Activity; 
     341
     342 
     343void 
     344IsoTask::signalActivity() 
     345
     346    // signal the activity cond var 
     347    sem_post(&m_activity_semaphore); 
     348    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     349                       "(%p, %s) activity\n", 
     350                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
    286351} 
    287352 
     
    295360   , m_service( service ) 
    296361   , m_realtime(false), m_priority(0) 
    297    , m_IsoThread ( NULL ) 
    298    , m_IsoTask ( NULL ) 
    299 {} 
     362   , m_IsoThreadTransmit ( NULL ) 
     363   , m_IsoTaskTransmit ( NULL ) 
     364   , m_IsoThreadReceive ( NULL ) 
     365   , m_IsoTaskReceive ( NULL ) 
     366
     367
    300368 
    301369IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio) 
     
    303371   , m_service( service ) 
    304372   , m_realtime(run_rt), m_priority(rt_prio) 
    305    , m_IsoThread ( NULL ) 
    306    , m_IsoTask ( NULL ) 
    307 {} 
     373   , m_IsoThreadTransmit ( NULL ) 
     374   , m_IsoTaskTransmit ( NULL ) 
     375   , m_IsoThreadReceive ( NULL ) 
     376   , m_IsoTaskReceive ( NULL ) 
     377
     378
    308379 
    309380IsoHandlerManager::~IsoHandlerManager() 
     
    314385        debugError("Still some handlers in use\n"); 
    315386    } 
    316     if (m_IsoThread) { 
    317         m_IsoThread->Stop(); 
    318         delete m_IsoThread; 
    319     } 
    320     if (m_IsoTask) { 
    321         delete m_IsoTask; 
     387    if (m_IsoThreadTransmit) { 
     388        m_IsoThreadTransmit->Stop(); 
     389        delete m_IsoThreadTransmit; 
     390    } 
     391    if (m_IsoThreadReceive) { 
     392        m_IsoThreadReceive->Stop(); 
     393        delete m_IsoThreadReceive; 
     394    } 
     395    if (m_IsoTaskTransmit) { 
     396        delete m_IsoTaskTransmit; 
     397    } 
     398    if (m_IsoTaskReceive) { 
     399        delete m_IsoTaskReceive; 
    322400    } 
    323401} 
     
    326404IsoHandlerManager::requestShadowMapUpdate() 
    327405{ 
    328     if(m_IsoTask) m_IsoTask->requestShadowMapUpdate(); 
     406    if(m_IsoTaskTransmit) m_IsoTaskTransmit->requestShadowMapUpdate(); 
     407    if(m_IsoTaskReceive) m_IsoTaskReceive->requestShadowMapUpdate(); 
    329408} 
    330409 
     
    336415    m_priority = priority; 
    337416 
    338     if (m_IsoThread) { 
     417    if (m_IsoThreadTransmit) { 
    339418        if (m_realtime) { 
    340             m_IsoThread->AcquireRealTime(m_priority); 
     419            m_IsoThreadTransmit->AcquireRealTime(m_priority 
     420                                                 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     421                                                 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT); 
    341422        } else { 
    342             m_IsoThread->DropRealTime(); 
     423            m_IsoThreadTransmit->DropRealTime(); 
     424        } 
     425    } 
     426    if (m_IsoThreadReceive) { 
     427        if (m_realtime) { 
     428            m_IsoThreadReceive->AcquireRealTime(m_priority  
     429                                                + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     430                                                + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV); 
     431        } else { 
     432            m_IsoThreadReceive->DropRealTime(); 
    343433        } 
    344434    } 
     
    356446    } 
    357447 
    358     // create a thread to iterate our ISO handlers 
    359     debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p...\n", this); 
    360     m_IsoTask = new IsoTask( *this ); 
    361     if(!m_IsoTask) { 
     448    // create threads to iterate our ISO handlers 
     449    debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p transmit...\n", this); 
     450    m_IsoTaskTransmit = new IsoTask( *this, IsoHandler::eHT_Transmit ); 
     451    if(!m_IsoTaskTransmit) { 
    362452        debugFatal("No task\n"); 
    363453        return false; 
    364454    } 
    365     m_IsoThread = new Util::PosixThread(m_IsoTask, m_realtime, 
    366                                         m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE, 
    367                                         PTHREAD_CANCEL_DEFERRED); 
    368  
    369     if(!m_IsoThread) { 
     455    m_IsoThreadTransmit = new Util::PosixThread(m_IsoTaskTransmit, m_realtime, 
     456                                                m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     457                                                + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT, 
     458                                                PTHREAD_CANCEL_DEFERRED); 
     459 
     460    if(!m_IsoThreadTransmit) { 
     461        debugFatal("No thread\n"); 
     462        return false; 
     463    } 
     464 
     465    debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p receive...\n", this); 
     466    m_IsoTaskReceive = new IsoTask( *this, IsoHandler::eHT_Receive ); 
     467    if(!m_IsoTaskReceive) { 
     468        debugFatal("No task\n"); 
     469        return false; 
     470    } 
     471    m_IsoThreadReceive = new Util::PosixThread(m_IsoTaskReceive, m_realtime, 
     472                                               m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     473                                               + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV, 
     474                                               PTHREAD_CANCEL_DEFERRED); 
     475 
     476    if(!m_IsoThreadReceive) { 
    370477        debugFatal("No thread\n"); 
    371478        return false; 
     
    374481    Util::Watchdog *watchdog = m_service.getWatchdog(); 
    375482    if(watchdog) { 
    376         if(!watchdog->registerThread(m_IsoThread)) { 
    377             debugWarning("could not register iso thread with watchdog\n"); 
     483        if(!watchdog->registerThread(m_IsoThreadTransmit)) { 
     484            debugWarning("could not register iso transmit thread with watchdog\n"); 
     485        } 
     486        if(!watchdog->registerThread(m_IsoThreadReceive)) { 
     487            debugWarning("could not register iso receive thread with watchdog\n"); 
    378488        } 
    379489    } else { 
     
    381491    } 
    382492 
    383     if (m_IsoThread->Start() != 0) { 
    384         debugFatal("Could not start ISO thread\n"); 
     493    if (m_IsoThreadTransmit->Start() != 0) { 
     494        debugFatal("Could not start ISO Transmit thread\n"); 
     495        return false; 
     496    } 
     497    if (m_IsoThreadReceive->Start() != 0) { 
     498        debugFatal("Could not start ISO Receive thread\n"); 
    385499        return false; 
    386500    } 
     
    401515        if ((*it) == h) { 
    402516            result = h->disable(); 
    403             result &= m_IsoTask->requestShadowMapUpdate(); 
     517            if(h->getType() == IsoHandler::eHT_Transmit) { 
     518                result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 
     519            } else { 
     520                result &= m_IsoTaskReceive->requestShadowMapUpdate(); 
     521            } 
    404522            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); 
    405523            return result; 
     
    422540        if ((*it) == h) { 
    423541            result = h->enable(); 
    424             result &= m_IsoTask->requestShadowMapUpdate(); 
     542            if(h->getType() == IsoHandler::eHT_Transmit) { 
     543                result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 
     544            } else { 
     545                result &= m_IsoTaskReceive->requestShadowMapUpdate(); 
     546            } 
    425547            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); 
    426548            return result; 
     
    430552    debugError("Handler not found\n"); 
    431553    return false; 
     554} 
     555 
     556void 
     557IsoHandlerManager::signalActivityTransmit() 
     558{ 
     559    assert(m_IsoTaskTransmit); 
     560    m_IsoTaskTransmit->signalActivity(); 
     561} 
     562 
     563void 
     564IsoHandlerManager::signalActivityReceive() 
     565{ 
     566    assert(m_IsoTaskReceive); 
     567    m_IsoTaskReceive->signalActivity(); 
    432568} 
    433569 
     
    438574    handler->setVerboseLevel(getDebugLevel()); 
    439575    m_IsoHandlers.push_back(handler); 
    440     return m_IsoTask->requestShadowMapUpdate(); 
     576    requestShadowMapUpdate(); 
     577    return true; 
    441578} 
    442579 
     
    452589        if ( *it == handler ) { 
    453590            m_IsoHandlers.erase(it); 
    454             return m_IsoTask->requestShadowMapUpdate(); 
     591            requestShadowMapUpdate(); 
     592            return true; 
    455593        } 
    456594    } 
     
    690828                return false; 
    691829            } 
    692             if(!m_IsoTask->requestShadowMapUpdate()) { 
     830            bool result; 
     831            if((*it)->getType() == IsoHandler::eHT_Transmit) { 
     832                result = m_IsoTaskTransmit->requestShadowMapUpdate(); 
     833            } else { 
     834                result = m_IsoTaskReceive->requestShadowMapUpdate(); 
     835            } 
     836            if(!result) { 
    693837                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    694838                return false; 
     
    751895                return false; 
    752896            } 
    753             if(!m_IsoTask->requestShadowMapUpdate()) { 
     897            bool result; 
     898            if((*it)->getType() == IsoHandler::eHT_Transmit) { 
     899                result = m_IsoTaskTransmit->requestShadowMapUpdate(); 
     900            } else { 
     901                result = m_IsoTaskReceive->requestShadowMapUpdate(); 
     902            } 
     903            if(!result) { 
    754904                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    755905                return false; 
     
    782932            retval=false; 
    783933        } 
    784         if(!m_IsoTask->requestShadowMapUpdate()) { 
     934        bool result; 
     935        if((*it)->getType() == IsoHandler::eHT_Transmit) { 
     936            result = m_IsoTaskTransmit->requestShadowMapUpdate(); 
     937        } else { 
     938            result = m_IsoTaskReceive->requestShadowMapUpdate(); 
     939        } 
     940        if(!result) { 
    785941            debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    786             retval=false; 
     942            return false; 
    787943        } 
    788944    } 
     
    816972        (*it)->setVerboseLevel(i); 
    817973    } 
    818     if(m_IsoThread) m_IsoThread->setVerboseLevel(i); 
    819     if(m_IsoTask)   m_IsoTask->setVerboseLevel(i); 
     974    if(m_IsoThreadTransmit) m_IsoThreadTransmit->setVerboseLevel(i); 
     975    if(m_IsoTaskTransmit)   m_IsoTaskTransmit->setVerboseLevel(i); 
     976    if(m_IsoThreadReceive)  m_IsoThreadReceive->setVerboseLevel(i); 
     977    if(m_IsoTaskReceive)    m_IsoTaskReceive->setVerboseLevel(i); 
    820978} 
    821979 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.h

    r978 r1005  
    2626 
    2727#include "config.h" 
    28  
    2928#include "debugmodule/debugmodule.h" 
    3029 
    3130#include "libutil/Thread.h" 
     31 
     32#include "IsoHandler.h" 
    3233 
    3334#include <sys/poll.h> 
    3435#include <errno.h> 
    35  
    3636#include <vector> 
     37#include <semaphore.h> 
    3738 
    3839class Ieee1394Service; 
    39 class IsoHandler; 
     40//class IsoHandler; 
     41//enum IsoHandler::EHandlerType; 
    4042 
    4143namespace Streaming { 
     
    5759{ 
    5860    public: 
    59         IsoTask(IsoHandlerManager& manager); 
    60         virtual ~IsoTask() {}
     61        IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType); 
     62        virtual ~IsoTask()
    6163 
    6264    public: 
     
    6567 
    6668        /** 
    67          * requests the thread to sync it's stream map with the manager 
     69         * @brief requests the thread to sync it's stream map with the manager 
    6870         */ 
    6971        bool requestShadowMapUpdate(); 
     72        enum eActivityResult { 
     73            eAR_Activity, 
     74            eAR_Timeout, 
     75            eAR_Interrupted, 
     76            eAR_Error 
     77        }; 
     78 
     79        /** 
     80         * @brief signals that something happened in one of the clients of this task 
     81         */ 
     82        void signalActivity(); 
     83        /** 
     84         * @brief wait until something happened in one of the clients of this task 
     85         */ 
     86        enum eActivityResult waitForActivity(); 
    7087 
    7188        void setVerboseLevel(int i); 
     
    92109#endif 
    93110 
     111        // activity signaling 
     112        sem_t m_activity_semaphore; 
     113 
     114        enum IsoHandler::EHandlerType m_handlerType; 
    94115        // debug stuff 
    95116        DECLARE_DEBUG_MODULE; 
     
    110131class IsoHandlerManager 
    111132{ 
    112     friend class Streaming::StreamProcessorManager; 
    113133    friend class IsoTask; 
    114     friend class IsoHandler; 
    115134 
    116135    public: 
     
    138157        bool disable(IsoHandler *); ///< disables a handler 
    139158        bool enable(IsoHandler *); ///< enables a handler 
     159 
     160        /** 
     161         * @brief signals that something happened in one of the clients 
     162         */ 
     163        void signalActivityTransmit(); 
     164        void signalActivityReceive(); 
     165 
    140166        ///> disables the handler attached to the stream 
    141167        bool stopHandlerForStream(Streaming::StreamProcessor *); 
     
    157183        Ieee1394Service& get1394Service() {return m_service;}; 
    158184 
    159     protected: 
    160185        void requestShadowMapUpdate(); 
    161186 
     
    195220        bool            m_realtime; 
    196221        int             m_priority; 
    197         Util::Thread *  m_IsoThread; 
    198         IsoTask *       m_IsoTask; 
     222        Util::Thread *  m_IsoThreadTransmit; 
     223        IsoTask *       m_IsoTaskTransmit; 
     224        Util::Thread *  m_IsoThreadReceive; 
     225        IsoTask *       m_IsoTaskReceive; 
    199226 
    200227        // debug stuff