Changeset 860

Show
Ignore:
Timestamp:
01/19/08 10:47:41 (16 years ago)
Author:
ppalmers
Message:

clean up synchronization in streamprocessor

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/src/libieee1394/IsoHandler.cpp

    r841 r860  
    174174IsoHandler::waitForClient() 
    175175{ 
    176     //debugOutput(DEBUG_LEVEL_VERBOSE, "waiting...\n"); 
     176    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
    177177    if(m_Client) { 
    178         bool result = m_Client->waitForSignal(); 
    179         //debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); 
     178        bool result; 
     179        if (m_type == eHT_Receive) { 
     180            result = m_Client->waitForProducePacket(); 
     181        } else { 
     182            result = m_Client->waitForConsumePacket(); 
     183        } 
     184        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
    180185        return result; 
    181186    } else { 
     
    188193IsoHandler::tryWaitForClient() 
    189194{ 
    190     //debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
     195    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
    191196    if(m_Client) { 
    192         bool result = m_Client->tryWaitForSignal(); 
    193         //debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); 
     197        bool result; 
     198        if (m_type == eHT_Receive) { 
     199            result = m_Client->canProducePacket(); 
     200        } else { 
     201            result = m_Client->canConsumePacket(); 
     202        } 
     203        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
    194204        return result; 
    195205    } else { 
     
    214224    // wait for the availability of frames in the client 
    215225    // (blocking for transmit handlers) 
    216 #if 0 //#ifdef DEBUG 
    217     if (getType() == eHT_Transmit) { 
    218         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) Waiting for Client to signal frame availability...\n", this); 
    219     } 
    220 #endif 
    221     if (getType() == eHT_Receive || waitForClient()) { 
    222  
     226    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Waiting for Client activity...\n", this, getTypeString()); 
     227    if (waitForClient()) { 
    223228#if ISOHANDLER_USE_POLL 
    224229        bool result = true; 
    225         while(result && m_Client && m_Client->canProcessPackets()) { 
     230        while(result && m_Client && tryWaitForClient()) { 
    226231            int err = poll(&m_poll_fd, 1, m_poll_timeout); 
    227232            if (err == -1) { 
     
    255260        // so poll'ing is not really necessary 
    256261        bool result = true; 
    257         while(result && m_Client && m_Client->canProcessPackets()) { 
     262        while(result && m_Client && tryWaitForClient()) { 
    258263            result = iterate(); 
    259264//             if (getType() == eHT_Receive) { 
     
    272277bool 
    273278IsoHandler::iterate() { 
    274 //     if(m_type==eHT_Receive) { 
    275 //         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Iterating ISO handler\n",  
    276 //                 this, (m_type==eHT_Receive?"Receive":"Transmit")); 
    277 //     } 
     279    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Iterating ISO handler...\n", 
     280                this, getTypeString()); 
    278281    if(m_State == E_Running) { 
    279282#if ISOHANDLER_FLUSH_BEFORE_ITERATE 
     
    285288            return false; 
    286289        } 
    287 //         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s)  done iterating ISO handler\n",  
    288 //                 this, (m_type==eHT_Receive?"Receive":"Transmit")); 
     290        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) done interating ISO handler...\n", 
     291                    this, getTypeString()); 
    289292        return true; 
    290293    } else { 
    291294        debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Not iterating a non-running handler...\n", 
    292                     this, (m_type==eHT_Receive?"Receive":"Transmit")); 
     295                    this, getTypeString()); 
    293296        return false; 
    294297    } 
     
    415418 
    416419    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Handler type................: %s\n", 
    417             (this->getType() == eHT_Receive ? "Receive" : "Transmit")); 
     420            getTypeString()); 
    418421    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel...............: %2d, %2d\n", 
    419422            m_manager.get1394Service().getPort(), channel); 
     
    475478                    unsigned int cycle, unsigned int dropped) { 
    476479 
    477     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, 
     480/*    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, 
    478481                 "received packet: length=%d, channel=%d, cycle=%d\n", 
    479                  length, channel, cycle ); 
     482                 length, channel, cycle );*/ 
    480483    m_packetcount++; 
    481484    m_dropped += dropped; 
     
    494497                    int cycle, unsigned int dropped) { 
    495498 
    496     debugOutput( DEBUG_LEVEL_ULTRA_VERBOSE, 
     499/*    debugOutput( DEBUG_LEVEL_ULTRA_VERBOSE, 
    497500                    "sending packet: length=%d, cycle=%d\n", 
    498                     *length, cycle ); 
     501                    *length, cycle );*/ 
    499502    m_packetcount++; 
    500503    m_dropped += dropped; 
     
    503506        return m_Client->getPacket(data, length, tag, sy, cycle, dropped, m_max_packet_size); 
    504507    } 
     508    *tag = 0; 
     509    *sy = 0; 
     510    *length = 0; 
    505511    return RAW1394_ISO_OK; 
    506512} 
     
    594600    return true; 
    595601} 
     602 
     603/** 
     604 * @brief convert a EHandlerType to a string 
     605 * @param t the type 
     606 * @return a char * describing the state 
     607 */ 
     608const char * 
     609IsoHandler::eHTToString(enum EHandlerType t) { 
     610    switch (t) { 
     611        case eHT_Receive: return "Receive"; 
     612        case eHT_Transmit: return "Transmit"; 
     613        default: return "error: unknown type"; 
     614    } 
     615} 
  • trunk/libffado/src/libieee1394/IsoHandler.h

    r803 r860  
    100100    void flush(); 
    101101    enum EHandlerType getType() {return m_type;}; 
     102    const char *getTypeString() {return eHTToString(m_type); }; 
     103 
     104    // pretty printing 
     105    const char *eHTToString(enum EHandlerType); 
    102106 
    103107    bool isEnabled() 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

    r857 r860  
    4040#include <math.h> 
    4141 
    42 /* 
    43 #define POST_SEMAPHORE { \ 
    44     int tmp; \ 
    45     sem_getvalue(&m_signal_semaphore, &tmp); \ 
    46     debugWarning("posting semaphore from value %d\n", tmp); \ 
    47     sem_post(&m_signal_semaphore); \ 
    48 
    49 */ 
    50  
    51 #define POST_SEMAPHORE { \ 
    52     sem_post(&m_signal_semaphore); \ 
     42#define SIGNAL_ACTIVITY { \ 
     43    pthread_mutex_lock(&m_activity_cond_lock); \ 
     44    pthread_cond_broadcast(&m_activity_cond); \ 
     45    pthread_mutex_unlock(&m_activity_cond_lock); \ 
    5346} 
    5447 
     
    7871    , m_sync_delay( 0 ) 
    7972    , m_in_xrun( false ) 
    80     , m_signal_period( 0 ) 
    81     , m_signal_offset( 0 ) 
    8273{ 
    8374    // create the timestamped buffer and register ourselves as its client 
    8475    m_data_buffer = new Util::TimestampedBuffer(this); 
     76    pthread_mutex_init(&m_activity_cond_lock, NULL); 
     77    pthread_cond_init(&m_activity_cond, NULL); 
    8578} 
    8679 
     
    9083        debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n"); 
    9184    } 
    92     // make the threads leave the wait condition 
    93     POST_SEMAPHORE; 
    94     sem_destroy(&m_signal_semaphore); 
    95  
     85 
     86    // lock the condition mutex to keep threads from blocking on 
     87    // the condition var while we destroy it 
     88    pthread_mutex_lock(&m_activity_cond_lock); 
     89    // now signal activity, releasing threads that 
     90    // are already blocking on the condition variable 
     91    pthread_cond_broadcast(&m_activity_cond); 
     92    // then destroy it 
     93    pthread_cond_destroy(&m_activity_cond); 
     94    pthread_mutex_unlock(&m_activity_cond_lock); 
     95 
     96    // destroy the mutexes 
     97    pthread_mutex_destroy(&m_activity_cond_lock); 
    9698    if (m_data_buffer) delete m_data_buffer; 
    9799    if (m_scratch_buffer) delete[] m_scratch_buffer; 
     
    316318    if (m_state == ePS_Invalid) { 
    317319        debugError("Should not have state %s\n", ePSToString(m_state) ); 
    318         POST_SEMAPHORE; 
    319320        return RAW1394_ISO_ERROR; 
    320321    } 
     
    338339            if (!updateState()) { // we are allowed to change the state directly 
    339340                debugError("Could not update state!\n"); 
    340                 POST_SEMAPHORE; 
    341341                return RAW1394_ISO_ERROR; 
    342342            } 
     
    358358            if (!updateState()) { // we are allowed to change the state directly 
    359359                debugError("Could not update state!\n"); 
    360                 POST_SEMAPHORE; 
    361360                return RAW1394_ISO_ERROR; 
    362361            } 
     
    383382            if (!updateState()) { // we are allowed to change the state directly 
    384383                debugError("Could not update state!\n"); 
    385                 POST_SEMAPHORE; 
    386384                return RAW1394_ISO_ERROR; 
    387385            } 
     
    417415                if (!updateState()) { // we are allowed to change the state directly 
    418416                    debugError("Could not update state!\n"); 
    419                     POST_SEMAPHORE; 
    420417                    return RAW1394_ISO_ERROR; 
    421418                } 
     
    435432            if (!updateState()) { // we are allowed to change the state directly 
    436433                debugError("Could not update state!\n"); 
    437                 POST_SEMAPHORE; 
    438434                return RAW1394_ISO_ERROR; 
    439435            } 
     
    455451            if (!updateState()) { // we are allowed to change the state directly 
    456452                debugError("Could not update state!\n"); 
    457                 POST_SEMAPHORE; 
    458453                return RAW1394_ISO_ERROR; 
    459454            } 
    460             POST_SEMAPHORE; 
    461455            return RAW1394_ISO_DEFER; 
    462456        } else if(result2 == eCRV_OK) { 
    463457            // no problem here 
    464             // if we have enough samples, we can post the semaphore and  
    465             // defer further processing until later. this will allow us to 
    466             // run the client and process the frames such that we can put them 
    467             // into the xmit buffers ASAP 
    468             if (m_state == ePS_Running) { 
    469                 unsigned int bufferfill = m_data_buffer->getBufferFill(); 
    470                 if(bufferfill >= m_signal_period + m_signal_offset) { 
    471                     // this to avoid multiple signals for the same period 
    472                     int semval; 
    473                     sem_getvalue(&m_signal_semaphore, &semval); 
    474                     // NOTE: this can cause 2 posts to be done when the receiving thread 
    475                     //       decreases the semaphore but hasn't processed the frames yet 
    476                     unsigned int signal_period = m_signal_period * (semval + 1) + m_signal_offset; 
    477                     if(bufferfill >= signal_period) { 
    478                         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) buffer fill (%d) > signal period (%d), sem_val=%d\n", 
    479                                     this, m_data_buffer->getBufferFill(), signal_period, semval); 
    480                         POST_SEMAPHORE; 
    481                     } 
    482                     // the process thread should have higher prio such that we are blocked until 
    483                     // the samples are processed. 
    484                     return RAW1394_ISO_DEFER; 
    485                 } 
    486             } 
     458            SIGNAL_ACTIVITY; 
    487459            return RAW1394_ISO_OK; 
    488460        } else { 
    489461            debugError("Invalid response\n"); 
    490             POST_SEMAPHORE; 
    491462            return RAW1394_ISO_ERROR; 
    492463        } 
     
    496467    } else { 
    497468        debugError("Invalid response\n"); 
    498         POST_SEMAPHORE; 
    499469        return RAW1394_ISO_ERROR; 
    500470    } 
    501471    debugError("reached the unreachable\n"); 
    502     POST_SEMAPHORE; 
    503472    return RAW1394_ISO_ERROR; 
    504473} 
     
    633602 
    634603            // assumed not to xrun 
    635             enum eChildReturnValue result2 = generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 
     604            generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 
    636605            return RAW1394_ISO_OK; 
    637606        } else { 
     
    789758 */ 
    790759bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 
     760    bool result; 
    791761    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); 
    792762    assert( getType() == ePT_Receive ); 
    793     if(isDryRunning()) return getFramesDry(nbframes, ts); 
    794     else return getFramesWet(nbframes, ts); 
     763    if(isDryRunning()) result = getFramesDry(nbframes, ts); 
     764    else result = getFramesWet(nbframes, ts); 
     765    SIGNAL_ACTIVITY; 
     766    return result; 
    795767} 
    796768 
     
    847819StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts) 
    848820{ 
     821    bool result; 
    849822    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); 
    850     return m_data_buffer->dropFrames(nbframes); 
     823    result = m_data_buffer->dropFrames(nbframes); 
     824    SIGNAL_ACTIVITY; 
     825    return result; 
    851826} 
    852827 
    853828bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) 
    854829{ 
     830    bool result; 
    855831    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); 
    856832    assert( getType() == ePT_Transmit ); 
    857  
    858     if(isDryRunning()) return putFramesDry(nbframes, ts); 
    859     else return putFramesWet(nbframes, ts); 
     833    if(isDryRunning()) result = putFramesDry(nbframes, ts); 
     834    else result = putFramesWet(nbframes, ts); 
     835    SIGNAL_ACTIVITY; 
     836    return result; 
    860837} 
    861838 
     
    867844    m_data_buffer->blockProcessWriteFrames(nbframes, ts); 
    868845    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); 
    869  
    870     unsigned int bufferfill = m_data_buffer->getBufferFill(); 
    871     if (bufferfill >= m_signal_period + m_signal_offset) { 
    872         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", 
    873                                          this, bufferfill, m_signal_period + m_signal_offset); 
    874         POST_SEMAPHORE; 
    875     } else { 
    876         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", 
    877                                          this, bufferfill, m_signal_period + m_signal_offset); 
    878     } 
    879846    return true; // FIXME: what about failure? 
    880847} 
     
    911878    } 
    912879 
    913     unsigned int bufferfill = m_data_buffer->getBufferFill(); 
    914     if (bufferfill >= m_signal_period + m_signal_offset) { 
    915         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", 
    916                                          this, bufferfill, m_signal_period + m_signal_offset); 
    917         POST_SEMAPHORE; 
    918     } else { 
    919         debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", 
    920                                          this, bufferfill, m_signal_period + m_signal_offset); 
    921     } 
    922  
     880    SIGNAL_ACTIVITY; 
    923881    return true; 
    924 } 
    925  
    926 bool 
    927 StreamProcessor::waitForSignal() 
    928 { 
    929     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) wait ...\n", this, getTypeString()); 
    930     int result; 
    931     if(m_state == ePS_Running && m_next_state == ePS_Running) { 
    932         // check whether we already fullfil the criterion 
    933         unsigned int bufferfill = m_data_buffer->getBufferFill(); 
    934         if(bufferfill >= m_signal_period + m_signal_offset) { 
    935             return true; 
    936         } 
    937  
    938         result = sem_wait(&m_signal_semaphore); 
    939 #ifdef DEBUG 
    940         int tmp; 
    941         sem_getvalue(&m_signal_semaphore, &tmp); 
    942         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp); 
    943 #endif 
    944         return result == 0; 
    945     } else { 
    946         // when we're not running, we can always provide frames 
    947         // when we're in a state transition, keep iterating too 
    948         debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n"); 
    949         return true; 
    950     } 
    951 } 
    952  
    953 bool 
    954 StreamProcessor::tryWaitForSignal() 
    955 { 
    956     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) trywait ...\n", this, getTypeString()); 
    957     int result; 
    958     if(m_state == ePS_Running && m_next_state == ePS_Running) { 
    959         // check whether we already fullfil the criterion 
    960         unsigned int bufferfill = m_data_buffer->getBufferFill(); 
    961         if(bufferfill >= m_signal_period + m_signal_offset) { 
    962             return true; 
    963         } 
    964  
    965         result = sem_trywait(&m_signal_semaphore); 
    966 #ifdef DEBUG 
    967         int tmp; 
    968         sem_getvalue(&m_signal_semaphore, &tmp); 
    969         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp); 
    970 #endif 
    971         return result == 0; 
    972     } else { 
    973         // when we're not running, we can always provide frames 
    974         // when we're in a state transition, keep iterating too 
    975         debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n"); 
    976         return true; 
    977     } 
    978 } 
    979  
    980 bool 
    981 StreamProcessor::canProcessPackets() 
    982 { 
    983     if(m_state != ePS_Running || m_next_state != ePS_Running) return true; 
    984     bool result; 
    985     unsigned int bufferfill; 
    986     if(getType() == ePT_Receive) { 
    987         bufferfill = m_data_buffer->getBufferSpace(); 
    988     } else { 
    989         bufferfill = m_data_buffer->getBufferFill(); 
    990     } 
    991     result = bufferfill > getNominalFramesPerPacket(); 
    992 //     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) for a bufferfill of %d, we return %d\n", 
    993 //                 this, ePTToString(getType()), bufferfill, result); 
    994     return result; 
    995882} 
    996883 
     
    998885StreamProcessor::shiftStream(int nbframes) 
    999886{ 
     887    bool result; 
    1000888    if(nbframes == 0) return true; 
    1001889    if(nbframes > 0) { 
    1002         return m_data_buffer->dropFrames(nbframes); 
     890        result = m_data_buffer->dropFrames(nbframes); 
     891        SIGNAL_ACTIVITY; 
     892        return result; 
    1003893    } else { 
    1004         bool result = true; 
     894        result = true; 
    1005895        while(nbframes++) { 
    1006896            result &= m_data_buffer->writeDummyFrame(); 
    1007897        } 
     898        SIGNAL_ACTIVITY; 
    1008899        return result; 
    1009900    } 
     
    1089980    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); 
    1090981 
    1091     if (sem_init(&m_signal_semaphore, 0, 0) == -1) { 
    1092         debugError("Could not init signal semaphore"); 
    1093         return false; 
    1094     } 
    1095  
    1096982    if(!m_IsoHandlerManager.registerStream(this)) { 
    1097983        debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n"); 
     
    11701056    m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant); 
    11711057    m_next_state = state; 
    1172     POST_SEMAPHORE; // needed to ensure that things don't get deadlocked 
     1058    // wake up any threads that might be waiting on data in the buffers 
     1059    // since a state transition can cause data to become available 
     1060    SIGNAL_ACTIVITY; 
    11731061    return true; 
    11741062} 
     
    14171305    } 
    14181306    #endif 
     1307    SIGNAL_ACTIVITY; 
    14191308    return result; 
    14201309} 
     
    14501339    } 
    14511340    #endif 
     1341    SIGNAL_ACTIVITY; 
    14521342    return true; 
    14531343} 
     
    14991389    } 
    15001390    #endif 
     1391    SIGNAL_ACTIVITY; 
    15011392    return result; 
    15021393} 
     
    15211412            // this basically means nothing, the state change will 
    15221413            // be picked up by the packet iterator 
    1523  
    1524             sem_init(&m_signal_semaphore, 0, 0); 
    1525             m_signal_period = m_StreamProcessorManager.getPeriodSize(); 
    1526             m_signal_offset = 0; // FIXME: we have to ensure that everyone is ready 
    15271414 
    15281415            if(!m_data_buffer->clearBuffer()) { 
     
    15381425                    return false; 
    15391426                } 
    1540                 if (m_data_buffer->getBufferFill() >= m_signal_period + m_signal_offset) { 
    1541                     POST_SEMAPHORE; 
    1542                 } 
    1543             } 
    1544  
     1427            } 
    15451428            break; 
    15461429        default: 
     
    15551438    } 
    15561439    #endif 
     1440    SIGNAL_ACTIVITY; 
    15571441    return true; 
    15581442} 
     
    15921476    } 
    15931477    #endif 
     1478    SIGNAL_ACTIVITY; 
    15941479    return result; 
    15951480} 
     
    16111496        case ePS_Running: 
    16121497            // the thread will do the transition 
    1613  
    1614             // we have to wake the iterator if it's asleep 
    1615             POST_SEMAPHORE; 
    16161498            break; 
    16171499        default: 
     
    16261508    } 
    16271509    #endif 
     1510    SIGNAL_ACTIVITY; 
    16281511    return true; 
    16291512} 
     
    16481531        return true; 
    16491532    } 
    1650  
    16511533    // after creation, only initialization is allowed 
    16521534    if (m_state == ePS_Created) { 
     
    16561538        // do init here  
    16571539        result = doStop(); 
    1658         if (result) {POST_SEMAPHORE; return true;} 
     1540        if (result) {return true;} 
    16591541        else goto updateState_exit_change_failed; 
    16601542    } 
     
    16661548        } 
    16671549        result = doWaitForRunningStream(); 
    1668         if (result) {POST_SEMAPHORE; return true;} 
     1550        if (result) {return true;} 
    16691551        else goto updateState_exit_change_failed; 
    16701552    } 
     
    16771559        } 
    16781560        result = doDryRunning(); 
    1679         if (result) {POST_SEMAPHORE; return true;} 
     1561        if (result) {return true;} 
    16801562        else goto updateState_exit_change_failed; 
    16811563    } 
     
    16941576            result = doWaitForStreamEnable(); 
    16951577        } 
    1696         if (result) {POST_SEMAPHORE; return true;} 
     1578        if (result) {return true;} 
    16971579        else goto updateState_exit_change_failed; 
    16981580    } 
     
    17111593            result = doRunning(); 
    17121594        } 
    1713         if (result) {POST_SEMAPHORE; return true;} 
     1595        if (result) {return true;} 
    17141596        else goto updateState_exit_change_failed; 
    17151597    } 
     
    17211603        } 
    17221604        result = doWaitForStreamDisable(); 
    1723         if (result) {POST_SEMAPHORE; return true;} 
     1605        if (result) {return true;} 
    17241606        else goto updateState_exit_change_failed; 
    17251607    } 
     
    17311613        } 
    17321614        result = doDryRunning(); 
    1733         if (result) {POST_SEMAPHORE; return true;} 
     1615        if (result) {return true;} 
    17341616        else goto updateState_exit_change_failed; 
    17351617    } 
     
    17391621    debugError("Invalid state transition: %s => %s\n", 
    17401622        ePSToString(m_state), ePSToString(next_state)); 
    1741     POST_SEMAPHORE
     1623    SIGNAL_ACTIVITY
    17421624    return false; 
    17431625updateState_exit_change_failed: 
    17441626    debugError("State transition failed: %s => %s\n", 
    17451627        ePSToString(m_state), ePSToString(next_state)); 
    1746     POST_SEMAPHORE
     1628    SIGNAL_ACTIVITY
    17471629    return false; 
     1630} 
     1631 
     1632bool StreamProcessor::waitForProducePacket() 
     1633{ 
     1634    return waitForProduce(getNominalFramesPerPacket()); 
     1635} 
     1636bool StreamProcessor::waitForProducePeriod() 
     1637{ 
     1638    return waitForProduce(m_StreamProcessorManager.getPeriodSize()); 
     1639} 
     1640bool StreamProcessor::waitForProduce(unsigned int nframes) 
     1641{ 
     1642    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this); 
     1643    struct timespec ts; 
     1644    int result; 
     1645 
     1646    if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 
     1647        debugError("clock_gettime failed\n"); 
     1648        return false; 
     1649    } 
     1650 
     1651    // FIXME: hardcoded timeout of 0.1 sec 
     1652    ts.tv_nsec += 100 * 1000000LL; 
     1653    if (ts.tv_nsec > 1000000000LL) { 
     1654        ts.tv_sec += 1; 
     1655        ts.tv_nsec -= 1000000000LL; 
     1656    } 
     1657 
     1658    pthread_mutex_lock(&m_activity_cond_lock); 
     1659    while(!canProduce(nframes)) { 
     1660        result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); 
     1661     
     1662        if (result == -1) { 
     1663            if (errno == ETIMEDOUT) { 
     1664                debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this); 
     1665                pthread_mutex_unlock(&m_activity_cond_lock); 
     1666                return false; 
     1667            } else { 
     1668                debugError("(%p) pthread_cond_timedwait error\n", this); 
     1669                pthread_mutex_unlock(&m_activity_cond_lock); 
     1670                return false; 
     1671            } 
     1672        } 
     1673    } 
     1674    pthread_mutex_unlock(&m_activity_cond_lock); 
     1675    return true; 
     1676} 
     1677 
     1678bool StreamProcessor::waitForConsumePacket() 
     1679{ 
     1680    return waitForConsume(getNominalFramesPerPacket()); 
     1681} 
     1682bool StreamProcessor::waitForConsumePeriod() 
     1683{ 
     1684    return waitForConsume(m_StreamProcessorManager.getPeriodSize()); 
     1685} 
     1686bool StreamProcessor::waitForConsume(unsigned int nframes) 
     1687{ 
     1688    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this); 
     1689    struct timespec ts; 
     1690    int result; 
     1691 
     1692    if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 
     1693        debugError("clock_gettime failed\n"); 
     1694        return false; 
     1695    } 
     1696 
     1697    // FIXME: hardcoded timeout of 0.1 sec 
     1698    ts.tv_nsec += 100 * 1000000LL; 
     1699    if (ts.tv_nsec > 1000000000LL) { 
     1700        ts.tv_sec += 1; 
     1701        ts.tv_nsec -= 1000000000LL; 
     1702    } 
     1703 
     1704    pthread_mutex_lock(&m_activity_cond_lock); 
     1705    while(!canConsume(nframes)) { 
     1706        result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); 
     1707     
     1708        if (result == -1) { 
     1709            if (errno == ETIMEDOUT) { 
     1710                debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this); 
     1711                pthread_mutex_unlock(&m_activity_cond_lock); 
     1712                return false; 
     1713            } else { 
     1714                debugError("(%p) pthread_cond_timedwait error\n", this); 
     1715                pthread_mutex_unlock(&m_activity_cond_lock); 
     1716                return false; 
     1717            } 
     1718        } 
     1719    } 
     1720    pthread_mutex_unlock(&m_activity_cond_lock); 
     1721    return true; 
     1722} 
     1723 
     1724bool StreamProcessor::canProducePacket() 
     1725{ 
     1726    return canProduce(getNominalFramesPerPacket()); 
     1727} 
     1728bool StreamProcessor::canProducePeriod() 
     1729{ 
     1730    return canProduce(m_StreamProcessorManager.getPeriodSize()); 
     1731} 
     1732bool StreamProcessor::canProduce(unsigned int nframes) 
     1733{ 
     1734    if(m_state == ePS_Running && m_next_state == ePS_Running) { 
     1735        // check whether we already fullfil the criterion 
     1736        unsigned int bufferspace = m_data_buffer->getBufferSpace(); 
     1737        if(bufferspace >= nframes) { 
     1738//             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough space (%u)...\n", bufferspace); 
     1739            return true; 
     1740        } else return false; 
     1741    } else { 
     1742        if(getType() == ePT_Transmit) { 
     1743            // if we are an xmit SP, we cannot accept frames  
     1744            // when not running 
     1745            return false; 
     1746        } else { 
     1747            // if we are a receive SP, we can always accept frames 
     1748            // when not running 
     1749            return true; 
     1750        } 
     1751    } 
     1752} 
     1753 
     1754bool StreamProcessor::canConsumePacket() 
     1755{ 
     1756    return canConsume(getNominalFramesPerPacket()); 
     1757} 
     1758bool StreamProcessor::canConsumePeriod() 
     1759{ 
     1760    return canConsume(m_StreamProcessorManager.getPeriodSize()); 
     1761} 
     1762bool StreamProcessor::canConsume(unsigned int nframes) 
     1763{ 
     1764    if(m_state == ePS_Running && m_next_state == ePS_Running) { 
     1765        // check whether we already fullfil the criterion 
     1766        unsigned int bufferfill = m_data_buffer->getBufferFill(); 
     1767        if(bufferfill >= nframes) { 
     1768//             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough items (%u)...\n", bufferfill); 
     1769            return true; 
     1770        } else return false; 
     1771    } else { 
     1772        if(getType() == ePT_Transmit) { 
     1773            // if we are an xmit SP, and we're not running, 
     1774            // we can always provide frames 
     1775            return true; 
     1776        } else { 
     1777            // if we are a receive SP, we can't provide frames 
     1778            // when not running 
     1779            return false; 
     1780        } 
     1781    } 
    17481782} 
    17491783 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.h

    r857 r860  
    3434 
    3535#include "debugmodule/debugmodule.h" 
    36 #include <semaphore.h> 
     36 
     37#include <pthread.h> 
    3738 
    3839class Ieee1394Service; 
     
    161162    bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 
    162163 
    163     unsigned int getSignalPeriod() {return m_signal_period;}; 
    164     bool setSignalPeriod(unsigned int p) {m_signal_period=p; return true;}; 
    165     /** 
    166      * @brief waits for a 'signal' (blocking) 
    167      * 
    168      * a 'signal' is: 
    169      * when type==Receive: 
    170      *  - one signal_period of frames is present in the buffer 
    171      *    (received by the iso side) 
    172      *  - an error has occurred (xrun, iso error, ...) 
    173      * when type==Transmit: 
    174      *  - at least one signal_period of frames are present in the buffer 
    175      *    (have been written into it by the client) 
    176      *  - an error occurred 
    177      * 
    178      * @return true if the 'signal' is available, false if error 
    179      */ 
    180     bool waitForSignal(); 
    181  
    182     /** 
    183      * @brief checks for a 'signal' (non-blocking) 
    184      * 
    185      * a 'signal' is: 
    186      * when type==Receive: 
    187      *  - one signal_period of frames is present in the buffer 
    188      *    (received by the iso side) 
    189      *  - an error has occurred (xrun, iso error, ...) 
    190      * when type==Transmit: 
    191      *  - at least one signal_period of frames are present in the buffer 
    192      *    (have been written into it by the client) 
    193      *  - an error occurred 
    194      * 
    195      * @return true if the 'signal' is available, false if not (or error) 
    196      */ 
    197     bool tryWaitForSignal(); 
    198  
    199     /** 
    200      * @brief can a SP process (queue, dequeue) packets at this moment? 
    201      * 
    202      * 
    203      * @return true if packet processing makes sense 
    204      */ 
    205     bool canProcessPackets(); 
    206  
     164    //FIXME: document wait functions 
     165    bool waitForProducePacket(); 
     166    bool waitForProducePeriod(); 
     167    bool waitForProduce(unsigned int nframes); 
     168 
     169    bool waitForConsumePacket(); 
     170    bool waitForConsumePeriod(); 
     171    bool waitForConsume(unsigned int nframes); 
     172 
     173    bool canProducePacket(); 
     174    bool canProducePeriod(); 
     175    bool canProduce(unsigned int nframes); 
     176 
     177    bool canConsumePacket(); 
     178    bool canConsumePeriod(); 
     179    bool canConsume(unsigned int nframes); 
     180 
     181public: 
    207182    /** 
    208183     * @brief drop nframes from the internal buffer as if they were transferred to the client side 
     
    503478    private: 
    504479        bool m_in_xrun; 
    505         sem_t m_signal_semaphore; 
    506         unsigned int m_signal_period; 
    507         unsigned int m_signal_offset; 
     480        pthread_mutex_t m_activity_cond_lock; 
     481        pthread_cond_t  m_activity_cond; 
    508482 
    509483public: 
  • trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

    r857 r860  
    712712    while(period_not_ready) { 
    713713        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill()); 
    714         if(!m_SyncSource->waitForSignal()) { 
    715             debugError("Error waiting for signal\n"); 
    716             return false; 
    717         } 
     714        bool result; 
     715        if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) { 
     716            result = m_SyncSource->waitForConsumePeriod(); 
     717        } else { 
     718            result = m_SyncSource->waitForProducePeriod(); 
     719        } 
     720//         if(!result) { 
     721//             debugError("Error waiting for signal\n"); 
     722//             return false; 
     723//         } 
    718724 
    719725        // HACK: this should be solved more elegantly 
     
    722728            it != m_ReceiveProcessors.end(); 
    723729            ++it ) { 
    724             bool this_sp_period_ready = (*it)->canClientTransferFrames(m_period); 
     730            bool this_sp_period_ready = (*it)->canConsumePeriod(); 
     731            if (!this_sp_period_ready) { 
     732                period_not_ready = true; 
     733            } 
     734        } 
     735        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     736            it != m_TransmitProcessors.end(); 
     737            ++it ) { 
     738            bool this_sp_period_ready = (*it)->canProducePeriod(); 
    725739            if (!this_sp_period_ready) { 
    726740                period_not_ready = true;