Changeset 1005

Show
Ignore:
Timestamp:
04/21/08 01:27:47 (13 years ago)
Author:
ppalmers
Message:

Improve thread synchronisation. Switch back to separate threads for transmit and
receive since it is not possible to statically schedule things properly. One
of the threads (i.e. the client thread) is out of our control, hence it's
execution can't be controlled. Using separate threads and correct priorities
will shift this problem to the OS. Note that the priority of the packet
receive thread should be lower than the client thread (such that the client
thread is woken ASAP), and the priority of the transmit thread should be
higher than the client thread (such that packets are queued ASAP).
Extra benefit: multi-cores are used.

Some other startup improvements.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/config.h.in

    r981 r1005  
    8080#define ISOHANDLERMANAGER_MAX_STREAMS_PER_ISOTHREAD         16 
    8181 
    82 // Ideally the audio processing will be driven by this thread 
    83 #define ISOHANDLERMANAGER_ISO_PRIO_INCREASE            1 
     82// The best setup is if the receive handlers have lower priority 
     83// than the client thread since that ensures that as soon as we 
     84// received sufficient frames, the client thread runs. 
     85// The transmit thread should have higher priority to ensure that 
     86// all available data is flushed to the ISO kernel buffers as 
     87// soon as possible 
     88// At this moment, the jack backend uses base+5 to init ffado 
     89// prio 
     90#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE                  0 
     91#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV            -6 
     92#define ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT             1 
     93 
     94// the timeout for ISO activity on any thread 
     95// NOTE: don't make this 0 
     96#define ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS        1000000LL 
    8497 
    8598// allows to add some processing margin. This shifts the time 
     
    97110#define STREAMPROCESSORMANAGER_SYNCSTART_TRIES              10 
    98111#define STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC          200 
    99 #define STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC      200 
     112#define STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC      400 
    100113#define STREAMPROCESSORMANAGER_NB_ALIGN_TRIES               40 
    101114 
  • trunk/libffado/src/libieee1394/IsoHandler.cpp

    r977 r1005  
    2626#include "IsoHandler.h" 
    2727#include "ieee1394service.h"  
     28#include "IsoHandlerManager.h" 
    2829 
    2930#include "libstreaming/generic/StreamProcessor.h" 
     
    9192   , m_speed( RAW1394_ISO_SPEED_400 ) 
    9293   , m_prebuffers( 0 ) 
     94   , m_dont_exit_iterate_loop( true ) 
    9395   , m_State( E_Created ) 
    9496#ifdef DEBUG 
     
    150152 
    151153bool 
    152 IsoHandler::waitForClient() 
    153 
    154     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
    155     if(m_Client) { 
    156         bool result; 
    157         if (m_type == eHT_Receive) { 
    158             result = m_Client->waitForProducePacket(); 
    159         } else { 
    160             result = m_Client->waitForConsumePacket(); 
    161         } 
    162         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
    163         return result; 
    164     } else { 
    165         debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " no client\n"); 
    166     } 
    167     return false; 
    168 
    169  
    170 bool 
    171 IsoHandler::tryWaitForClient() 
    172 
    173     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
     154IsoHandler::canIterateClient() 
     155
     156    debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "checking...\n"); 
    174157    if(m_Client) { 
    175158        bool result; 
     
    186169    return false; 
    187170} 
    188 /* 
    189 bool 
    190 IsoHandler::Execute() 
    191 { 
    192     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "%p: Execute thread...\n", this); 
    193  
    194     // bypass if not running 
    195     if (m_State != E_Running) { 
    196         debugOutput( DEBUG_LEVEL_VERBOSE, "%p: not polling since not running...\n", this); 
    197         usleep(m_poll_timeout * 1000); 
    198         debugOutput( DEBUG_LEVEL_VERBOSE, "%p: done sleeping...\n", this); 
    199         return true; 
    200     } 
    201  
    202     // wait for the availability of frames in the client 
    203     // (blocking for transmit handlers) 
    204     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Waiting for Client activity...\n", this, getTypeString()); 
    205     if (waitForClient()) { 
    206 #if ISOHANDLER_USE_POLL 
    207         bool result = true; 
    208         while(result && m_Client && tryWaitForClient()) { 
    209             int err = poll(&m_poll_fd, 1, m_poll_timeout); 
    210             if (err == -1) { 
    211                 if (errno == EINTR) { 
    212                     return true; 
    213                 } 
    214                 debugFatal("%p, poll error: %s\n", this, strerror (errno)); 
    215                 return false; 
    216             } 
    217  
    218             if(m_poll_fd.revents & (POLLIN)) { 
    219                 result=iterate(); 
    220                 if(!result) { 
    221                     debugOutput( DEBUG_LEVEL_VERBOSE, 
    222                                 "IsoHandler (%p): Failed to iterate handler\n", 
    223                                 this); 
    224                 } 
    225             } else { 
    226                 if (m_poll_fd.revents & POLLERR) { 
    227                     debugWarning("error on fd for %p\n", this); 
    228                 } 
    229                 if (m_poll_fd.revents & POLLHUP) { 
    230                     debugWarning("hangup on fd for %p\n",this); 
    231                 } 
    232                 break; 
    233             } 
    234         } 
    235         return result; 
    236 #else 
    237         // iterate() is blocking if no 1394 data is available 
    238         // so poll'ing is not really necessary 
    239         bool result = true; 
    240         while(result && m_Client && tryWaitForClient()) { 
    241             result = iterate(); 
    242 //             if (getType() == eHT_Receive) { 
    243 //                 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Iterate returned: %d\n", 
    244 //                             this, (m_type==eHT_Receive?"Receive":"Transmit"), result); 
    245 //             } 
    246         } 
    247         return result; 
    248 #endif 
    249     } else { 
    250         debugError("waitForClient() failed.\n"); 
    251         return false; 
    252     } 
    253 }*/ 
    254171 
    255172bool 
     
    440357    #endif 
    441358    if(m_Client) { 
    442         return m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); 
     359        enum raw1394_iso_disposition retval = m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); 
     360        if (retval == RAW1394_ISO_OK) { 
     361            if (m_dont_exit_iterate_loop) { 
     362                return RAW1394_ISO_OK; 
     363            } else { 
     364                m_dont_exit_iterate_loop = true; 
     365                debugOutput(DEBUG_LEVEL_VERBOSE, 
     366                                "(%p) loop exit requested\n", 
     367                                this); 
     368                return RAW1394_ISO_DEFER; 
     369            } 
     370        } else { 
     371            return retval; 
     372        } 
    443373    } 
    444374 
     
    467397        } 
    468398        #endif 
    469         return retval; 
     399        if (retval == RAW1394_ISO_OK) { 
     400            if (m_dont_exit_iterate_loop) { 
     401                return RAW1394_ISO_OK; 
     402            } else { 
     403                m_dont_exit_iterate_loop = true; 
     404                debugOutput(DEBUG_LEVEL_VERBOSE, 
     405                                "(%p) loop exit requested\n", 
     406                                this); 
     407                return RAW1394_ISO_DEFER; 
     408            } 
     409        } else { 
     410            return retval; 
     411        } 
    470412    } 
    471413    *tag = 0; 
  • trunk/libffado/src/libieee1394/IsoHandler.h

    r930 r1005  
    2626 
    2727#include "debugmodule/debugmodule.h" 
    28 #include "IsoHandlerManager.h" 
    2928 
    3029#include "libutil/Thread.h" 
     
    3231enum raw1394_iso_disposition; 
    3332 
     33class IsoHandlerManager; 
    3434namespace Streaming { 
    3535    class StreamProcessor; 
     
    121121    bool unregisterStream(Streaming::StreamProcessor *); 
    122122 
    123     bool waitForClient(); 
    124     bool tryWaitForClient(); 
     123    bool canIterateClient(); // FIXME: implement with functor 
     124 
     125    /** 
     126     * @brief request that the handler exits the packet processing loop ASAP 
     127     * 
     128     * The raw1394 lib doesn't provide a means to stop the packet iteration loop 
     129     * except when the iterate callback returns a DEFER value. Calling this function 
     130     * will make the callback return DEFER ASAP. 
     131     */ 
     132    void requestIterateLoopExit() {m_dont_exit_iterate_loop = false;}; 
     133    /** 
     134     * @brief allow the handler to stay in the packet processing loop 
     135     * 
     136     * This resets the state set by requestIterateLoopExit() 
     137     */ 
     138    void allowIterateLoop() {m_dont_exit_iterate_loop = true;}; 
    125139 
    126140private: 
     
    132146    int             m_irq_interval; 
    133147 
    134     Streaming::StreamProcessor *m_Client; 
     148    Streaming::StreamProcessor *m_Client; // FIXME: implement with functors 
    135149 
    136150    int handleBusReset(unsigned int generation); 
     
    140154    enum raw1394_iso_speed m_speed; 
    141155    unsigned int m_prebuffers; 
     156    bool m_dont_exit_iterate_loop; 
    142157 
    143158    // the state machine 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

    r993 r1005  
    2525#include "IsoHandlerManager.h" 
    2626#include "ieee1394service.h"  
    27 #include "IsoHandler.h" 
    2827#include "libstreaming/generic/StreamProcessor.h" 
    2928 
     
    4241// --- ISO Thread --- // 
    4342 
    44 IsoTask::IsoTask(IsoHandlerManager& manager
     43IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType t
    4544    : m_manager( manager ) 
    4645    , m_SyncIsoHandler ( NULL ) 
    47 
     46    , m_handlerType( t ) 
     47
     48
     49 
     50IsoTask::~IsoTask() 
     51
     52    sem_destroy(&m_activity_semaphore); 
    4853} 
    4954 
     
    6570    #endif 
    6671 
     72    sem_init(&m_activity_semaphore, 0, 0); 
    6773    return true; 
    6874} 
     
    8995        IsoHandler *h = m_manager.m_IsoHandlers.at(i); 
    9096        assert(h); 
     97 
     98        // skip the handlers not intended for us 
     99        if(h->getType() != m_handlerType) continue; 
    91100 
    92101        if (h->isEnabled()) { 
     
    129138{ 
    130139    debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    131                        "(%p) Execute\n", this); 
     140                       "(%p, %s) Execute\n", 
     141                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
    132142    int err; 
    133143    unsigned int i; 
     
    139149    if(diff < 100) { 
    140150        debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    141                            "(%p) short loop detected (%d usec), cnt: %d\n", 
    142                            this, diff, m_successive_short_loops); 
     151                           "(%p, %s) short loop detected (%d usec), cnt: %d\n", 
     152                           this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 
     153                           diff, m_successive_short_loops); 
    143154        m_successive_short_loops++; 
    144         if(m_successive_short_loops > 100) { 
     155        if(m_successive_short_loops > 10000) { 
    145156            debugError("Shutting down runaway thread\n"); 
    146157            return false; 
     
    163174    if (m_poll_nfds_shadow == 0) { 
    164175        debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    165                            "(%p) bypass iterate since no handlers to poll\n", 
    166                            this); 
     176                           "(%p, %s) bypass iterate since no handlers to poll\n", 
     177                           this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
    167178        usleep(m_poll_timeout * 1000); 
    168179        return true; 
     
    179190            short events = 0; 
    180191            IsoHandler *h = m_IsoHandler_map_shadow[i]; 
    181             if(h->getType() == IsoHandler::eHT_Transmit) { 
    182                 // we should only poll on a transmit handler 
    183                 // that has a client that is ready to send 
    184                 // something. Otherwise it will end up in 
    185                 // busy wait looping since the packet function 
    186                 // will defer processing (also avoids the 
    187                 // AGAIN problem) 
    188                 if (h->tryWaitForClient()) { 
    189                     events = POLLIN | POLLPRI; 
    190                     no_one_to_poll = false; 
    191                 } 
    192             } else { 
    193                 // a receive handler should only be polled if 
    194                 // it's client doesn't already have enough data 
    195                 // and if it can still accept data. 
    196                 if (h->tryWaitForClient()) { // FIXME 
    197                     events = POLLIN | POLLPRI; 
    198                     no_one_to_poll = false; 
    199                 } 
     192            // we should only poll on a transmit handler 
     193            // that has a client that is ready to send 
     194            // something. Otherwise it will end up in 
     195            // busy wait looping since the packet function 
     196            // will defer processing (also avoids the 
     197            // AGAIN problem) 
     198            if (h->canIterateClient()) { 
     199                events = POLLIN | POLLPRI; 
     200                no_one_to_poll = false; 
     201                // if we are going to poll() it, let's ensure 
     202                // it can run until someone wants it to exit 
     203                h->allowIterateLoop(); 
    200204            } 
    201205            m_poll_fds_shadow[i].events = events; 
     
    203207 
    204208        if(no_one_to_poll) { 
    205             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    206                                "(%p) No one to poll, waiting on the sync handler to become ready\n", 
    207                                this); 
    208  
    209             if(!m_SyncIsoHandler->waitForClient()) { 
    210                 debugError("Failed to wait for client\n"); 
    211                 // This can be due to error or due to timeout 
    212                  
    213                 // sleep for a while 
    214                 usleep(m_poll_timeout * 1000); // FIXME 
    215                 // exit this iteration loop 
    216                 return true; 
    217             } 
    218  
    219             #ifdef DEBUG 
    220             // if this happens we end up in a deadlock! 
    221             if(!m_SyncIsoHandler->tryWaitForClient()) { 
    222                 debugFatal("inconsistency in wait functions!\n"); 
    223                 return false; 
    224             } 
    225             #endif 
    226  
    227             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    228                                "(%p) sync handler ready\n", 
    229                                this); 
     209            debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     210                               "(%p, %s) No one to poll, waiting for something to happen\n", 
     211                               this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     212            // wait for something to happen 
     213            switch(waitForActivity()) { 
     214                case IsoTask::eAR_Error: 
     215                    debugError("Error while waiting for activity\n"); 
     216                    return false; 
     217                case IsoTask::eAR_Interrupted: 
     218                    // FIXME: what to do here? 
     219                    debugWarning("Interrupted while waiting for activity\n"); 
     220                    break; 
     221                case IsoTask::eAR_Timeout: 
     222                    // FIXME: what to do here? 
     223                    debugWarning("Timeout while waiting for activity\n"); 
     224                    break; 
     225                case IsoTask::eAR_Activity: 
     226                    // do nothing 
     227                    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     228                                       "(%p, %s) something happened\n", 
     229                                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     230                    break; 
     231            } 
    230232        } 
    231233    } 
    232234 
    233235    // Use a shadow map of the fd's such that we don't have to update 
    234     // the fd map everytime we run poll(). It doesn't change that much 
    235     // anyway 
     236    // the fd map everytime we run poll(). 
    236237    err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 
    237238 
    238239    if (err < 0) { 
    239240        if (errno == EINTR) { 
     241            debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n"); 
    240242            return true; 
    241243        } 
     
    247249        #ifdef DEBUG 
    248250        if(m_poll_fds_shadow[i].revents) { 
    249             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    250                         "(%p) received events: %08X for (%d/%d, %p, %s)\n", 
    251                         this, m_poll_fds_shadow[i].revents, 
     251            debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, 
     252                        "(%p, %s) received events: %08X for (%d/%d, %p, %s)\n", 
     253                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 
     254                        m_poll_fds_shadow[i].revents, 
    252255                        i, m_poll_nfds_shadow, 
    253256                        m_IsoHandler_map_shadow[i], 
     
    283286    } 
    284287    return true; 
    285  
     288
     289 
     290enum IsoTask::eActivityResult 
     291IsoTask::waitForActivity() 
     292
     293    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     294                       "(%p, %s) waiting for activity\n", 
     295                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     296    struct timespec ts; 
     297    int result; 
     298 
     299    if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 
     300        debugError("clock_gettime failed\n"); 
     301        return eAR_Error; 
     302    } 
     303    long long int timeout_nsec=0; 
     304    int timeout_sec = 0; 
     305 
     306    timeout_nsec = ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS * 1000LL; 
     307    timeout_sec = 0; 
     308    while(timeout_nsec >= 1000000000LL) { 
     309        timeout_sec += 1; 
     310        timeout_nsec -= 1000000000LL; 
     311    } 
     312    ts.tv_nsec += timeout_nsec; 
     313    ts.tv_sec += timeout_sec; 
     314 
     315    result = sem_timedwait(&m_activity_semaphore, &ts); 
     316 
     317    if(result != 0) { 
     318        if (result == ETIMEDOUT) { 
     319            debugOutput(DEBUG_LEVEL_VERBOSE, 
     320                        "(%p) pthread_cond_timedwait() timed out (result=%d)\n", 
     321                        this, result); 
     322            return eAR_Timeout; 
     323        } else if (result == EINTR) { 
     324            debugOutput(DEBUG_LEVEL_VERBOSE, 
     325                        "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", 
     326                        this, result); 
     327            return eAR_Interrupted; 
     328        } else { 
     329            debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n",  
     330                        this, result); 
     331            debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",  
     332                       this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); 
     333            return eAR_Error; 
     334        } 
     335    } 
     336 
     337    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     338                       "(%p, %s) got activity\n", 
     339                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
     340    return eAR_Activity; 
     341
     342 
     343void 
     344IsoTask::signalActivity() 
     345
     346    // signal the activity cond var 
     347    sem_post(&m_activity_semaphore); 
     348    debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     349                       "(%p, %s) activity\n", 
     350                       this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 
    286351} 
    287352 
     
    295360   , m_service( service ) 
    296361   , m_realtime(false), m_priority(0) 
    297    , m_IsoThread ( NULL ) 
    298    , m_IsoTask ( NULL ) 
    299 {} 
     362   , m_IsoThreadTransmit ( NULL ) 
     363   , m_IsoTaskTransmit ( NULL ) 
     364   , m_IsoThreadReceive ( NULL ) 
     365   , m_IsoTaskReceive ( NULL ) 
     366
     367
    300368 
    301369IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio) 
     
    303371   , m_service( service ) 
    304372   , m_realtime(run_rt), m_priority(rt_prio) 
    305    , m_IsoThread ( NULL ) 
    306    , m_IsoTask ( NULL ) 
    307 {} 
     373   , m_IsoThreadTransmit ( NULL ) 
     374   , m_IsoTaskTransmit ( NULL ) 
     375   , m_IsoThreadReceive ( NULL ) 
     376   , m_IsoTaskReceive ( NULL ) 
     377
     378
    308379 
    309380IsoHandlerManager::~IsoHandlerManager() 
     
    314385        debugError("Still some handlers in use\n"); 
    315386    } 
    316     if (m_IsoThread) { 
    317         m_IsoThread->Stop(); 
    318         delete m_IsoThread; 
    319     } 
    320     if (m_IsoTask) { 
    321         delete m_IsoTask; 
     387    if (m_IsoThreadTransmit) { 
     388        m_IsoThreadTransmit->Stop(); 
     389        delete m_IsoThreadTransmit; 
     390    } 
     391    if (m_IsoThreadReceive) { 
     392        m_IsoThreadReceive->Stop(); 
     393        delete m_IsoThreadReceive; 
     394    } 
     395    if (m_IsoTaskTransmit) { 
     396        delete m_IsoTaskTransmit; 
     397    } 
     398    if (m_IsoTaskReceive) { 
     399        delete m_IsoTaskReceive; 
    322400    } 
    323401} 
     
    326404IsoHandlerManager::requestShadowMapUpdate() 
    327405{ 
    328     if(m_IsoTask) m_IsoTask->requestShadowMapUpdate(); 
     406    if(m_IsoTaskTransmit) m_IsoTaskTransmit->requestShadowMapUpdate(); 
     407    if(m_IsoTaskReceive) m_IsoTaskReceive->requestShadowMapUpdate(); 
    329408} 
    330409 
     
    336415    m_priority = priority; 
    337416 
    338     if (m_IsoThread) { 
     417    if (m_IsoThreadTransmit) { 
    339418        if (m_realtime) { 
    340             m_IsoThread->AcquireRealTime(m_priority); 
     419            m_IsoThreadTransmit->AcquireRealTime(m_priority 
     420                                                 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     421                                                 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT); 
    341422        } else { 
    342             m_IsoThread->DropRealTime(); 
     423            m_IsoThreadTransmit->DropRealTime(); 
     424        } 
     425    } 
     426    if (m_IsoThreadReceive) { 
     427        if (m_realtime) { 
     428            m_IsoThreadReceive->AcquireRealTime(m_priority  
     429                                                + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     430                                                + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV); 
     431        } else { 
     432            m_IsoThreadReceive->DropRealTime(); 
    343433        } 
    344434    } 
     
    356446    } 
    357447 
    358     // create a thread to iterate our ISO handlers 
    359     debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p...\n", this); 
    360     m_IsoTask = new IsoTask( *this ); 
    361     if(!m_IsoTask) { 
     448    // create threads to iterate our ISO handlers 
     449    debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p transmit...\n", this); 
     450    m_IsoTaskTransmit = new IsoTask( *this, IsoHandler::eHT_Transmit ); 
     451    if(!m_IsoTaskTransmit) { 
    362452        debugFatal("No task\n"); 
    363453        return false; 
    364454    } 
    365     m_IsoThread = new Util::PosixThread(m_IsoTask, m_realtime, 
    366                                         m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE, 
    367                                         PTHREAD_CANCEL_DEFERRED); 
    368  
    369     if(!m_IsoThread) { 
     455    m_IsoThreadTransmit = new Util::PosixThread(m_IsoTaskTransmit, m_realtime, 
     456                                                m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     457                                                + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT, 
     458                                                PTHREAD_CANCEL_DEFERRED); 
     459 
     460    if(!m_IsoThreadTransmit) { 
     461        debugFatal("No thread\n"); 
     462        return false; 
     463    } 
     464 
     465    debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p receive...\n", this); 
     466    m_IsoTaskReceive = new IsoTask( *this, IsoHandler::eHT_Receive ); 
     467    if(!m_IsoTaskReceive) { 
     468        debugFatal("No task\n"); 
     469        return false; 
     470    } 
     471    m_IsoThreadReceive = new Util::PosixThread(m_IsoTaskReceive, m_realtime, 
     472                                               m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 
     473                                               + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV, 
     474                                               PTHREAD_CANCEL_DEFERRED); 
     475 
     476    if(!m_IsoThreadReceive) { 
    370477        debugFatal("No thread\n"); 
    371478        return false; 
     
    374481    Util::Watchdog *watchdog = m_service.getWatchdog(); 
    375482    if(watchdog) { 
    376         if(!watchdog->registerThread(m_IsoThread)) { 
    377             debugWarning("could not register iso thread with watchdog\n"); 
     483        if(!watchdog->registerThread(m_IsoThreadTransmit)) { 
     484            debugWarning("could not register iso transmit thread with watchdog\n"); 
     485        } 
     486        if(!watchdog->registerThread(m_IsoThreadReceive)) { 
     487            debugWarning("could not register iso receive thread with watchdog\n"); 
    378488        } 
    379489    } else { 
     
    381491    } 
    382492 
    383     if (m_IsoThread->Start() != 0) { 
    384         debugFatal("Could not start ISO thread\n"); 
     493    if (m_IsoThreadTransmit->Start() != 0) { 
     494        debugFatal("Could not start ISO Transmit thread\n"); 
     495        return false; 
     496    } 
     497    if (m_IsoThreadReceive->Start() != 0) { 
     498        debugFatal("Could not start ISO Receive thread\n"); 
    385499        return false; 
    386500    } 
     
    401515        if ((*it) == h) { 
    402516            result = h->disable(); 
    403             result &= m_IsoTask->requestShadowMapUpdate(); 
     517            if(h->getType() == IsoHandler::eHT_Transmit) { 
     518                result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 
     519            } else { 
     520                result &= m_IsoTaskReceive->requestShadowMapUpdate(); 
     521            } 
    404522            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); 
    405523            return result; 
     
    422540        if ((*it) == h) { 
    423541            result = h->enable(); 
    424             result &= m_IsoTask->requestShadowMapUpdate(); 
     542            if(h->getType() == IsoHandler::eHT_Transmit) { 
     543                result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 
     544            } else { 
     545                result &= m_IsoTaskReceive->requestShadowMapUpdate(); 
     546            } 
    425547            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); 
    426548            return result; 
     
    430552    debugError("Handler not found\n"); 
    431553    return false; 
     554} 
     555 
     556void 
     557IsoHandlerManager::signalActivityTransmit() 
     558{ 
     559    assert(m_IsoTaskTransmit); 
     560    m_IsoTaskTransmit->signalActivity(); 
     561} 
     562 
     563void 
     564IsoHandlerManager::signalActivityReceive() 
     565{ 
     566    assert(m_IsoTaskReceive); 
     567    m_IsoTaskReceive->signalActivity(); 
    432568} 
    433569 
     
    438574    handler->setVerboseLevel(getDebugLevel()); 
    439575    m_IsoHandlers.push_back(handler); 
    440     return m_IsoTask->requestShadowMapUpdate(); 
     576    requestShadowMapUpdate(); 
     577    return true; 
    441578} 
    442579 
     
    452589        if ( *it == handler ) { 
    453590            m_IsoHandlers.erase(it); 
    454             return m_IsoTask->requestShadowMapUpdate(); 
     591            requestShadowMapUpdate(); 
     592            return true; 
    455593        } 
    456594    } 
     
    690828                return false; 
    691829            } 
    692             if(!m_IsoTask->requestShadowMapUpdate()) { 
     830            bool result; 
     831            if((*it)->getType() == IsoHandler::eHT_Transmit) { 
     832                result = m_IsoTaskTransmit->requestShadowMapUpdate(); 
     833            } else { 
     834                result = m_IsoTaskReceive->requestShadowMapUpdate(); 
     835            } 
     836            if(!result) { 
    693837                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    694838                return false; 
     
    751895                return false; 
    752896            } 
    753             if(!m_IsoTask->requestShadowMapUpdate()) { 
     897            bool result; 
     898            if((*it)->getType() == IsoHandler::eHT_Transmit) { 
     899                result = m_IsoTaskTransmit->requestShadowMapUpdate(); 
     900            } else { 
     901                result = m_IsoTaskReceive->requestShadowMapUpdate(); 
     902            } 
     903            if(!result) { 
    754904                debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    755905                return false; 
     
    782932            retval=false; 
    783933        } 
    784         if(!m_IsoTask->requestShadowMapUpdate()) { 
     934        bool result; 
     935        if((*it)->getType() == IsoHandler::eHT_Transmit) { 
     936            result = m_IsoTaskTransmit->requestShadowMapUpdate(); 
     937        } else { 
     938            result = m_IsoTaskReceive->requestShadowMapUpdate(); 
     939        } 
     940        if(!result) { 
    785941            debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 
    786             retval=false; 
     942            return false; 
    787943        } 
    788944    } 
     
    816972        (*it)->setVerboseLevel(i); 
    817973    } 
    818     if(m_IsoThread) m_IsoThread->setVerboseLevel(i); 
    819     if(m_IsoTask)   m_IsoTask->setVerboseLevel(i); 
     974    if(m_IsoThreadTransmit) m_IsoThreadTransmit->setVerboseLevel(i); 
     975    if(m_IsoTaskTransmit)   m_IsoTaskTransmit->setVerboseLevel(i); 
     976    if(m_IsoThreadReceive)  m_IsoThreadReceive->setVerboseLevel(i); 
     977    if(m_IsoTaskReceive)    m_IsoTaskReceive->setVerboseLevel(i); 
    820978} 
    821979 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.h

    r978 r1005  
    2626 
    2727#include "config.h" 
    28  
    2928#include "debugmodule/debugmodule.h" 
    3029 
    3130#include "libutil/Thread.h" 
     31 
     32#include "IsoHandler.h" 
    3233 
    3334#include <sys/poll.h> 
    3435#include <errno.h> 
    35  
    3636#include <vector> 
     37#include <semaphore.h> 
    3738 
    3839class Ieee1394Service; 
    39 class IsoHandler; 
     40//class IsoHandler; 
     41//enum IsoHandler::EHandlerType; 
    4042 
    4143namespace Streaming { 
     
    5759{ 
    5860    public: 
    59         IsoTask(IsoHandlerManager& manager); 
    60         virtual ~IsoTask() {}
     61        IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType); 
     62        virtual ~IsoTask()
    6163 
    6264    public: 
     
    6567 
    6668        /** 
    67          * requests the thread to sync it's stream map with the manager 
     69         * @brief requests the thread to sync it's stream map with the manager 
    6870         */ 
    6971        bool requestShadowMapUpdate(); 
     72        enum eActivityResult { 
     73            eAR_Activity, 
     74            eAR_Timeout, 
     75            eAR_Interrupted, 
     76            eAR_Error 
     77        }; 
     78 
     79        /** 
     80         * @brief signals that something happened in one of the clients of this task 
     81         */ 
     82        void signalActivity(); 
     83        /** 
     84         * @brief wait until something happened in one of the clients of this task 
     85         */ 
     86        enum eActivityResult waitForActivity(); 
    7087 
    7188        void setVerboseLevel(int i); 
     
    92109#endif 
    93110 
     111        // activity signaling 
     112        sem_t m_activity_semaphore; 
     113 
     114        enum IsoHandler::EHandlerType m_handlerType; 
    94115        // debug stuff 
    95116        DECLARE_DEBUG_MODULE; 
     
    110131class IsoHandlerManager 
    111132{ 
    112     friend class Streaming::StreamProcessorManager; 
    113133    friend class IsoTask; 
    114     friend class IsoHandler; 
    115134 
    116135    public: 
     
    138157        bool disable(IsoHandler *); ///< disables a handler 
    139158        bool enable(IsoHandler *); ///< enables a handler 
     159 
     160        /** 
     161         * @brief signals that something happened in one of the clients 
     162         */ 
     163        void signalActivityTransmit(); 
     164        void signalActivityReceive(); 
     165 
    140166        ///> disables the handler attached to the stream 
    141167        bool stopHandlerForStream(Streaming::StreamProcessor *); 
     
    157183        Ieee1394Service& get1394Service() {return m_service;}; 
    158184 
    159     protected: 
    160185        void requestShadowMapUpdate(); 
    161186 
     
    195220        bool            m_realtime; 
    196221        int             m_priority; 
    197         Util::Thread *  m_IsoThread; 
    198         IsoTask *       m_IsoTask; 
     222        Util::Thread *  m_IsoThreadTransmit; 
     223        IsoTask *       m_IsoTaskTransmit; 
     224        Util::Thread *  m_IsoThreadReceive; 
     225        IsoTask *       m_IsoTaskReceive; 
    199226 
    200227        // debug stuff 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

    r1001 r1005  
    4040#include <math.h> 
    4141 
    42 #define SIGNAL_ACTIVITY { \ 
    43     pthread_mutex_lock(&m_activity_cond_lock); \ 
    44     pthread_cond_broadcast(&m_activity_cond); \ 
    45     pthread_mutex_unlock(&m_activity_cond_lock); \ 
     42#define SIGNAL_ACTIVITY_SPM { \ 
     43    m_StreamProcessorManager.signalActivity(); \ 
     44
     45#define SIGNAL_ACTIVITY_ISO_XMIT { \ 
     46    m_IsoHandlerManager.signalActivityTransmit(); \ 
     47
     48#define SIGNAL_ACTIVITY_ISO_RECV { \ 
     49    m_IsoHandlerManager.signalActivityReceive(); \ 
     50
     51#define SIGNAL_ACTIVITY_ALL { \ 
     52    m_StreamProcessorManager.signalActivity(); \ 
     53    m_IsoHandlerManager.signalActivityTransmit(); \ 
     54    m_IsoHandlerManager.signalActivityReceive(); \ 
    4655} 
    4756 
     
    7180    , m_sync_delay( 0 ) 
    7281    , m_in_xrun( false ) 
     82    , m_min_ahead( 7999 ) 
    7383{ 
    7484    // create the timestamped buffer and register ourselves as its client 
    7585    m_data_buffer = new Util::TimestampedBuffer(this); 
    76     pthread_mutex_init(&m_activity_cond_lock, NULL); 
    77     pthread_cond_init(&m_activity_cond, NULL); 
    7886} 
    7987 
     
    8492    } 
    8593 
    86     // lock the condition mutex to keep threads from blocking on 
    87     // the condition var while we destroy it 
    88     pthread_mutex_lock(&m_activity_cond_lock); 
    89     // now signal activity, releasing threads that 
    90     // are already blocking on the condition variable 
    91     pthread_cond_broadcast(&m_activity_cond); 
    92     // then destroy it 
    93     pthread_cond_destroy(&m_activity_cond); 
    94     pthread_mutex_unlock(&m_activity_cond_lock); 
    95  
    96     // destroy the mutexes 
    97     pthread_mutex_destroy(&m_activity_cond_lock); 
    9894    if (m_data_buffer) delete m_data_buffer; 
    9995    if (m_scratch_buffer) delete[] m_scratch_buffer; 
     
    108104        debugError("Failed to stop SP\n"); 
    109105    } 
    110     SIGNAL_ACTIVITY
     106    SIGNAL_ACTIVITY_ALL
    111107} 
    112108 
     
    313309            m_dropped += dropped_cycles; 
    314310            m_last_cycle = cycle; 
    315             m_Parent.showDevice(); 
    316 //             flushDebugOutput(); 
    317 //             assert(0); 
     311            dumpInfo(); 
    318312        } 
    319313    } 
     
    481475        } else if(result2 == eCRV_OK) { 
    482476            // no problem here 
    483             SIGNAL_ACTIVITY; 
     477            // FIXME: cache the period size? 
     478            unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); 
     479            unsigned int bufferfill = m_data_buffer->getBufferFill(); 
     480            if(bufferfill >= periodsize) { 
     481                debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "signal activity, %d>%d\n", bufferfill, periodsize); 
     482                SIGNAL_ACTIVITY_SPM; 
     483                return RAW1394_ISO_DEFER; 
     484            } 
    484485            return RAW1394_ISO_OK; 
    485486        } else { 
     
    589590 
    590591    if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) { 
    591         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", 
    592             cycle, now_cycles); 
     592        unsigned int fc = m_data_buffer->getBufferFill(); 
     593        debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy, fill=%u)\n", 
     594            cycle, now_cycles, fc); 
    593595        if(m_state == ePS_Running) { 
    594596            debugShowBackLogLines(200); 
     
    642644            generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 
    643645            return RAW1394_ISO_OK; 
    644         // FIXME: PP: I think this should be possible too 
     646        // FIXME: PP: I think this should also be a possibility 
    645647        //} else if (result == eCRV_EmptyPacket) { 
    646648        //    goto send_empty_packet; 
     
    687689        enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); 
    688690        if (result == eCRV_Packet || result == eCRV_Defer) { 
    689             debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    690                                "XMIT: CY=%04u TS=%011llu\n", 
    691                                cycle, m_last_timestamp); 
     691            int ahead = diffCycles(cycle, now_cycles); 
     692            if (ahead < m_min_ahead) m_min_ahead = ahead; 
     693            debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     694                               "XMIT: CY=%04u TS=%011llu NOW_CY=%04u AHEAD=%04d\n", 
     695                               cycle, m_last_timestamp, now_cycles, ahead); 
    692696            // update some accounting 
    693697            m_last_good_cycle = cycle; 
     
    804808        } 
    805809    } 
    806  
    807     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    808                        "XMIT EMPTY: CY=%04u\n", 
    809                        cycle); 
     810     
     811    { // context to avoid ahead var clash 
     812        int ahead = diffCycles(cycle, now_cycles); 
     813        if (ahead < m_min_ahead) m_min_ahead = ahead; 
     814        debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 
     815                           "XMIT EMPTY: CY=%04u, NOW_CY=%04u, AHEAD=%04d\n", 
     816                           cycle, now_cycles, ahead); 
     817    } 
     818 
    810819    generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); 
    811820    generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 
    812821    return RAW1394_ISO_OK; 
    813822} 
    814  
    815823 
    816824// Frame Transfer API 
     
    823831bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 
    824832    bool result; 
    825     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, 
    826                         "%p.getFrames(%d, %11llu)", 
    827                         nbframes, ts); 
     833    debugOutputExtreme( DEBUG_LEVEL_VERBOSE, 
     834                        "(%p, %s) getFrames(%d, %11llu)\n", 
     835                        this, getTypeString(), nbframes, ts); 
    828836    assert( getType() == ePT_Receive ); 
    829837    if(isDryRunning()) result = getFramesDry(nbframes, ts); 
    830838    else result = getFramesWet(nbframes, ts); 
    831     SIGNAL_ACTIVITY
     839    SIGNAL_ACTIVITY_ISO_RECV
    832840    return result; 
    833841} 
     
    890898    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); 
    891899    result = m_data_buffer->dropFrames(nbframes); 
    892     SIGNAL_ACTIVITY
     900    SIGNAL_ACTIVITY_ISO_RECV
    893901    return result; 
    894902} 
     
    897905{ 
    898906    bool result; 
    899     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    900                        "%p.putFrames(%d, %11llu)", 
    901                        nbframes, ts); 
     907    debugOutputExtreme( DEBUG_LEVEL_VERBOSE, 
     908                        "(%p, %s) putFrames(%d, %11llu)\n", 
     909                        this, getTypeString(), nbframes, ts); 
    902910    assert( getType() == ePT_Transmit ); 
    903911    if(isDryRunning()) result = putFramesDry(nbframes, ts); 
    904912    else result = putFramesWet(nbframes, ts); 
    905     SIGNAL_ACTIVITY
     913    SIGNAL_ACTIVITY_ISO_XMIT
    906914    return result; 
    907915} 
     
    933941StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts) 
    934942{ 
    935     debugOutputExtreme(DEBUG_LEVEL_ULTRA_VERBOSE, 
     943    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    936944                       "StreamProcessor::putSilenceFrames(%d, %llu)\n", 
    937945                       nbframes, ts); 
     
    955963    } 
    956964 
    957     SIGNAL_ACTIVITY
     965    SIGNAL_ACTIVITY_ISO_XMIT
    958966    return true; 
    959967} 
     
    966974    if(nbframes > 0) { 
    967975        result = m_data_buffer->dropFrames(nbframes); 
    968         SIGNAL_ACTIVITY
     976        SIGNAL_ACTIVITY_ALL
    969977        return result; 
    970978    } else { 
     
    973981            result &= m_data_buffer->writeDummyFrame(); 
    974982        } 
    975         SIGNAL_ACTIVITY
     983        SIGNAL_ACTIVITY_ALL
    976984        return result; 
    977985    } 
     
    11351143    // wake up any threads that might be waiting on data in the buffers 
    11361144    // since a state transition can cause data to become available 
    1137     SIGNAL_ACTIVITY
     1145    SIGNAL_ACTIVITY_ALL
    11381146    return true; 
    11391147} 
     
    14121420    } 
    14131421    #endif 
    1414     SIGNAL_ACTIVITY
     1422    SIGNAL_ACTIVITY_ALL
    14151423    return result; 
    14161424} 
     
    14461454    } 
    14471455    #endif 
    1448     SIGNAL_ACTIVITY
     1456    SIGNAL_ACTIVITY_ALL
    14491457    return true; 
    14501458} 
     
    14961504    } 
    14971505    #endif 
    1498     SIGNAL_ACTIVITY
     1506    SIGNAL_ACTIVITY_ALL
    14991507    return result; 
    15001508} 
     
    15451553    } 
    15461554    #endif 
    1547     SIGNAL_ACTIVITY
     1555    SIGNAL_ACTIVITY_ALL
    15481556    return true; 
    15491557} 
     
    15691577                                             this, m_last_cycle); 
    15701578            m_in_xrun = false; 
     1579            m_min_ahead = 7999; 
    15711580            m_local_node_id = m_1394service.getLocalNodeId() & 0x3f; 
    15721581            m_data_buffer->setTransparent(false); 
     
    15831592    } 
    15841593    #endif 
    1585     SIGNAL_ACTIVITY
     1594    SIGNAL_ACTIVITY_ALL
    15861595    return result; 
    15871596} 
     
    16151624    } 
    16161625    #endif 
    1617     SIGNAL_ACTIVITY
     1626    SIGNAL_ACTIVITY_ALL
    16181627    return true; 
    16191628} 
     
    17281737    debugError("Invalid state transition: %s => %s\n", 
    17291738        ePSToString(m_state), ePSToString(next_state)); 
    1730     SIGNAL_ACTIVITY
     1739    SIGNAL_ACTIVITY_ALL
    17311740    return false; 
    17321741updateState_exit_change_failed: 
    17331742    debugError("State transition failed: %s => %s\n", 
    17341743        ePSToString(m_state), ePSToString(next_state)); 
    1735     SIGNAL_ACTIVITY
     1744    SIGNAL_ACTIVITY_ALL
    17361745    return false; 
    1737 } 
    1738  
    1739 bool StreamProcessor::waitForProducePacket() 
    1740 { 
    1741     return waitForProduce(getNominalFramesPerPacket()); 
    1742 } 
    1743 bool StreamProcessor::waitForProducePeriod() 
    1744 { 
    1745     return waitForProduce(m_StreamProcessorManager.getPeriodSize()); 
    1746 } 
    1747 bool StreamProcessor::waitForProduce(unsigned int nframes) 
    1748 { 
    1749     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    1750                        "(%p, %s) wait ...\n", 
    1751                        this, getTypeString()); 
    1752     struct timespec ts; 
    1753     int result; 
    1754     int max_runs = 1000; 
    1755  
    1756     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 
    1757         debugError("clock_gettime failed\n"); 
    1758         return false; 
    1759     } 
    1760  
    1761     // FIXME: hardcoded timeout of 10 sec 
    1762 //     ts.tv_nsec += 1000 * 1000000LL; 
    1763 //     while (ts.tv_nsec > 1000000000LL) { 
    1764 //         ts.tv_sec += 1; 
    1765 //         ts.tv_nsec -= 1000000000LL; 
    1766 //     } 
    1767     ts.tv_sec += 2; 
    1768      
    1769     pthread_mutex_lock(&m_activity_cond_lock); 
    1770     while(!canProduce(nframes) && max_runs) { 
    1771         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); 
    1772      
    1773         if(result != 0) { 
    1774             if (result == ETIMEDOUT) { 
    1775                 debugOutput(DEBUG_LEVEL_VERBOSE, 
    1776                             "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n", 
    1777                             this, getTypeString(), result); 
    1778                 pthread_mutex_unlock(&m_activity_cond_lock); 
    1779                 dumpInfo(); 
    1780                 return false; 
    1781             } else if (result == EINTR) { 
    1782                 debugOutput(DEBUG_LEVEL_VERBOSE, 
    1783                             "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n", 
    1784                             this, getTypeString(), result); 
    1785                 pthread_mutex_unlock(&m_activity_cond_lock); 
    1786                 dumpInfo(); 
    1787                 return false; 
    1788             } else { 
    1789                 debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n",  
    1790                             this, getTypeString(), result); 
    1791                 pthread_mutex_unlock(&m_activity_cond_lock); 
    1792                 dumpInfo(); 
    1793                 return false; 
    1794             } 
    1795         } 
    1796     } 
    1797     pthread_mutex_unlock(&m_activity_cond_lock); 
    1798     if(max_runs == 0) { 
    1799         debugWarning("(%p) runaway loop\n"); 
    1800     } 
    1801     return true; 
    1802 } 
    1803  
    1804 bool StreamProcessor::waitForConsumePacket() 
    1805 { 
    1806     return waitForConsume(getNominalFramesPerPacket()); 
    1807 } 
    1808 bool StreamProcessor::waitForConsumePeriod() 
    1809 { 
    1810     return waitForConsume(m_StreamProcessorManager.getPeriodSize()); 
    1811 } 
    1812 bool StreamProcessor::waitForConsume(unsigned int nframes) 
    1813 { 
    1814     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    1815                        "(%p, %s) wait ...\n", 
    1816                        this, getTypeString()); 
    1817     struct timespec ts; 
    1818     int result; 
    1819  
    1820     int max_runs = 1000; 
    1821  
    1822     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 
    1823         debugError("clock_gettime failed\n"); 
    1824         return false; 
    1825     } 
    1826  
    1827     // FIXME: hardcoded timeout of 10 sec 
    1828 //     ts.tv_nsec += 1000 * 1000000LL; 
    1829 //     while (ts.tv_nsec > 1000000000LL) { 
    1830 //         ts.tv_sec += 1; 
    1831 //         ts.tv_nsec -= 1000000000LL; 
    1832 //     } 
    1833     ts.tv_sec += 2; 
    1834  
    1835     pthread_mutex_lock(&m_activity_cond_lock); 
    1836     while(!canConsume(nframes) && max_runs) { 
    1837         result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); 
    1838         if(result != 0) { 
    1839             if (result == ETIMEDOUT) { 
    1840                 debugOutput(DEBUG_LEVEL_VERBOSE, 
    1841                             "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n", 
    1842                             this, getTypeString(), result); 
    1843                 pthread_mutex_unlock(&m_activity_cond_lock); 
    1844                 dumpInfo(); 
    1845                 return false; 
    1846             } else if (result == EINTR) { 
    1847                 debugOutput(DEBUG_LEVEL_VERBOSE, 
    1848                             "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n", 
    1849                             this, getTypeString(), result); 
    1850                 pthread_mutex_unlock(&m_activity_cond_lock); 
    1851                 dumpInfo(); 
    1852                 return false; 
    1853             } else { 
    1854                 debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n",  
    1855                             this, getTypeString(), result); 
    1856                 pthread_mutex_unlock(&m_activity_cond_lock); 
    1857                 dumpInfo(); 
    1858                 return false; 
    1859             } 
    1860         } 
    1861         max_runs--; 
    1862     } 
    1863     pthread_mutex_unlock(&m_activity_cond_lock); 
    1864      
    1865     if(max_runs == 0) { 
    1866         debugWarning("(%p) runaway loop\n"); 
    1867     } 
    1868      
    1869     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 
    1870                        "(%p, %s) leave ...\n", 
    1871                        this, getTypeString()); 
    1872     return true; 
    18731746} 
    18741747 
     
    18851758    if(m_in_xrun) return true; 
    18861759    if(m_state == ePS_Running && m_next_state == ePS_Running) { 
    1887          
    1888         if(getType() == ePT_Transmit) { 
    1889             // can we put a certain amount of frames into the buffer? 
    1890             unsigned int bufferspace = m_data_buffer->getBufferSpace(); 
    1891             if(bufferspace >= nframes) { 
    1892                 return true; 
    1893             } else return false; 
    1894         } else { 
    1895             // do we still have to put frames in the buffer? 
    1896             unsigned int bufferfill = m_data_buffer->getBufferFill(); 
    1897             unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); 
    1898             if (bufferfill > periodsize) return false; 
    1899             else return true; 
    1900         } 
    1901  
    1902  
     1760        // can we put a certain amount of frames into the buffer? 
     1761        unsigned int bufferspace = m_data_buffer->getBufferSpace(); 
     1762        if(bufferspace >= nframes) { 
     1763            return true; 
     1764        } else return false; 
    19031765    } else { 
    19041766        if(getType() == ePT_Transmit) { 
     
    19471809 * Helper routines                             * 
    19481810 ***********************************************/ 
     1811// FIXME: I think this can be removed and replaced by putSilenceFrames 
    19491812bool 
    19501813StreamProcessor::transferSilence(unsigned int nframes) 
    19511814{ 
    19521815    bool retval; 
     1816 
     1817    #ifdef DEBUG 
    19531818    signed int fc; 
    19541819    ffado_timestamp_t ts_tail_tmp; 
     1820    m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); 
     1821    if (fc != 0) { 
     1822        debugWarning("Prefilling a buffer that already contains %d frames\n", fc); 
     1823    } 
     1824    #endif 
    19551825 
    19561826    // prepare a buffer of silence 
    19571827    char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame()); 
    19581828    transmitSilenceBlock(dummybuffer, nframes, 0); 
    1959  
    1960     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); 
    1961     if (fc != 0) { 
    1962         debugWarning("Prefilling a buffer that already contains %d frames\n", fc); 
    1963     } 
    19641829 
    19651830    // add the silence data to the ringbuffer 
     
    19701835        retval = false; 
    19711836    } 
     1837 
    19721838    free(dummybuffer); 
    19731839    return retval; 
     
    20231889                        (unsigned int)TICKS_TO_CYCLES(now), 
    20241890                        (unsigned int)TICKS_TO_OFFSET(now)); 
     1891    if(getType() == ePT_Transmit) { 
     1892        debugOutputShort( DEBUG_LEVEL_NORMAL, "  Min ISOXMT bufferfill : %04d\n", m_min_ahead); 
     1893    } 
    20251894    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xrun?                 : %s\n", (m_in_xrun ? "True":"False")); 
    20261895    if (m_state == m_next_state) { 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.h

    r1001 r1005  
    163163    bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 
    164164 
    165     //FIXME: document wait functions 
    166     bool waitForProducePacket(); 
    167     bool waitForProducePeriod(); 
    168     bool waitForProduce(unsigned int nframes); 
    169  
    170     bool waitForConsumePacket(); 
    171     bool waitForConsumePeriod(); 
    172     bool waitForConsume(unsigned int nframes); 
    173  
    174165    bool canProducePacket(); 
    175166    bool canProducePeriod(); 
     
    476467    private: 
    477468        bool m_in_xrun; 
    478         pthread_mutex_t m_activity_cond_lock; 
    479         pthread_cond_t  m_activity_cond; 
    480469 
    481470public: 
     
    488477    const char *getTypeString() 
    489478        {return ePTToString(getType());}; 
    490     StreamStatistics m_PacketStat; 
    491     StreamStatistics m_PeriodStat; 
    492     StreamStatistics m_WakeupStat; 
     479 
     480    int m_min_ahead; // DEBUG 
     481 
    493482    DECLARE_DEBUG_MODULE; 
    494483}; 
  • trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

    r1001 r1005  
    4242    , m_SyncSource(NULL) 
    4343    , m_xrun_happened( false ) 
     44    , m_activity_wait_timeout_usec( 1000*1000 ) 
    4445    , m_nb_buffers( 0 ) 
    4546    , m_period( 0 ) 
     
    5152{ 
    5253    addOption(Util::OptionContainer::Option("slaveMode",false)); 
     54    sem_init(&m_activity_semaphore, 0, 0); 
    5355} 
    5456 
     
    5759    , m_SyncSource(NULL) 
    5860    , m_xrun_happened( false ) 
     61    , m_activity_wait_timeout_usec( 1000*1000 ) 
    5962    , m_nb_buffers(nb_buffers) 
    6063    , m_period(period) 
     
    6669{ 
    6770    addOption(Util::OptionContainer::Option("slaveMode",false)); 
     71    sem_init(&m_activity_semaphore, 0, 0); 
    6872} 
    6973 
    7074StreamProcessorManager::~StreamProcessorManager() { 
     75    sem_post(&m_activity_semaphore); 
     76    sem_destroy(&m_activity_semaphore); 
    7177} 
    7278 
     
    100106 
    101107    m_WaitLock.Unlock(); 
     108} 
     109 
     110void 
     111StreamProcessorManager::signalActivity() 
     112{ 
     113    sem_post(&m_activity_semaphore); 
     114    debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this); 
     115} 
     116 
     117enum StreamProcessorManager::eActivityResult 
     118StreamProcessorManager::waitForActivity() 
     119{ 
     120    debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this); 
     121    struct timespec ts; 
     122    int result; 
     123 
     124    if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 
     125        debugError("clock_gettime failed\n"); 
     126        return eAR_Error; 
     127    } 
     128    long long int timeout_nsec=0; 
     129    int timeout_sec = 0; 
     130    if (m_activity_wait_timeout_usec >= 0) { 
     131        timeout_nsec = m_activity_wait_timeout_usec * 1000LL; 
     132        timeout_sec = 0; 
     133        while(timeout_nsec >= 1000000000LL) { 
     134            timeout_sec += 1; 
     135            timeout_nsec -= 1000000000LL; 
     136        } 
     137        ts.tv_nsec += timeout_nsec; 
     138        ts.tv_sec += timeout_sec; 
     139    } 
     140 
     141    if (m_activity_wait_timeout_usec >= 0) { 
     142        result = sem_timedwait(&m_activity_semaphore, &ts); 
     143    } else { 
     144        result = sem_wait(&m_activity_semaphore); 
     145    } 
     146 
     147    if(result != 0) { 
     148        if (result == ETIMEDOUT) { 
     149            debugOutput(DEBUG_LEVEL_VERBOSE, 
     150                        "(%p) pthread_cond_timedwait() timed out (result=%d)\n", 
     151                        this, result); 
     152            return eAR_Timeout; 
     153        } else if (result == EINTR) { 
     154            debugOutput(DEBUG_LEVEL_VERBOSE, 
     155                        "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", 
     156                        this, result); 
     157            return eAR_Interrupted; 
     158        } else { 
     159            debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n",  
     160                        this, result); 
     161            debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",  
     162                       this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); 
     163            return eAR_Error; 
     164        } 
     165    } 
     166 
     167    debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this); 
     168    return eAR_Activity; 
    102169} 
    103170 
     
    545612            }; 
    546613 
     614            // before we do anything else, transfer 
     615            if(!transferSilence()) { 
     616                debugError("Could not transfer silence\n"); 
     617                return false; 
     618            } 
     619 
     620            // now calculate the stream offset 
    547621            i = 0; 
    548622            for ( i = 0; i < nb_rcv_sp; i++) { 
     
    558632            } 
    559633 
    560             if(!transferSilence()) { 
    561                 debugError("Could not transfer silence\n"); 
    562                 return false; 
    563             } 
    564634            nb_sync_runs--; 
    565635        } 
     
    806876 
    807877    while(period_not_ready) { 
    808         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,  
     878        debugOutputExtreme(DEBUG_LEVEL_VERBOSE,  
    809879                           "waiting for period (%d frames in buffer)...\n", 
    810880                           m_SyncSource->getBufferFill()); 
    811         bool result; 
    812         if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) { 
    813             result = m_SyncSource->waitForConsumePeriod(); 
    814         } else { 
    815             result = m_SyncSource->waitForProducePeriod(); 
    816         } 
    817 //         if(!result) { 
    818 //             debugError("Error waiting for signal\n"); 
    819 //             return false; 
    820 //         } 
     881 
     882        // wait for something to happen 
     883        switch(waitForActivity()) { 
     884            case eAR_Error: 
     885                debugError("Error while waiting for activity\n"); 
     886                return false; 
     887            case eAR_Interrupted: 
     888                // FIXME: what to do here? 
     889                debugWarning("Interrupted while waiting for activity\n"); 
     890                break; 
     891            case eAR_Timeout: 
     892                // FIXME: what to do here? 
     893                debugWarning("Timeout while waiting for activity\n"); 
     894                break; 
     895            case eAR_Activity: 
     896                // do nothing 
     897                break; 
     898        } 
     899        debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "got activity...\n"); 
    821900 
    822901        // HACK: this should be solved more elegantly 
     
    838917            } 
    839918        } 
     919        debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " period not ready? %d...\n", period_not_ready); 
     920 
    840921        // check for underruns on the ISO side, 
    841922        // those should make us bail out of the wait loop 
     
    885966    m_time_of_transfer2 = m_time_of_transfer; 
    886967    #endif 
    887      
    888     debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, 
     968 
     969    debugOutputExtreme( DEBUG_LEVEL_VERBOSE, 
    889970                        "transfer at %llu ticks...\n", 
    890971                        m_time_of_transfer); 
     
    10271108    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); 
    10281109    bool retval=true; 
     1110    // NOTE: the order here is opposite from the order in 
     1111    // normal operation (transmit is before receive), because 
     1112    // we can do that here (data=silence=available) and  
     1113    // it increases reliability (esp. on startup) 
     1114    retval &= transferSilence(StreamProcessor::ePT_Transmit); 
    10291115    retval &= transferSilence(StreamProcessor::ePT_Receive); 
    1030     retval &= transferSilence(StreamProcessor::ePT_Transmit); 
    10311116    return retval; 
    10321117} 
  • trunk/libffado/src/libstreaming/StreamProcessorManager.h

    r967 r1005  
    6969    bool startDryRunning(); 
    7070    bool syncStartAll(); 
     71    // activity signaling 
     72    enum eActivityResult { 
     73        eAR_Activity, 
     74        eAR_Timeout, 
     75        eAR_Interrupted, 
     76        eAR_Error 
     77    }; 
     78    void signalActivity(); 
     79    enum eActivityResult waitForActivity(); 
    7180 
    7281    // this is the setup API 
     
    139148        {return *m_SyncSource;}; 
    140149 
    141 protected: 
     150protected: // FIXME: private? 
    142151 
    143     // thread sync primitive
     152    // thread related var
    144153    bool m_xrun_happened; 
     154    int m_activity_wait_timeout_usec; 
    145155    bool m_thread_realtime; 
    146156    int m_thread_priority; 
     157 
     158    // activity signaling 
     159    sem_t m_activity_semaphore; 
    147160 
    148161    // processor list