Changeset 803

Show
Ignore:
Timestamp:
01/02/08 14:11:58 (16 years ago)
Author:
ppalmers
Message:

more reliable streaming. hackish, but a start for a better implementation

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/config.h.in

    r798 r803  
    4141#define MAX_ISO_XMIT_BUFFER_FILL_PCT    50 
    4242 
    43 #define ISOHANDLER_PER_HANDLER_THREAD   0 
     43#define ISOHANDLER_PER_HANDLER_THREAD   1 
    4444#define ISOHANDLER_USE_POLL             0 
    4545 
     
    5151// time to a later time instant also causes the xmit buffer fill to be 
    5252// lower on average. 
    53 #define STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS           (3072*4
     53#define STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS           (3072*6
    5454 
    5555#define STREAMPROCESSORMANAGER_CYCLES_FOR_DRYRUN            40000 
  • trunk/libffado/src/libieee1394/IsoHandler.cpp

    r796 r803  
    158158 
    159159bool 
    160 IsoHandler::Init() { 
     160IsoHandler::Init() 
     161
    161162    debugOutput( DEBUG_LEVEL_VERBOSE, "%p: Init thread...\n", this); 
    162163    m_poll_fd.fd = getFileDescriptor(); 
     
    171172 
    172173bool 
    173 IsoHandler::Execute() { 
    174     int err; 
    175  
     174IsoHandler::waitForClient() 
     175
     176    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
     177    if(m_Client) { 
     178        bool result = m_Client->waitForFrames(); 
     179        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
     180        return result; 
     181    } 
     182    return false; 
     183
     184 
     185bool 
     186IsoHandler::tryWaitForClient() 
     187
     188    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 
     189    if(m_Client) { 
     190        bool result = m_Client->tryWaitForFrames(); 
     191        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 
     192        return result; 
     193    } 
     194    return false; 
     195
     196 
     197bool 
     198IsoHandler::Execute() 
     199
    176200    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p: Execute thread...\n", this); 
     201 
    177202    // bypass if not running 
    178203    if (m_State != E_Running) { 
     
    183208    } 
    184209 
     210    // wait for the availability of frames in the client 
     211    // (blocking) 
     212    if (getType()==eHT_Receive || waitForClient()) { 
     213 
    185214#if ISOHANDLER_USE_POLL 
    186     uint64_t poll_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    187     err = poll(&m_poll_fd, 1, m_poll_timeout); 
    188     uint64_t poll_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    189     if (err == -1) { 
    190         if (errno == EINTR) { 
    191             return true; 
    192         } 
    193         debugFatal("%p, poll error: %s\n", this, strerror (errno)); 
    194         return false; 
    195     } 
    196     uint64_t iter_enter=0; 
    197     uint64_t iter_exit=0; 
    198     if(m_poll_fd.revents & (POLLIN)) { 
    199         iter_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
    200         if(!iterate()) { 
    201             debugOutput( DEBUG_LEVEL_VERBOSE, 
    202                         "IsoHandler (%p): Failed to iterate handler\n", 
    203                         this); 
    204             return false; 
    205         } 
    206         iter_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
     215        uint64_t poll_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
     216        err = poll(&m_poll_fd, 1, m_poll_timeout); 
     217        uint64_t poll_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
     218        if (err == -1) { 
     219            if (errno == EINTR) { 
     220                return true; 
     221            } 
     222            debugFatal("%p, poll error: %s\n", this, strerror (errno)); 
     223            return false; 
     224        } 
     225        uint64_t iter_enter=0; 
     226        uint64_t iter_exit=0; 
     227        if(m_poll_fd.revents & (POLLIN)) { 
     228            iter_enter = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
     229            if(!iterate()) { 
     230                debugOutput( DEBUG_LEVEL_VERBOSE, 
     231                            "IsoHandler (%p): Failed to iterate handler\n", 
     232                            this); 
     233                return false; 
     234            } 
     235            iter_exit = m_manager.get1394Service().getCurrentTimeAsUsecs(); 
     236        } else { 
     237            if (m_poll_fd.revents & POLLERR) { 
     238                debugWarning("error on fd for %p\n", this); 
     239            } 
     240            if (m_poll_fd.revents & POLLHUP) { 
     241                debugWarning("hangup on fd for %p\n",this); 
     242            } 
     243        } 
     244        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%c %p) poll took %lldus, iterate took %lldus\n",  
     245                    (getType()==eHT_Receive?'R':'X'), this,  
     246                    poll_exit-poll_enter, iter_exit-iter_enter); 
     247        return true; 
     248#else 
     249        // iterate blocks if no 1394 data is available 
     250        // so poll'ing is not really necessary 
     251        bool result = iterate(); 
     252        //usleep(125); 
     253        return result; 
     254#endif 
    207255    } else { 
    208         if (m_poll_fd.revents & POLLERR) { 
    209             debugWarning("error on fd for %p\n", this); 
    210         } 
    211         if (m_poll_fd.revents & POLLHUP) { 
    212             debugWarning("hangup on fd for %p\n",this); 
    213         } 
    214     } 
    215     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%c %p) poll took %lldus, iterate took %lldus\n",  
    216                 (getType()==eHT_Receive?'R':'X'), this,  
    217                 poll_exit-poll_enter, iter_exit-iter_enter); 
    218 #else 
    219     // iterate blocks if no 1394 data is available 
    220     // so poll'ing is not really necessary 
    221     bool result = iterate(); 
    222     //usleep(125); 
    223     return result; 
    224 #endif 
    225     return true; 
     256        debugError("waitForClient() failed.\n"); 
     257        return false; 
     258    } 
    226259} 
    227260 
  • trunk/libffado/src/libieee1394/IsoHandler.h

    r759 r803  
    126126    bool unregisterStream(Streaming::StreamProcessor *); 
    127127 
     128    bool waitForClient(); 
     129    bool tryWaitForClient(); 
     130 
    128131private: 
    129132    IsoHandlerManager& m_manager; 
  • trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

    r797 r803  
    140140    unsigned int m_poll_timeout = 100; 
    141141 
    142     // update the shadow variables if requested 
    143    // if(m_request_fdmap_update) { 
    144         updateShadowVars(); 
    145     //    ZERO_ATOMIC((SInt32*)&m_request_fdmap_update); 
    146     //} 
    147  
     142    updateShadowVars(); 
    148143    // bypass if no handlers are registered 
    149144    if (m_poll_nfds_shadow == 0) { 
     145        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "bypass iterate since no handlers registered\n"); 
    150146        usleep(m_poll_timeout * 1000); 
    151147        return true; 
     
    182178 
    183179        if(m_poll_fds_shadow[i].revents & (POLLIN)) { 
    184             m_IsoHandler_map_shadow[i]->iterate(); 
    185180            if (m_IsoHandler_map_shadow[i]->getType() == IsoHandler::eHT_Receive) { 
     181                m_IsoHandler_map_shadow[i]->iterate(); 
    186182                nb_rcv++; 
    187183            } else { 
    188                 nb_xmit++; 
     184                // only iterate the xmit handler if it makes sense 
     185                if(m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 
     186                    m_IsoHandler_map_shadow[i]->iterate(); 
     187                    nb_xmit++; 
     188                } 
    189189            } 
    190190        } 
    191191    } 
    192192    uint64_t iter_exit = m_service.getCurrentTimeAsUsecs(); 
    193      
     193 
    194194    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " poll took %6lldus, iterate took %6lldus, iterated (R: %2d, X: %2d) handlers\n", 
    195195                poll_exit-poll_enter, iter_exit-iter_enter, 
     
    209209 
    210210#if ISOHANDLER_PER_HANDLER_THREAD 
     211    // the IsoHandlers will create their own thread. 
    211212#else 
    212213    // create a thread to iterate our handlers 
     
    274275    handler->setVerboseLevel(getDebugLevel()); 
    275276    m_IsoHandlers.push_back(handler); 
     277    updateShadowVars(); 
    276278    return true; 
    277279} 
     
    288290        if ( *it == handler ) { 
    289291            m_IsoHandlers.erase(it); 
     292            updateShadowVars(); 
    290293            return true; 
    291294        } 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp

    r798 r803  
    110110StreamProcessor::getNbPacketsIsoXmitBuffer() 
    111111{ 
     112#if ISOHANDLER_PER_HANDLER_THREAD 
     113    // if we use one thread per packet, we can put every frame into the ISO buffer 
     114    // the waitForClient in IsoHandler will take care of the fact that the frames are 
     115    // not present in time 
     116    unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers()-1)); 
     117    debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer); 
     118    return packets_to_prebuffer; 
     119#else 
    112120    // the target is to have all of the transmit buffer (at period transfer) as ISO packets 
    113121    // when one period is received, there will be approx (NbBuffers - 1) * period_size frames 
     
    133141     
    134142    return packets_to_prebuffer; 
     143#endif 
    135144} 
    136145 
     
    762771    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); 
    763772    assert( getType() == ePT_Transmit ); 
     773 
    764774    if(isDryRunning()) return putFramesDry(nbframes, ts); 
    765775    else return putFramesWet(nbframes, ts); 
     
    807817    } 
    808818    return true; 
     819} 
     820 
     821bool 
     822StreamProcessor::waitForFrames() 
     823{ 
     824    if(m_state == ePS_Running) { 
     825        assert(m_data_buffer); 
     826        if(getType() == ePT_Receive) { 
     827            return m_data_buffer->waitForFrames(m_StreamProcessorManager.getPeriodSize()); 
     828        } else { 
     829            return m_data_buffer->waitForFrames(getNominalFramesPerPacket()); 
     830        } 
     831    } else { 
     832        // when we're not running, we can always provide frames 
     833        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Not running...\n"); 
     834        return true; 
     835    } 
     836} 
     837 
     838bool 
     839StreamProcessor::tryWaitForFrames() 
     840{ 
     841    if(m_state == ePS_Running) { 
     842        assert(m_data_buffer); 
     843        if(getType() == ePT_Receive) { 
     844            return m_data_buffer->tryWaitForFrames(m_StreamProcessorManager.getPeriodSize()); 
     845        } else { 
     846            return m_data_buffer->tryWaitForFrames(getNominalFramesPerPacket()); 
     847        } 
     848    } else { 
     849        // when we're not running, we can always provide frames 
     850        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Not running...\n"); 
     851        return true; 
     852    } 
    809853} 
    810854 
  • trunk/libffado/src/libstreaming/generic/StreamProcessor.h

    r766 r803  
    3434 
    3535#include "debugmodule/debugmodule.h" 
    36  
    37 #include <pthread.h> 
    3836 
    3937class Ieee1394Service; 
     
    160158 
    161159    /** 
     160     * @brief waits for the availability of frames (blocking) 
     161     * @param nframes number of frames 
     162     * 
     163     * @return true if frames are available, false if not (e.g. signal occurred) 
     164     */ 
     165    bool waitForFrames(); 
     166 
     167    /** 
     168     * @brief waits for the availability of frames (non-blocking) 
     169     * @param nframes number of frames 
     170     * 
     171     * @return true if frames are available, false if not 
     172     */ 
     173    bool tryWaitForFrames(); 
     174 
     175    /** 
    162176     * @brief drop nframes from the internal buffer as if they were transferred to the client side 
    163177     * 
     
    446460    private: 
    447461        bool m_in_xrun; 
    448  
    449462public: 
    450463    // debug stuff 
  • trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

    r798 r803  
    666666    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); 
    667667 
    668     time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
     668    time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
    669669 
    670670    while(time_till_next_period > 0) { 
  • trunk/libffado/src/libutil/TimestampedBuffer.cpp

    r796 r803  
    5252    pthread_mutex_unlock(&m_framecounter_lock); \ 
    5353    } 
    54  
     54/* 
     55#define POST_SEMAPHORE { \ 
     56    int tmp; \ 
     57    sem_getvalue(&m_frame_semaphore, &tmp); \ 
     58    debugWarning("posting semaphore from value %d\n", tmp); \ 
     59    sem_post(&m_frame_semaphore); \ 
     60
     61*/ 
     62 
     63//HACK 
     64#define POST_SEMAPHORE { \ 
     65    if(m_update_period > 8) { \ 
     66        sem_post(&m_frame_semaphore); \ 
     67    } \ 
     68
    5569namespace Util { 
    5670 
     
    7791    ffado_ringbuffer_free(m_event_buffer); 
    7892    free(m_cluster_buffer); 
     93    sem_destroy(&m_frame_semaphore); 
    7994} 
    8095 
     
    108123    m_update_period=n; 
    109124    return true; 
     125} 
     126 
     127/** 
     128 * \brief Get the nominal update period (in frames) 
     129 * 
     130 * Gets the nominal update period. This period is the number of frames 
     131 * between two timestamp updates (hence buffer writes) 
     132 * 
     133 * @return period in frames 
     134 */ 
     135unsigned int TimestampedBuffer::getUpdatePeriod() { 
     136    return m_update_period; 
    110137} 
    111138 
     
    262289 */ 
    263290bool TimestampedBuffer::init() { 
     291    if (sem_init(&m_frame_semaphore, 0, 0) == -1) { 
     292        debugError("Could not init frame semaphore"); 
     293        return false; 
     294    } 
    264295    return true; 
    265296} 
     
    340371} 
    341372 
     373bool 
     374TimestampedBuffer::waitForFrames(unsigned int nframes) 
     375{ 
     376    int result; 
     377    do { 
     378        unsigned int bufferfill = getBufferFill(); 
     379        if(bufferfill >= nframes) { 
     380            // first make the semaphore 0 
     381            while((result=sem_trywait(&m_frame_semaphore)) == 0) {}; 
     382            return true; 
     383        } else { 
     384            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
     385                        "only %d frames in buffer, waiting for more (%d)\n",  
     386                        bufferfill, nframes); 
     387        } 
     388    } while((result=sem_wait(&m_frame_semaphore)) == 0); 
     389    debugOutput(DEBUG_LEVEL_VERBOSE, 
     390                "sem_wait returns: %d\n", 
     391                result); 
     392    return false; 
     393} 
     394 
     395bool 
     396TimestampedBuffer::tryWaitForFrames(unsigned int nframes) 
     397{ 
     398    int result; 
     399    do { 
     400        unsigned int bufferfill = getBufferFill(); 
     401        if(bufferfill >= nframes) { 
     402            // first make the semaphore 0 
     403            while((result=sem_trywait(&m_frame_semaphore)) == 0) {}; 
     404            return true; 
     405        } else { 
     406            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
     407                        "only %d frames in buffer, waiting for more (%d)\n",  
     408                        bufferfill, nframes); 
     409        } 
     410    } while((result=sem_trywait(&m_frame_semaphore)) == 0); 
     411    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
     412                "sem_trywait returns: %d\n",  
     413                result); 
     414    return false; 
     415} 
     416 
     417bool 
     418TimestampedBuffer::waitForFrames() 
     419{ 
     420    return waitForFrames(m_update_period); 
     421} 
     422 
     423bool 
     424TimestampedBuffer::tryWaitForFrames() 
     425{ 
     426    return tryWaitForFrames(m_update_period); 
     427} 
     428 
     429 
    342430/** 
    343431 * @brief Insert a dummy frame to the head buffer 
     
    371459    m_framecounter++; 
    372460    EXIT_CRITICAL_SECTION; 
    373      
     461 
     462    POST_SEMAPHORE; 
    374463    return true; 
    375464} 
     
    449538    } 
    450539 
     540    POST_SEMAPHORE; 
    451541    return true; 
    452542} 
     
    12201310    // ts(x) = m_buffer_tail_timestamp + 
    12211311    //         (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x 
    1222  
     1312    POST_SEMAPHORE; 
    12231313} 
    12241314 
  • trunk/libffado/src/libutil/TimestampedBuffer.h

    r765 r803  
    2727#include "../debugmodule/debugmodule.h" 
    2828#include "libutil/ringbuffer.h" 
     29#include <semaphore.h> 
    2930 
    3031//typedef float ffado_timestamp_t; 
     
    8990        virtual ~TimestampedBuffer(); 
    9091 
     92        /** 
     93         * @brief waits for the availability of frames (blocking) 
     94         * @param nframes number of frames 
     95         * 
     96         * @return true if frames are available, false if not (e.g. signal occurred) 
     97         */ 
     98        bool waitForFrames(unsigned int nframes); 
     99 
     100        /** 
     101         * @brief waits for the availability of frames (blocking) 
     102         * 
     103         * waits for one update period of frames 
     104         * 
     105         * @return true if frames are available, false if not (e.g. signal occurred) 
     106         */ 
     107        bool waitForFrames(); 
     108 
     109        /** 
     110         * @brief waits for the availability of frames (non-blocking) 
     111         * @param nframes number of frames 
     112         * 
     113         * @return true if frames are available, false if not 
     114         */ 
     115        bool tryWaitForFrames(unsigned int nframes); 
     116 
     117        /** 
     118         * @brief waits for the availability of frames (non-blocking) 
     119         * 
     120         * waits for one update period of frames 
     121         * 
     122         * @return true if frames are available, false if not 
     123         */ 
     124        bool tryWaitForFrames(); 
     125 
    91126        bool writeDummyFrame(); 
    92127        bool dropFrames ( unsigned int nbframes ); 
     
    152187 
    153188        bool setUpdatePeriod ( unsigned int t ); 
     189        unsigned int getUpdatePeriod(); 
    154190 
    155191        // misc stuff 
     
    208244        float m_current_rate; 
    209245        unsigned int m_update_period; 
     246 
     247        sem_t m_frame_semaphore; 
    210248}; 
    211249