Changeset 807

Show
Ignore:
Timestamp:
01/05/08 06:07:23 (13 years ago)
Author:
ppalmers
Message:

more reliability things

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/config.h.in

    r803 r807  
    3838#define SHAREDIR "$sharedir" 
    3939 
    40 #define MINIMUM_INTERRUPTS_PER_PERIOD   2U 
    41 #define MAX_ISO_XMIT_BUFFER_FILL_PCT    50 
     40#define IEEE1394SERVICE_USE_CYCLETIMER_DLL  0 
     41#define IEEE1394SERVICE_MAX_FIREWIRE_PORTS  16 
    4242 
    43 #define ISOHANDLER_PER_HANDLER_THREAD   1 
    44 #define ISOHANDLER_USE_POLL             0 
     43#define IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE         0 
     44#define IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE  5 
     45 
     46#define THREAD_MAX_RTPRIO                   98 
     47#define THREAD_MIN_RTPRIO                   0 
     48 
     49#define MINIMUM_INTERRUPTS_PER_PERIOD       2U 
     50#define MAX_ISO_XMIT_BUFFER_FILL_PCT        50 
     51 
     52#define ISOHANDLER_PER_HANDLER_THREAD       1 
     53#define ISOHANDLER_USE_POLL                 0 
     54#define ISOHANDLER_FLUSH_BEFORE_ITERATE     0 
    4555 
    4656#define ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT         16 
  • trunk/libffado/src/devicemanager.cpp

    r784 r807  
    560560        debugWarning("XRUN detected\n"); 
    561561        // do xrun recovery 
    562         m_processorManager->handleXrun(); 
    563         return false; 
     562        if(m_processorManager->handleXrun()) { 
     563            return false; 
     564        } else { 
     565            debugError("Could not handle XRUN\n"); 
     566            return false; 
     567        } 
    564568    } 
    565569} 
  • trunk/libffado/src/ffado.cpp

    r795 r807  
    245245        return dev->options.period_size; 
    246246    } else { 
    247         debugWarning("XRUN"); 
     247        debugWarning("XRUN\n"); 
    248248        xruns++; 
    249249        return -1; 
  • trunk/libffado/src/libieee1394/CycleTimerHelper.cpp

    r788 r807  
    2121 * 
    2222 */ 
     23 
     24#include "config.h" 
    2325 
    2426#include "CycleTimerHelper.h" 
     
    4749    pthread_mutex_unlock(&m_compute_vars_lock); \ 
    4850    } 
    49  
    50 #define OLD_STYLE 
    5151 
    5252IMPL_DEBUG_MODULE( CycleTimerHelper, CycleTimerHelper, DEBUG_LEVEL_NORMAL ); 
     
    100100{ 
    101101    debugOutput( DEBUG_LEVEL_VERBOSE, "Start %p...\n", this); 
    102 #ifndef OLD_STYLE 
     102#if IEEE1394SERVICE_USE_CYCLETIMER_DLL 
    103103    m_Thread = new Util::PosixThread(this, m_realtime, m_priority,  
    104104                                     PTHREAD_CANCEL_DEFERRED); 
     
    126126CycleTimerHelper::setThreadParameters(bool rt, int priority) { 
    127127    debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority); 
    128     if (priority > 98) priority = 98; // cap the priority 
     128    if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority 
    129129    m_realtime = rt; 
    130130    m_priority = priority; 
    131131 
    132 #ifndef OLD_STYLE 
     132#if IEEE1394SERVICE_USE_CYCLETIMER_DLL 
    133133    if (m_Thread) { 
    134134        if (m_realtime) { 
     
    139139    } 
    140140#endif 
     141 
    141142    return true; 
    142143} 
     
    157158} 
    158159 
    159 #ifdef OLD_STYLE 
    160  
    161 bool 
    162 CycleTimerHelper::Execute() 
    163 
    164     usleep(1000*1000); 
    165     return true; 
    166 
    167 uint32_t 
    168 CycleTimerHelper::getCycleTimerTicks() 
    169 
    170     uint32_t cycle_timer; 
    171     uint64_t local_time; 
    172     if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 
    173         debugError("Could not read cycle timer register\n"); 
    174         return 0; 
    175     } 
    176     return CYCLE_TIMER_TO_TICKS(cycle_timer); 
    177 
    178  
    179 uint32_t 
    180 CycleTimerHelper::getCycleTimerTicks(uint64_t now) 
    181 
    182     return getCycleTimerTicks(); 
    183 
    184  
    185 uint32_t 
    186 CycleTimerHelper::getCycleTimer() 
    187 
    188     uint32_t cycle_timer; 
    189     uint64_t local_time; 
    190     if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 
    191         debugError("Could not read cycle timer register\n"); 
    192         return 0; 
    193     } 
    194     return cycle_timer; 
    195 
    196  
    197 uint32_t 
    198 CycleTimerHelper::getCycleTimer(uint64_t now) 
    199 
    200     return getCycleTimer(); 
    201 
    202 #else 
     160#if IEEE1394SERVICE_USE_CYCLETIMER_DLL 
    203161 
    204162bool 
     
    327285    return TICKS_TO_CYCLE_TIMER(getCycleTimerTicks(now)); 
    328286} 
     287 
     288#else 
     289 
     290bool 
     291CycleTimerHelper::Execute() 
     292{ 
     293    usleep(1000*1000); 
     294    return true; 
     295} 
     296uint32_t 
     297CycleTimerHelper::getCycleTimerTicks() 
     298{ 
     299    uint32_t cycle_timer; 
     300    uint64_t local_time; 
     301    if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 
     302        debugError("Could not read cycle timer register\n"); 
     303        return 0; 
     304    } 
     305    return CYCLE_TIMER_TO_TICKS(cycle_timer); 
     306} 
     307 
     308uint32_t 
     309CycleTimerHelper::getCycleTimerTicks(uint64_t now) 
     310{ 
     311    return getCycleTimerTicks(); 
     312} 
     313 
     314uint32_t 
     315CycleTimerHelper::getCycleTimer() 
     316{ 
     317    uint32_t cycle_timer; 
     318    uint64_t local_time; 
     319    if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 
     320        debugError("Could not read cycle timer register\n"); 
     321        return 0; 
     322    } 
     323    return cycle_timer; 
     324} 
     325 
     326uint32_t 
     327CycleTimerHelper::getCycleTimer(uint64_t now) 
     328{ 
     329    return getCycleTimer(); 
     330} 
     331 
    329332#endif 
    330333 
  • trunk/libffado/src/libieee1394/ieee1394service.cpp

    r782 r807  
    2323 */ 
    2424 
     25#include "config.h" 
    2526#include "ieee1394service.h" 
    2627#include "ARMHandler.h" 
     
    4243#include <iostream> 
    4344#include <iomanip> 
    44  
    45 #define FFADO_MAX_FIREWIRE_PORTS 16 
    46  
    47 #define ISOMANAGER_PRIO_INCREASE         11 
    48 #define CYCLETIMER_HELPER_PRIO_INCREASE  10 
    4945 
    5046IMPL_DEBUG_MODULE( Ieee1394Service, Ieee1394Service, DEBUG_LEVEL_NORMAL ); 
     
    8076    , m_realtime ( rt ) 
    8177    , m_base_priority ( prio ) 
    82     , m_pIsoManager( new IsoHandlerManager( *this, rt, prio + ISOMANAGER_PRIO_INCREASE ) ) 
    83     , m_pCTRHelper ( new CycleTimerHelper( *this, 1000, rt, prio + CYCLETIMER_HELPER_PRIO_INCREASE ) ) 
     78    , m_pIsoManager( new IsoHandlerManager( *this, rt, prio + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE ) ) 
     79    , m_pCTRHelper ( new CycleTimerHelper( *this, 1000, rt, prio + IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE ) ) 
    8480    , m_have_new_ctr_read ( false ) 
    8581    , m_pTimeSource ( new Util::SystemTimeSource() ) 
     
    135131        return 0; 
    136132    } 
    137     struct raw1394_portinfo pinf[FFADO_MAX_FIREWIRE_PORTS]; 
    138     int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, FFADO_MAX_FIREWIRE_PORTS); 
     133    struct raw1394_portinfo pinf[IEEE1394SERVICE_MAX_FIREWIRE_PORTS]; 
     134    int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, IEEE1394SERVICE_MAX_FIREWIRE_PORTS); 
    139135    raw1394_destroy_handle(tmp_handle); 
    140136 
     
    215211        return false; 
    216212    } 
    217     struct raw1394_portinfo pinf[FFADO_MAX_FIREWIRE_PORTS]; 
    218     int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, FFADO_MAX_FIREWIRE_PORTS); 
     213    struct raw1394_portinfo pinf[IEEE1394SERVICE_MAX_FIREWIRE_PORTS]; 
     214    int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, IEEE1394SERVICE_MAX_FIREWIRE_PORTS); 
    219215    raw1394_destroy_handle(tmp_handle); 
    220216 
     
    224220    } 
    225221 
    226     if(nb_detected_ports && port < FFADO_MAX_FIREWIRE_PORTS) { 
     222    if(nb_detected_ports && port < IEEE1394SERVICE_MAX_FIREWIRE_PORTS) { 
    227223        m_portName = pinf[port].name; 
    228224    } else { 
     
    275271Ieee1394Service::setThreadParameters(bool rt, int priority) { 
    276272    bool result = true; 
    277     if (priority > 98) priority = 98
     273    if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO
    278274    m_base_priority = priority; 
    279275    m_realtime = rt; 
    280276    if (m_pIsoManager) { 
    281277        debugOutput(DEBUG_LEVEL_VERBOSE, "Switching IsoManager to (rt=%d, prio=%d)\n", 
    282                                          rt, priority + ISOMANAGER_PRIO_INCREASE); 
    283         result &= m_pIsoManager->setThreadParameters(rt, priority + ISOMANAGER_PRIO_INCREASE); 
     278                                         rt, priority + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE); 
     279        result &= m_pIsoManager->setThreadParameters(rt, priority + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE); 
    284280    } 
    285281    if (m_pCTRHelper) { 
    286282        debugOutput(DEBUG_LEVEL_VERBOSE, "Switching CycleTimerHelper to (rt=%d, prio=%d)\n",  
    287                                          rt, priority + CYCLETIMER_HELPER_PRIO_INCREASE); 
    288         result &= m_pCTRHelper->setThreadParameters(rt, priority + CYCLETIMER_HELPER_PRIO_INCREASE); 
     283                                         rt, priority + IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE); 
     284        result &= m_pCTRHelper->setThreadParameters(rt, priority + IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE); 
    289285    } 
    290286    return result; 
  • trunk/libffado/src/libieee1394/IsoHandler.cpp

    r803 r807  
    174174IsoHandler::waitForClient() 
    175175{ 
     176    debugOutput(DEBUG_LEVEL_VERBOSE, "waiting...\n"); 
     177    if(m_Client) { 
     178        bool result = m_Client->waitForSignal(); 
     179        debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); 
     180        return result; 
     181    } else { 
     182        debugOutput(DEBUG_LEVEL_VERBOSE, " no client\n"); 
     183    } 
     184    return false; 
     185} 
     186 
     187bool 
     188IsoHandler::tryWaitForClient() 
     189{ 
    176190    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
    177191    if(m_Client) { 
    178         bool result = m_Client->waitForFrames(); 
     192        bool result = m_Client->tryWaitForSignal(); 
    179193        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
    180194        return result; 
    181     } 
    182     return false; 
    183 
    184  
    185 bool 
    186 IsoHandler::tryWaitForClient() 
    187 
    188     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
    189     if(m_Client) { 
    190         bool result = m_Client->tryWaitForFrames(); 
    191         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
    192         return result; 
     195    } else { 
     196        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " no client\n"); 
    193197    } 
    194198    return false; 
     
    209213 
    210214    // wait for the availability of frames in the client 
    211     // (blocking) 
    212     if (getType()==eHT_Receive || waitForClient()) { 
     215    // (blocking for transmit handlers) 
     216#ifdef DEBUG 
     217    if (getType() == eHT_Transmit) { 
     218        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) Waiting for Client to signal frame availability...\n", this); 
     219    } 
     220#endif 
     221    if (getType() == eHT_Receive || waitForClient()) { 
    213222 
    214223#if ISOHANDLER_USE_POLL 
     
    249258        // iterate blocks if no 1394 data is available 
    250259        // so poll'ing is not really necessary 
    251         bool result = iterate(); 
    252         //usleep(125); 
     260         
     261        bool result = true; 
     262        while(result && m_Client->canProcessPackets()) { 
     263            result = iterate(); 
     264            debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Iterate returned: %d\n", 
     265                        this, (m_type==eHT_Receive?"Receive":"Transmit"), result); 
     266        } 
    253267        return result; 
    254268#endif 
     
    261275bool 
    262276IsoHandler::iterate() { 
    263     //flush(); 
    264     if(raw1394_loop_iterate(m_handle)) { 
    265         debugOutput( DEBUG_LEVEL_VERBOSE, 
    266                     "IsoHandler (%p): Failed to iterate handler: %s\n", 
    267                     this,strerror(errno)); 
    268         return false; 
    269     } 
    270     return true; 
     277    debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Iterating ISO handler\n",  
     278                this, (m_type==eHT_Receive?"Receive":"Transmit")); 
     279    if(m_State == E_Running) { 
     280#if ISOHANDLER_FLUSH_BEFORE_ITERATE 
     281        flush(); 
     282#endif 
     283        if(raw1394_loop_iterate(m_handle)) { 
     284            debugOutput( DEBUG_LEVEL_VERBOSE, 
     285                        "IsoHandler (%p): Failed to iterate handler: %s\n", 
     286                        this, strerror(errno)); 
     287            return false; 
     288        } 
     289        return true; 
     290    } else { 
     291        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Not iterating a non-running handler...\n", 
     292                    this, (m_type==eHT_Receive?"Receive":"Transmit")); 
     293        return false; 
     294    } 
    271295} 
    272296 
     
    274298IsoHandler::setThreadParameters(bool rt, int priority) { 
    275299    debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority); 
    276     if (priority > 98) priority = 98; // cap the priority 
     300    if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority 
    277301    m_realtime = rt; 
    278302    m_priority = priority; 
     
    343367bool IsoHandler::disable() 
    344368{ 
    345     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
     369    debugOutput( DEBUG_LEVEL_VERBOSE, "(%p, %s) enter...\n",  
     370                 this, (m_type==eHT_Receive?"Receive":"Transmit")); 
    346371 
    347372    // check state 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

    r803 r807  
    6868IsoHandlerManager::setThreadParameters(bool rt, int priority) { 
    6969    debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority); 
    70     if (priority > 98) priority = 98; // cap the priority 
     70    if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority 
    7171    m_realtime = rt; 
    7272    m_priority = priority; 
     
    409409 
    410410    // set the handler's thread parameters 
    411     if(!h->setThreadParameters(m_realtime, m_priority)) { 
     411    // receive handlers have lower priority than the client thread 
     412    // since they have ISO side buffering 
     413    // xmit handlers have higher priority since we want client side 
     414    // frames to be put into the ISO buffers ASAP 
     415    int thread_prio; 
     416    if (stream->getType()==StreamProcessor::ePT_Receive) { 
     417        thread_prio = m_priority - 1; 
     418        if (thread_prio < THREAD_MIN_RTPRIO) thread_prio = THREAD_MIN_RTPRIO; 
     419    } else { 
     420        thread_prio = m_priority + 1; 
     421        if (thread_prio > THREAD_MAX_RTPRIO) thread_prio = THREAD_MAX_RTPRIO; 
     422    } 
     423 
     424    if(!h->setThreadParameters(m_realtime, thread_prio)) { 
    412425        debugFatal("Could not set handler thread parameters\n"); 
    413426        return false; 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

    r803 r807  
    3939#include <assert.h> 
    4040#include <math.h> 
     41 
     42/* 
     43#define POST_SEMAPHORE { \ 
     44    int tmp; \ 
     45    sem_getvalue(&m_signal_semaphore, &tmp); \ 
     46    debugWarning("posting semaphore from value %d\n", tmp); \ 
     47    sem_post(&m_signal_semaphore); \ 
     48} 
     49*/ 
     50 
     51#define POST_SEMAPHORE { \ 
     52    sem_post(&m_signal_semaphore); \ 
     53} 
    4154 
    4255namespace Streaming { 
     
    6376    , m_sync_delay( 0 ) 
    6477    , m_in_xrun( false ) 
     78    , m_signal_period( 0 ) 
     79    , m_signal_offset( 0 ) 
    6580{ 
    6681    // create the timestamped buffer and register ourselves as its client 
     
    7691    if (m_data_buffer) delete m_data_buffer; 
    7792    if (m_scratch_buffer) delete[] m_scratch_buffer; 
     93    sem_destroy(&m_signal_semaphore); 
    7894} 
    7995 
     
    111127{ 
    112128#if ISOHANDLER_PER_HANDLER_THREAD 
    113     // if we use one thread per packet, we can put every frame into the ISO buffer 
     129    // if we use one thread per packet, we can put every frame directly into the ISO buffer 
    114130    // the waitForClient in IsoHandler will take care of the fact that the frames are 
    115131    // not present in time 
    116     unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1)); 
     132    unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers())); 
    117133    debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer); 
    118134    return packets_to_prebuffer; 
     
    286302            m_dropped += dropped_cycles; 
    287303            m_in_xrun = true; 
     304            m_last_cycle = cycle; 
     305            POST_SEMAPHORE; 
     306            return RAW1394_ISO_DEFER; 
    288307            //flushDebugOutput(); 
    289308            //assert(0); 
     
    295314    if (m_state == ePS_Invalid) { 
    296315        debugError("Should not have state %s\n", ePSToString(m_state) ); 
     316        POST_SEMAPHORE; 
    297317        return RAW1394_ISO_ERROR; 
    298318    } 
     
    316336            if (!updateState()) { // we are allowed to change the state directly 
    317337                debugError("Could not update state!\n"); 
     338                POST_SEMAPHORE; 
    318339                return RAW1394_ISO_ERROR; 
    319340            } 
     
    334355            if (!updateState()) { // we are allowed to change the state directly 
    335356                debugError("Could not update state!\n"); 
     357                POST_SEMAPHORE; 
    336358                return RAW1394_ISO_ERROR; 
    337359            } 
     
    364386                if (!updateState()) { // we are allowed to change the state directly 
    365387                    debugError("Could not update state!\n"); 
     388                    POST_SEMAPHORE; 
    366389                    return RAW1394_ISO_ERROR; 
    367390                } 
     
    381404            if (!updateState()) { // we are allowed to change the state directly 
    382405                debugError("Could not update state!\n"); 
     406                POST_SEMAPHORE; 
    383407                return RAW1394_ISO_ERROR; 
    384408            } 
     
    400424                if (!updateState()) { // we are allowed to change the state directly 
    401425                    debugError("Could not update state!\n"); 
     426                    POST_SEMAPHORE; 
    402427                    return RAW1394_ISO_ERROR; 
    403428                } 
     429                POST_SEMAPHORE; 
    404430                return RAW1394_ISO_DEFER; 
    405431            } 
     
    421447            if (!updateState()) { // we are allowed to change the state directly 
    422448                debugError("Could not update state!\n"); 
     449                POST_SEMAPHORE; 
    423450                return RAW1394_ISO_ERROR; 
    424451            } 
     452            POST_SEMAPHORE; 
    425453            return RAW1394_ISO_DEFER; 
    426454        } else if(result2 == eCRV_OK) { 
    427455            // no problem here 
     456            // if we have enough samples, we can post the semaphore and  
     457            // defer further processing until later. this will allow us to 
     458            // run the client and process the frames such that we can put them 
     459            // into the xmit buffers ASAP 
     460            if (m_state == ePS_Running) { 
     461                unsigned int bufferfill = m_data_buffer->getBufferFill(); 
     462                if(bufferfill >= m_signal_period + m_signal_offset) { 
     463                    // this to avoid multiple signals for the same period 
     464                    int semval; 
     465                    sem_getvalue(&m_signal_semaphore, &semval); 
     466                    unsigned int signal_period = m_signal_period * (semval + 1) + m_signal_offset; 
     467                    if(bufferfill >= signal_period) { 
     468                        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) buffer fill (%d) > signal period (%d), sem_val=%d\n", 
     469                                    this, m_data_buffer->getBufferFill(), signal_period, semval); 
     470                        POST_SEMAPHORE; 
     471                    } 
     472                    // the process thread should have higher prio such that we are blocked until 
     473                    // the samples are processed. 
     474                } 
     475            } 
    428476            return RAW1394_ISO_OK; 
    429477        } else { 
    430478            debugError("Invalid response\n"); 
     479            POST_SEMAPHORE; 
    431480            return RAW1394_ISO_ERROR; 
    432481        } 
     
    436485    } else { 
    437486        debugError("Invalid response\n"); 
     487        POST_SEMAPHORE; 
    438488        return RAW1394_ISO_ERROR; 
    439489    } 
    440490    debugError("reached the unreachable\n"); 
     491    POST_SEMAPHORE; 
    441492    return RAW1394_ISO_ERROR; 
    442493} 
     
    783834    m_data_buffer->blockProcessWriteFrames(nbframes, ts); 
    784835    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); 
     836 
     837    unsigned int bufferfill = m_data_buffer->getBufferFill(); 
     838    if (bufferfill >= m_signal_period + m_signal_offset) { 
     839        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", 
     840                                         this, bufferfill, m_signal_period + m_signal_offset); 
     841        POST_SEMAPHORE; 
     842    } else { 
     843        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", 
     844                                         this, bufferfill, m_signal_period + m_signal_offset); 
     845    } 
    785846    return true; // FIXME: what about failure? 
    786847} 
     
    816877        return false; 
    817878    } 
     879 
     880    unsigned int bufferfill = m_data_buffer->getBufferFill(); 
     881    if (bufferfill >= m_signal_period + m_signal_offset) { 
     882        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", 
     883                                         this, bufferfill, m_signal_period + m_signal_offset); 
     884        POST_SEMAPHORE; 
     885    } else { 
     886        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", 
     887                                         this, bufferfill, m_signal_period + m_signal_offset); 
     888    } 
     889 
    818890    return true; 
    819891} 
    820892 
    821893bool 
    822 StreamProcessor::waitForFrames() 
    823 
     894StreamProcessor::waitForSignal() 
     895
     896    int result; 
    824897    if(m_state == ePS_Running) { 
    825         assert(m_data_buffer); 
    826         if(getType() == ePT_Receive) { 
    827             return m_data_buffer->waitForFrames(m_StreamProcessorManager.getPeriodSize()); 
    828         } else { 
    829             return m_data_buffer->waitForFrames(getNominalFramesPerPacket()); 
    830         } 
     898        result = sem_wait(&m_signal_semaphore); 
     899#ifdef DEBUG 
     900        int tmp; 
     901        sem_getvalue(&m_signal_semaphore, &tmp); 
     902        debugOutput(DEBUG_LEVEL_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp); 
     903#endif 
     904        return result == 0; 
     905    } else { 
     906        // when we're not running, we can always provide frames 
     907        debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n"); 
     908        return true; 
     909    } 
     910
     911 
     912bool 
     913StreamProcessor::tryWaitForSignal() 
     914
     915    if(m_state == ePS_Running) { 
     916        return sem_trywait(&m_signal_semaphore) == 0; 
    831917    } else { 
    832918        // when we're not running, we can always provide frames 
     
    837923 
    838924bool 
    839 StreamProcessor::tryWaitForFrames() 
    840 
    841     if(m_state == ePS_Running) { 
    842         assert(m_data_buffer); 
    843         if(getType() == ePT_Receive) { 
    844             return m_data_buffer->tryWaitForFrames(m_StreamProcessorManager.getPeriodSize()); 
    845         } else { 
    846             return m_data_buffer->tryWaitForFrames(getNominalFramesPerPacket()); 
    847         } 
     925StreamProcessor::canProcessPackets() 
     926
     927    if(m_state != ePS_Running) return true; 
     928    bool result; 
     929    int bufferfill; 
     930    if(getType() == ePT_Receive) { 
     931        bufferfill = m_data_buffer->getBufferSpace(); 
    848932    } else { 
    849         // when we're not running, we can always provide frames 
    850         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Not running...\n"); 
    851         return true; 
    852     } 
     933        bufferfill = m_data_buffer->getBufferFill(); 
     934    } 
     935    result = bufferfill > getNominalFramesPerPacket(); 
     936    debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) for a bufferfill of %d, we return %d\n", 
     937                this, ePTToString(getType()), bufferfill, result); 
     938    return result; 
    853939} 
    854940 
     
    9421028{ 
    9431029    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); 
     1030 
     1031    if (sem_init(&m_signal_semaphore, 0, 0) == -1) { 
     1032        debugError("Could not init signal semaphore"); 
     1033        return false; 
     1034    } 
    9441035 
    9451036    if(!m_IsoHandlerManager.registerStream(this)) { 
     
    14081499            // be picked up by the packet iterator 
    14091500 
    1410             if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name 
     1501            sem_init(&m_signal_semaphore, 0, 0); 
     1502            m_signal_period = m_StreamProcessorManager.getPeriodSize(); 
     1503            m_signal_offset = 0; // FIXME: we have to ensure that everyone is ready 
     1504 
     1505            if(!m_data_buffer->clearBuffer()) { 
    14111506                debugError("Could not reset data buffer\n"); 
    14121507                return false; 
     
    14191514                    debugFatal("Could not prefill transmit stream\n"); 
    14201515                    return false; 
     1516                } 
     1517                if (m_data_buffer->getBufferFill() >= m_signal_period + m_signal_offset) { 
     1518                    POST_SEMAPHORE; 
    14211519                } 
    14221520            } 
     
    14891587        case ePS_Running: 
    14901588            // the thread will do the transition 
     1589 
     1590            // we have to wake the iterator if it's asleep 
     1591            POST_SEMAPHORE; 
    14911592            break; 
    14921593        default: 
     
    16911792StreamProcessor::dumpInfo() 
    16921793{ 
    1693     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p:\n", this); 
     1794    debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p, %s:\n", this, ePTToString(m_processor_type)); 
    16941795    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", m_1394service.getPort(), m_channel); 
    16951796    uint64_t now = m_1394service.getCycleTimerTicks(); 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.h

    r803 r807  
    3434 
    3535#include "debugmodule/debugmodule.h" 
     36#include <semaphore.h> 
    3637 
    3738class Ieee1394Service; 
     
    157158    bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 
    158159 
    159     /** 
    160      * @brief waits for the availability of frames (blocking) 
    161      * @param nframes number of frames 
    162      * 
    163      * @return true if frames are available, false if not (e.g. signal occurred) 
    164      */ 
    165     bool waitForFrames(); 
    166  
    167     /** 
    168      * @brief waits for the availability of frames (non-blocking) 
    169      * @param nframes number of frames 
    170      * 
    171      * @return true if frames are available, false if not 
    172      */ 
    173     bool tryWaitForFrames(); 
     160    unsigned int getSignalPeriod() {return m_signal_period;}; 
     161    bool setSignalPeriod(unsigned int p) {m_signal_period=p; return true;}; 
     162    /** 
     163     * @brief waits for a 'signal' (blocking) 
     164     * 
     165     * a 'signal' is: 
     166     * when type==Receive: 
     167     *  - one signal_period of frames is present in the buffer 
     168     *    (received by the iso side) 
     169     *  - an error has occurred (xrun, iso error, ...) 
     170     * when type==Transmit: 
     171     *  - at least one signal_period of frames are present in the buffer 
     172     *    (have been written into it by the client) 
     173     *  - an error occurred 
     174     * 
     175     * @return true if the 'signal' is available, false if error 
     176     */ 
     177    bool waitForSignal(); 
     178 
     179    /** 
     180     * @brief checks for a 'signal' (non-blocking) 
     181     * 
     182     * a 'signal' is: 
     183     * when type==Receive: 
     184     *  - one signal_period of frames is present in the buffer 
     185     *    (received by the iso side) 
     186     *  - an error has occurred (xrun, iso error, ...) 
     187     * when type==Transmit: 
     188     *  - at least one signal_period of frames are present in the buffer 
     189     *    (have been written into it by the client) 
     190     *  - an error occurred 
     191     * 
     192     * @return true if the 'signal' is available, false if not (or error) 
     193     */ 
     194    bool tryWaitForSignal(); 
     195 
     196    /** 
     197     * @brief can a SP process (queue, dequeue) packets at this moment? 
     198     * 
     199     * 
     200     * @return true if packet processing makes sense 
     201     */ 
     202    bool canProcessPackets(); 
    174203 
    175204    /** 
     
    460489    private: 
    461490        bool m_in_xrun; 
     491        sem_t m_signal_semaphore; 
     492        unsigned int m_signal_period; 
     493        unsigned int m_signal_offset; 
     494 
    462495public: 
    463496    // debug stuff 
  • trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

    r803 r807  
    111111            if ( *it == processor ) { 
    112112                if (*it == m_SyncSource) { 
    113                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source"); 
     113                    debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n"); 
    114114                    m_SyncSource = NULL; 
    115115                } 
     
    127127            if ( *it == processor ) { 
    128128                if (*it == m_SyncSource) { 
    129                     debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source"); 
     129                    debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n"); 
    130130                    m_SyncSource = NULL; 
    131131                } 
     
    546546    } 
    547547    // wait for the SP's to get into the dry-running state 
    548     int cnt = 200
     548    int cnt = 2000
    549549    bool ready = false; 
    550550    while (!ready && cnt) { 
     
    565565    if(cnt==0) { 
    566566        debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n"); 
     567        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     568            it != m_ReceiveProcessors.end(); 
     569            ++it ) { 
     570            (*it)->dumpInfo(); 
     571        } 
     572        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     573            it != m_TransmitProcessors.end(); 
     574            ++it ) { 
     575            (*it)->dumpInfo(); 
     576        } 
    567577        return false; 
    568578    } 
     
    661671bool StreamProcessorManager::waitForPeriod() { 
    662672    if(m_SyncSource == NULL) return false; 
    663     int time_till_next_period; 
    664673    bool xrun_occurred = false; 
    665  
    666     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); 
    667  
    668     time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
    669  
    670     while(time_till_next_period > 0) { 
    671         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 
    672  
    673         // wait for the period 
    674         SleepRelativeUsec(time_till_next_period); 
     674    bool period_not_ready = true; 
     675 
     676    while(period_not_ready) { 
     677        debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill()); 
     678        if(!m_SyncSource->waitForSignal()) { 
     679            debugError("Error waiting for signal\n"); 
     680            return false; 
     681        } 
     682 
     683        unsigned int bufferfill = m_SyncSource->getBufferFill(); 
     684        period_not_ready = bufferfill < m_period; 
     685 
     686#ifdef DEBUG 
     687        if(period_not_ready) { 
     688            debugOutput(DEBUG_LEVEL_VERBOSE, "period is not ready (bufferfill: %u)\n", bufferfill); 
     689        } else { 
     690            debugOutput(DEBUG_LEVEL_VERBOSE, "period is ready (bufferfill: %u)\n", bufferfill); 
     691        } 
     692#endif 
    675693 
    676694        // check for underruns on the ISO side, 
     
    689707        } 
    690708        if(xrun_occurred) break; 
    691  
    692         // check if we were waked up too soon 
    693         time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
     709        // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error) 
    694710    } 
    695711 
     
    701717    //       and the receive processors should have done their transfer. 
    702718    m_time_of_transfer = m_SyncSource->getTimeAtPeriod(); 
    703     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n", 
     719    debugOutput( DEBUG_LEVEL_VERBOSE, "transfer at %llu ticks...\n", 
    704720        m_time_of_transfer); 
    705  
    706     xrun_occurred = false; 
    707  
    708 #if STREAMPROCESSORMANAGER_DYNAMIC_SYNC_DELAY 
    709     // normally we can transfer frames at this time, but in some cases this is not true 
    710     // e.g. when there are not enough frames in the receive buffer. 
    711     // however this doesn't have to be a problem, since we can wait some more until we 
    712     // have enough frames. There is only a problem once the ISO xmit doesn't have packets 
    713     // to transmit, or if the receive buffer overflows. These conditions are signaled by 
    714     // the iso threads 
    715     // check if xruns occurred on the Iso side. 
    716     // also check if xruns will occur should we transfer() now 
    717     #ifdef DEBUG 
    718     int waited = 0; 
    719     #endif 
    720      
    721     bool ready_for_transfer = false; 
    722     bool ready; 
    723     while (!ready_for_transfer && !xrun_occurred) { 
    724         // FIXME: can deadlock when the iso handlers die (e.g. unplug the device) 
    725         ready_for_transfer = true; 
    726         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    727             it != m_ReceiveProcessors.end(); 
    728             ++it ) { 
    729             ready = ((*it)->canClientTransferFrames(m_period)); 
    730             ready_for_transfer &= ready; 
    731             xrun_occurred |= (*it)->xrunOccurred(); 
    732         } 
    733         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    734             it != m_TransmitProcessors.end(); 
    735             ++it ) { 
    736             ready = ((*it)->canClientTransferFrames(m_period)); 
    737             //ready_for_transfer &= ready; 
    738             xrun_occurred |= (*it)->xrunOccurred(); 
    739         } 
    740         if(!ready_for_transfer) { 
    741             debugWarning("xrun_occurred = %d\n", xrun_occurred); 
    742         } 
    743         if (!ready_for_transfer) { 
    744              
    745             SleepRelativeUsec(125); // MAGIC: one cycle sleep... 
    746  
    747             // in order to avoid this in the future, we increase the sync delay of the sync source SP 
    748             int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE; 
    749             m_SyncSource->setSyncDelay(d); 
    750             d = m_SyncSource->getSyncDelay(); 
    751             debugOutput(DEBUG_LEVEL_VERBOSE, "Increased the Sync delay to: %d ticks (%f frames, %f cy)\n",  
    752                                              d, ((float)d)/m_SyncSource->getTicksPerFrame(),  
    753                                              ((float)d)/((float)TICKS_PER_CYCLE)); 
    754  
    755             #ifdef DEBUG 
    756             waited++; 
    757             #endif 
    758         } 
    759     } // we are either ready or an xrun occurred 
    760      
    761     // in order to avoid a runaway value of the sync delay, we gradually decrease 
    762     // it. It will be increased by a 'too early' event (cfr some lines higher) 
    763     // hence we'll be at a good point on average. 
    764     int d = m_SyncSource->getSyncDelay() - 1; 
    765     if (d >= 0) m_SyncSource->setSyncDelay(d); 
    766  
    767  
    768     #ifdef DEBUG 
    769     if(waited > 0) { 
    770         debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited); 
    771     } 
    772     #endif 
    773 #endif 
    774721 
    775722    // this is to notify the client of the delay that we introduced by waiting 
  • trunk/libffado/src/libutil/TimestampedBuffer.cpp

    r803 r807  
    5252    pthread_mutex_unlock(&m_framecounter_lock); \ 
    5353    } 
    54 /* 
    55 #define POST_SEMAPHORE { \ 
    56     int tmp; \ 
    57     sem_getvalue(&m_frame_semaphore, &tmp); \ 
    58     debugWarning("posting semaphore from value %d\n", tmp); \ 
    59     sem_post(&m_frame_semaphore); \ 
    60 
    61 */ 
    62  
    63 //HACK 
    64 #define POST_SEMAPHORE { \ 
    65     if(m_update_period > 8) { \ 
    66         sem_post(&m_frame_semaphore); \ 
    67     } \ 
    68 
     54 
    6955namespace Util { 
    7056 
     
    8571{ 
    8672    pthread_mutex_init(&m_framecounter_lock, NULL); 
    87  
    8873} 
    8974 
     
    9176    ffado_ringbuffer_free(m_event_buffer); 
    9277    free(m_cluster_buffer); 
    93     sem_destroy(&m_frame_semaphore); 
    9478} 
    9579 
     
    281265 
    282266/** 
     267 * \brief Returns the current write space in the buffer 
     268 * 
     269 * This returns the buffer free space of the internal ringbuffer. This 
     270 * can only be used as an indication because it's state is not 
     271 * guaranteed to be consistent at all times due to threading issues. 
     272 * 
     273 * @return the internal buffer fill in frames 
     274 */ 
     275unsigned int TimestampedBuffer::getBufferSpace() { 
     276    return ffado_ringbuffer_write_space(m_event_buffer)/(m_bytes_per_frame); 
     277} 
     278 
     279/** 
    283280 * \brief Initializes the TimestampedBuffer 
    284281 * 
     
    289286 */ 
    290287bool TimestampedBuffer::init() { 
    291     if (sem_init(&m_frame_semaphore, 0, 0) == -1) { 
    292         debugError("Could not init frame semaphore"); 
    293         return false; 
    294     } 
    295288    return true; 
    296289} 
     
    371364} 
    372365 
    373 bool 
    374 TimestampedBuffer::waitForFrames(unsigned int nframes) 
    375 { 
    376     int result; 
    377     do { 
    378         unsigned int bufferfill = getBufferFill(); 
    379         if(bufferfill >= nframes) { 
    380             // first make the semaphore 0 
    381             while((result=sem_trywait(&m_frame_semaphore)) == 0) {}; 
    382             return true; 
    383         } else { 
    384             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    385                         "only %d frames in buffer, waiting for more (%d)\n",  
    386                         bufferfill, nframes); 
    387         } 
    388     } while((result=sem_wait(&m_frame_semaphore)) == 0); 
    389     debugOutput(DEBUG_LEVEL_VERBOSE, 
    390                 "sem_wait returns: %d\n", 
    391                 result); 
    392     return false; 
    393 } 
    394  
    395 bool 
    396 TimestampedBuffer::tryWaitForFrames(unsigned int nframes) 
    397 { 
    398     int result; 
    399     do { 
    400         unsigned int bufferfill = getBufferFill(); 
    401         if(bufferfill >= nframes) { 
    402             // first make the semaphore 0 
    403             while((result=sem_trywait(&m_frame_semaphore)) == 0) {}; 
    404             return true; 
    405         } else { 
    406             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    407                         "only %d frames in buffer, waiting for more (%d)\n",  
    408                         bufferfill, nframes); 
    409         } 
    410     } while((result=sem_trywait(&m_frame_semaphore)) == 0); 
    411     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    412                 "sem_trywait returns: %d\n",  
    413                 result); 
    414     return false; 
    415 } 
    416  
    417 bool 
    418 TimestampedBuffer::waitForFrames() 
    419 { 
    420     return waitForFrames(m_update_period); 
    421 } 
    422  
    423 bool 
    424 TimestampedBuffer::tryWaitForFrames() 
    425 { 
    426     return tryWaitForFrames(m_update_period); 
    427 } 
    428  
    429  
    430366/** 
    431367 * @brief Insert a dummy frame to the head buffer 
     
    459395    m_framecounter++; 
    460396    EXIT_CRITICAL_SECTION; 
    461  
    462     POST_SEMAPHORE; 
    463397    return true; 
    464398} 
     
    537471        setBufferTailTimestamp(ts); 
    538472    } 
    539  
    540     POST_SEMAPHORE; 
    541473    return true; 
    542474} 
     
    13101242    // ts(x) = m_buffer_tail_timestamp + 
    13111243    //         (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x 
    1312     POST_SEMAPHORE; 
    13131244} 
    13141245 
  • trunk/libffado/src/libutil/TimestampedBuffer.h

    r803 r807  
    2727#include "../debugmodule/debugmodule.h" 
    2828#include "libutil/ringbuffer.h" 
    29 #include <semaphore.h> 
    3029 
    3130//typedef float ffado_timestamp_t; 
     
    156155 
    157156        unsigned int getBufferFill(); 
     157        unsigned int getBufferSpace(); 
    158158 
    159159        // timestamp stuff 
     
    244244        float m_current_rate; 
    245245        unsigned int m_update_period; 
    246  
    247         sem_t m_frame_semaphore; 
    248246}; 
    249247 
  • trunk/libffado/tests/streaming/teststreaming3.cpp

    r794 r807  
    292292 
    293293    dev_options.realtime = (arguments.rtprio != 0); 
    294     dev_options.packetizer_priority = arguments.rtprio + 1
     294    dev_options.packetizer_priority = arguments.rtprio
    295295     
    296296    dev_options.verbose = arguments.verbose;