Show
Ignore:
Timestamp:
11/25/07 12:57:43 (16 years ago)
Author:
ppalmers
Message:

stream alignment implemented

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp

    r722 r727  
    438438        unsigned int nevents, unsigned int offset ) 
    439439{ 
    440     bool no_problem=true; 
     440    bool no_problem = true; 
    441441 
    442442    for ( PortVectorIterator it = m_PeriodPorts.begin(); 
    443             it != m_PeriodPorts.end(); 
    444             ++it ) 
    445     { 
    446  
    447         if ( ( *it )->isDisabled() ) {continue;}; 
     443          it != m_PeriodPorts.end(); 
     444          ++it ) 
     445    { 
     446        if ( (*it)->isDisabled() ) { continue; }; 
    448447 
    449448        //FIXME: make this into a static_cast when not DEBUG? 
    450  
    451         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *> ( *it ); 
     449        AmdtpPortInfo *pinfo = dynamic_cast<AmdtpPortInfo *> ( *it ); 
    452450        assert ( pinfo ); // this should not fail!! 
    453451 
    454         switch ( pinfo->getFormat() ) 
     452        switch( pinfo->getFormat() ) 
    455453        { 
    456454            case AmdtpPortInfo::E_MBLA: 
    457                 if ( encodePortToMBLAEvents ( static_cast<AmdtpAudioPort *> ( *it ), ( quadlet_t * ) data, offset, nevents ) ) 
     455                if( encodePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents) ) 
    458456                { 
    459                     debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); 
    460                     no_problem=false; 
     457                    debugWarning ( "Could not encode port %s to MBLA events", (*it)->getName().c_str() ); 
     458                    no_problem = false; 
    461459                } 
    462460                break; 
     
    470468} 
    471469 
    472 bool AmdtpTransmitStreamProcessor::transmitSilenceBlock ( char *data, 
    473         unsigned int nevents, unsigned int offset ) 
    474 
    475     bool problem = false; 
    476     for ( PortVectorIterator it = m_PeriodPorts.begin(); 
    477             it != m_PeriodPorts.end(); 
    478             ++it ) 
     470bool 
     471AmdtpTransmitStreamProcessor::transmitSilenceBlock( 
     472    char *data, unsigned int nevents, unsigned int offset) 
     473
     474    bool no_problem = true; 
     475    for(PortVectorIterator it = m_PeriodPorts.begin(); 
     476        it != m_PeriodPorts.end(); 
     477        ++it ) 
    479478    { 
    480479        //FIXME: make this into a static_cast when not DEBUG? 
    481         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *> ( *it ); 
    482         assert ( pinfo ); // this should not fail!! 
    483  
    484         switch ( pinfo->getFormat() ) 
     480        AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it); 
     481        assert(pinfo); // this should not fail!! 
     482 
     483        switch( pinfo->getFormat() ) 
    485484        { 
    486485            case AmdtpPortInfo::E_MBLA: 
    487                 if ( encodeSilencePortToMBLAEvents ( static_cast<AmdtpAudioPort *> ( *it ), ( quadlet_t * ) data, offset, nevents ) ) 
     486                if ( encodeSilencePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents) ) 
    488487                { 
    489                     debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); 
    490                     problem = true; 
     488                    debugWarning("Could not encode port %s to MBLA events", (*it)->getName().c_str()); 
     489                    no_problem = false; 
    491490                } 
    492491                break; 
     
    497496        } 
    498497    } 
    499     return problem; 
     498    return no_problem; 
    500499} 
    501500 
  • branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp

    r723 r727  
    4141    , m_next_state( ePS_Invalid ) 
    4242    , m_cycle_to_switch_state( 0 ) 
     43    , m_scratch_buffer( NULL ) 
     44    , m_scratch_buffer_size_bytes( 0 ) 
    4345    , m_manager( NULL ) 
    4446    , m_ticks_per_frame( 0 ) 
     
    5153{ 
    5254    // create the timestamped buffer and register ourselves as its client 
    53     m_data_buffer=new Util::TimestampedBuffer(this); 
     55    m_data_buffer = new Util::TimestampedBuffer(this); 
    5456} 
    5557 
    5658StreamProcessor::~StreamProcessor() { 
    5759    if (m_data_buffer) delete m_data_buffer; 
     60    if (m_scratch_buffer) delete[] m_scratch_buffer; 
    5861} 
    5962 
     
    7578int StreamProcessor::getBufferFill() { 
    7679    return m_data_buffer->getBufferFill(); 
    77 } 
    78  
    79 bool 
    80 StreamProcessor::dropFrames(unsigned int nbframes) 
    81 { 
    82     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes); 
    83     return m_data_buffer->dropFrames(nbframes); 
    8480} 
    8581 
     
    377373        if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 
    378374        if (dropped_cycles > 0) { 
    379             debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); 
     375            debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped); 
    380376            m_dropped += dropped_cycles; 
    381377        } 
    382378    } 
    383     if (cycle > 0) { 
     379    if (cycle >= 0) { 
    384380        m_last_cycle = cycle; 
    385381    } 
     
    623619} 
    624620 
    625 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { 
     621bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) 
     622
    626623    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", 
    627624                 this, nbframes, ts); 
    628  
    629625    // dry run on this side means that we put silence in all enabled ports 
    630626    // since there is do data put into the ringbuffer in the dry-running state 
     
    632628} 
    633629 
    634 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { 
     630bool 
     631StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts) 
     632
     633    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); 
     634    return m_data_buffer->dropFrames(nbframes); 
     635
     636 
     637bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) 
     638
    635639    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); 
    636640    assert( getType() == ePT_Transmit ); 
     
    640644 
    641645bool 
    642 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) { 
     646StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) 
     647
    643648    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts); 
    644649    // transfer the data 
     
    649654 
    650655bool 
    651 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) { 
     656StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) 
     657
    652658    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts); 
    653659    // do nothing 
    654660    return true; 
     661} 
     662 
     663bool 
     664StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts) 
     665{ 
     666    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts); 
     667 
     668    size_t bytes_per_frame = getEventSize() * getEventsPerFrame(); 
     669    unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame; 
     670 
     671    if (nbframes > scratch_buffer_size_frames) { 
     672        debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n", 
     673                   nbframes, scratch_buffer_size_frames); 
     674    } 
     675 
     676    assert(m_scratch_buffer); 
     677    if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) { 
     678        debugError("Could not prepare silent block\n"); 
     679        return false; 
     680    } 
     681    if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) { 
     682        debugError("Could not write silent block\n"); 
     683        return false; 
     684    } 
     685    return true; 
     686} 
     687 
     688bool 
     689StreamProcessor::shiftStream(int nbframes) 
     690{ 
     691    if(nbframes == 0) return true; 
     692    if(nbframes > 0) { 
     693        return m_data_buffer->dropFrames(nbframes); 
     694    } else { 
     695        bool result = true; 
     696        while(nbframes--) { 
     697            result &= m_data_buffer->writeDummyFrame(); 
     698        } 
     699        return result; 
     700    } 
    655701} 
    656702 
     
    670716bool StreamProcessor::prepare() 
    671717{ 
    672     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "prepare...\n"); 
     718    debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this); 
    673719    if(!m_manager) { 
    674720        debugFatal("Not attached to a manager!\n"); 
     721        return false; 
     722    } 
     723 
     724    // make the scratch buffer one period of frames long 
     725    m_scratch_buffer_size_bytes = m_manager->getPeriodSize() * getEventsPerFrame() * getEventSize(); 
     726    debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n"); 
     727    if(m_scratch_buffer) delete[] m_scratch_buffer; 
     728    m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes]; 
     729    if(m_scratch_buffer == NULL) { 
     730        debugFatal("Could not allocate scratch buffer\n"); 
    675731        return false; 
    676732    } 
  • branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h

    r723 r727  
    154154    bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 
    155155 
     156    /** 
     157     * @brief drop nframes from the internal buffer as if they were transferred to the client side 
     158     * 
     159     * Gets nframes of frames from the buffer as done by getFrames(), but does not transfer them 
     160     * to the client side. Instead they are discarded. 
     161     * 
     162     * @param nframes number of frames 
     163     * @return true if the operation was successful 
     164     */ 
     165    bool dropFrames(unsigned int nframes, int64_t ts); 
     166 
     167    /** 
     168     * @brief put silence frames into the internal buffer 
     169     * 
     170     * Puts nframes of frames into the buffer as done by putFrames(), but does not transfer them 
     171     * from the client side. Instead, silent frames are used. 
     172     * 
     173     * @param nframes number of frames 
     174     * @return true if the operation was successful 
     175     */ 
     176    bool putSilenceFrames(unsigned int nbframes, int64_t ts); 
     177     
     178    /** 
     179     * @brief Shifts the stream with the specified number of frames 
     180     * 
     181     * Used to align several streams to each other. It comes down to 
     182     * making sure the head timestamp corresponds to the timestamp of 
     183     * one master stream 
     184     * 
     185     * @param nframes the number of frames to shift 
     186     * @return true if successful 
     187     */ 
     188    bool shiftStream(int nframes); 
    156189protected: // the helper receive/transmit functions 
    157190    enum eChildReturnValue { 
     
    238271protected: 
    239272    Util::TimestampedBuffer *m_data_buffer; 
    240  
     273    // the scratch buffer is temporary buffer space that can be 
     274    // used by any function. It's pre-allocated when the SP is created. 
     275    // the purpose is to avoid allocation of memory (or heap/stack) in 
     276    // an RT context 
     277    byte_t*         m_scratch_buffer; 
     278    size_t          m_scratch_buffer_size_bytes; 
    241279protected: 
    242280    StreamProcessorManager *m_manager; 
     
    255293         */ 
    256294        bool canClientTransferFrames(unsigned int nframes); 
    257  
    258         /** 
    259          * @brief drop nframes from the internal buffer 
    260          * 
    261          * this function drops nframes from the internal buffers, without any 
    262          * specification on what frames are dropped. Timestamps are not updated. 
    263          * 
    264          * @param nframes number of frames 
    265          * @return true if the operation was successful 
    266          */ 
    267         bool dropFrames(unsigned int nframes); 
    268295 
    269296        /** 
  • branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

    r723 r727  
    2929#include <errno.h> 
    3030#include <assert.h> 
     31#include <math.h> 
    3132 
    3233#define RUNNING_TIMEOUT_MSEC 4000 
     
    350351    int64_t time_till_next_period; 
    351352    while(nb_sync_runs--) { // or while not sync-ed? 
    352         // check if we were waked up too soon 
    353         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
     353        // check if we were woken up too soon 
     354        time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
    354355        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 
    355356        if(time_till_next_period > 0) { 
     
    444445 
    445446    // now align the received streams 
    446     debugOutput( DEBUG_LEVEL_VERBOSE, " Aligning incoming streams...\n"); 
    447      
    448      
     447    if(!alignReceivedStreams()) { 
     448        debugError("Could not align streams\n"); 
     449        return false; 
     450    } 
    449451    debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 
     452    return true; 
     453} 
     454 
     455bool 
     456StreamProcessorManager::alignReceivedStreams() 
     457{ 
     458    #define NB_PERIODS_FOR_ALIGN_AVERAGE 20 
     459    #define NB_ALIGN_TRIES 20 
     460    debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n"); 
     461    unsigned int nb_sync_runs; 
     462    unsigned int nb_rcv_sp = m_ReceiveProcessors.size(); 
     463    int64_t diff_between_streams[nb_rcv_sp]; 
     464    int64_t diff; 
     465 
     466    unsigned int i; 
     467 
     468    bool aligned = false; 
     469    int cnt = NB_ALIGN_TRIES; 
     470    while (!aligned && cnt--) { 
     471        nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE; 
     472        while(nb_sync_runs) { 
     473            debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs); 
     474            waitForPeriod(); 
     475 
     476            i = 0; 
     477            for ( i = 0; i < nb_rcv_sp; i++) { 
     478                StreamProcessor *s = m_ReceiveProcessors.at(i); 
     479                diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod()); 
     480                debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",  
     481                    m_SyncSource, s, diff); 
     482                if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) { 
     483                    diff_between_streams[i] = diff; 
     484                } else { 
     485                    diff_between_streams[i] += diff; 
     486                } 
     487            } 
     488            if(!transferSilence()) { 
     489                debugError("Could not transfer silence\n"); 
     490                return false; 
     491            } 
     492            nb_sync_runs--; 
     493        } 
     494        // calculate the average offsets 
     495        debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n"); 
     496        int diff_between_streams_frames[nb_rcv_sp]; 
     497        aligned = true; 
     498        for ( i = 0; i < nb_rcv_sp; i++) { 
     499            StreamProcessor *s = m_ReceiveProcessors.at(i); 
     500 
     501            diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE; 
     502            diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame()); 
     503            debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",  
     504                m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]); 
     505 
     506            aligned &= (diff_between_streams_frames[i] == 0); 
     507 
     508            // position the stream 
     509            if(!s->shiftStream(diff_between_streams_frames[i])) { 
     510                debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]); 
     511                return false; 
     512            } 
     513        } 
     514        if (!aligned) { 
     515            debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n"); 
     516        } 
     517    } 
     518    if (cnt == 0) { 
     519        debugError("Align failed\n"); 
     520        return false; 
     521    } 
    450522    return true; 
    451523} 
     
    817889 */ 
    818890bool StreamProcessorManager::transfer() { 
    819  
    820891    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 
    821892    bool retval=true; 
     
    833904 * @return true if successful, false otherwise (indicates xrun). 
    834905 */ 
    835  
    836906bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { 
    837907    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",  
     
    879949} 
    880950 
     951/** 
     952 * @brief Transfer one period of silence for both receive and transmit StreamProcessors 
     953 * 
     954 * Transfers one period of silence to the Iso side for transmit SP's 
     955 * or dump one period of frames for receive SP's 
     956 * 
     957 * @return true if successful, false otherwise (indicates xrun). 
     958 */ 
     959bool StreamProcessorManager::transferSilence() { 
     960    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); 
     961    bool retval=true; 
     962    retval &= transferSilence(StreamProcessor::ePT_Receive); 
     963    retval &= transferSilence(StreamProcessor::ePT_Transmit); 
     964    return retval; 
     965} 
     966 
     967/** 
     968 * @brief Transfer one period of silence for either the receive or transmit StreamProcessors 
     969 * 
     970 * Transfers one period of silence to the Iso side for transmit SP's 
     971 * or dump one period of frames for receive SP's 
     972 * 
     973 * @param t The processor type to tranfer for (receive or transmit) 
     974 * @return true if successful, false otherwise (indicates xrun). 
     975 */ 
     976bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) { 
     977    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",  
     978        t, m_time_of_transfer, 
     979        (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 
     980        (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 
     981        (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 
     982 
     983    bool retval = true; 
     984    // a static cast could make sure that there is no performance 
     985    // penalty for the virtual functions (to be checked) 
     986    if (t==StreamProcessor::ePT_Receive) { 
     987        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     988                it != m_ReceiveProcessors.end(); 
     989                ++it ) { 
     990            if(!(*it)->dropFrames(m_period, m_time_of_transfer)) { 
     991                    debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n", 
     992                            m_period, m_time_of_transfer,*it); 
     993                retval &= false; // buffer underrun 
     994            } 
     995        } 
     996    } else { 
     997        // FIXME: in the SPM it would be nice to have system time instead of 
     998        //        1394 time 
     999        float rate = m_SyncSource->getTicksPerFrame(); 
     1000        int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); 
     1001 
     1002        // the data we are putting into the buffer is intended to be transmitted 
     1003        // one ringbuffer size after it has been received 
     1004        int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
     1005 
     1006        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     1007                it != m_TransmitProcessors.end(); 
     1008                ++it ) { 
     1009            // FIXME: in the SPM it would be nice to have system time instead of 
     1010            //        1394 time 
     1011            if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) { 
     1012                debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n", 
     1013                        m_period, transmit_timestamp, *it); 
     1014                retval &= false; // buffer underrun 
     1015            } 
     1016        } 
     1017    } 
     1018    return retval; 
     1019} 
     1020 
    8811021void StreamProcessorManager::dumpInfo() { 
    8821022    debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n"); 
  • branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h

    r720 r727  
    7171    void setPeriodSize(unsigned int period); 
    7272    void setPeriodSize(unsigned int period, unsigned int nb_buffers); 
    73     int getPeriodSize() {return m_period;}; 
     73    unsigned int getPeriodSize() {return m_period;}; 
    7474 
    7575    void setNbBuffers(unsigned int nb_buffers); 
     
    8181 
    8282    // the client-side functions 
     83    bool waitForPeriod(); 
     84    bool transfer(); 
     85    bool transfer(enum StreamProcessor::eProcessorType); 
     86private: 
     87    bool transferSilence(); 
     88    bool transferSilence(enum StreamProcessor::eProcessorType); 
    8389 
    84     bool waitForPeriod(); ///< wait for the next period 
    85     bool transfer(); ///< transfer the buffer contents from/to client 
    86     bool transfer(enum StreamProcessor::eProcessorType); ///< transfer the buffer contents from/to client (single processor type) 
    87  
     90    bool alignReceivedStreams(); 
     91public: 
    8892    int getDelayedUsecs() {return m_delayed_usecs;}; 
    8993    bool xrunOccurred(); 
  • branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp

    r720 r727  
    448448 * @return true if successful 
    449449 */ 
    450 bool TimestampedBuffer::dropFrames(unsigned int nframes) { 
    451  
    452     unsigned int read_size=nframes*m_event_size*m_events_per_frame; 
    453  
     450bool 
     451TimestampedBuffer::dropFrames(unsigned int nframes) { 
     452    unsigned int read_size = nframes * m_event_size * m_events_per_frame; 
    454453    ffado_ringbuffer_read_advance(m_event_buffer, read_size); 
    455454    decrementFrameCounter(nframes);