Changeset 391

Show
Ignore:
Timestamp:
02/09/07 00:01:30 (14 years ago)
Author:
pieterpalmers
Message:

* Partially finished:

  • Introduce TimestampedBuffer? util class
  • replace interal ringbuffer of SP with timed ringbuffer

* Compiles & works

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp

    r390 r391  
    5353/* transmit */ 
    5454AmdtpTransmitStreamProcessor::AmdtpTransmitStreamProcessor(int port, int framerate, int dimension) 
    55         : TransmitStreamProcessor(port, framerate), m_dimension(dimension) 
    56         , m_last_timestamp(0), m_dbc(0), m_ringbuffer_size_frames(0) 
    57         { 
    58  
     55        : TransmitStreamProcessor(port, framerate), m_dimension(dimension) 
     56        , m_last_timestamp(0), m_dbc(0), m_ringbuffer_size_frames(0) 
     57
    5958 
    6059} 
    6160 
    6261AmdtpTransmitStreamProcessor::~AmdtpTransmitStreamProcessor() { 
    63         freebob_ringbuffer_free(m_event_buffer); 
    64         free(m_cluster_buffer); 
     62 
    6563} 
    6664 
     
    9593    unsigned int nevents=0; 
    9694     
     95    m_last_cycle=cycle; 
     96     
    9797    debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d, (running=%d, enabled=%d,%d)\n", 
    9898        cycle, m_running, m_disabled, m_is_disabled); 
     
    124124    uint64_t fc; 
    125125     
    126     getBufferTailTimestamp(&ts_tail, &fc); // thread safe 
     126    m_data_buffer->getBufferTailTimestamp(&ts_tail, &fc); // thread safe 
    127127     
    128128    int64_t timestamp = ts_tail; 
     
    134134     
    135135    // FIXME: test 
    136     // substract the receive transfer delay 
    137     timestamp -= RECEIVE_PROCESSING_DELAY; 
     136//     timestamp -= (uint64_t)(((float)m_handler->getWakeupInterval()) 
     137//                                        * ((float)m_syt_interval) * ticks_per_frame); 
     138//  
     139//     // substract the receive transfer delay 
     140//     timestamp -= RECEIVE_PROCESSING_DELAY; 
    138141     
    139142    // this happens if m_buffer_tail_timestamp wraps around while there are  
     
    268271            debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); 
    269272             
    270             m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe 
    271              
     273            m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe 
     274             
     275            // the number of cycles the sync source lags 
     276            // or leads (< 0) 
     277            int sync_lag_cycles=cycle-m_SyncSource->getLastCycle()-1; 
     278            if(sync_lag_cycles > (int)(CYCLES_PER_SECOND/2)) { 
     279                sync_lag_cycles -= CYCLES_PER_SECOND/2; 
     280            } 
     281            if (sync_lag_cycles < -((int)CYCLES_PER_SECOND/2)) { 
     282                sync_lag_cycles += CYCLES_PER_SECOND/2; 
     283            } 
     284 
    272285            // recalculate the buffer head timestamp 
    273286            float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 
     
    278291            // plus one frame 
    279292            ts += (uint64_t)ticks_per_frame; 
     293             
     294            // account for the cycle lag between sync SP and this SP 
     295            ts += sync_lag_cycles * TICKS_PER_CYCLE; 
     296             
    280297            if (ts >= TICKS_PER_SECOND * 128L) { 
    281298                ts -= TICKS_PER_SECOND * 128L; 
    282299            } 
    283              
    284             setBufferHeadTimestamp(ts); 
     300 
     301//             m_data_buffer->setBufferHeadTimestamp(ts); 
    285302            int64_t timestamp = ts; 
    286303         
     
    289306            // frames_in_buffer * rate  
    290307            // later 
    291             int frames_in_buffer=getFrameCounter(); 
     308            int frames_in_buffer=m_data_buffer->getFrameCounter(); 
    292309            timestamp += (int64_t)((float)frames_in_buffer * ticks_per_frame); 
    293310             
     
    302319            } 
    303320         
    304             StreamProcessor::setBufferTailTimestamp(timestamp); 
     321            m_data_buffer->setBufferTailTimestamp(timestamp); 
    305322             
    306323            debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, TSTMP=%10llu, FC=%4d, %f\n", 
     
    349366    *tag = IEC61883_TAG_WITH_CIP; 
    350367    *sy = 0; 
    351       
    352     unsigned int read_size=nevents*sizeof(quadlet_t)*m_dimension; 
    353  
    354     if ((freebob_ringbuffer_read(m_event_buffer,(char *)(data+8),read_size)) <  
    355                             read_size)  
     368     
     369    if (m_data_buffer->readFrames(nevents, (char *)(data + 8))) 
    356370    { 
     371        *length = nevents*sizeof(quadlet_t)*m_dimension + 8; 
     372 
     373        // process all ports that should be handled on a per-packet base 
     374        // this is MIDI for AMDTP (due to the need of DBC) 
     375        if (!encodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { 
     376            debugWarning("Problem encoding Packet Ports\n"); 
     377        } 
     378 
     379        packet->fdf = m_fdf; 
     380 
     381        // convert the timestamp to SYT format 
     382        uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; 
     383         
     384        // check if it wrapped 
     385        if (ts >= TICKS_PER_SECOND * 128L) { 
     386            ts -= TICKS_PER_SECOND * 128L; 
     387        } 
     388         
     389        unsigned int timestamp_SYT = TICKS_TO_SYT(ts); 
     390        packet->syt = ntohs(timestamp_SYT); 
     391         
     392        // update the frame counter such that it reflects the new value 
     393        // done in the SP base class 
     394        if (!StreamProcessor::getFrames(nevents)) { 
     395            debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents); 
     396             return RAW1394_ISO_ERROR; 
     397        } 
     398         
     399        return RAW1394_ISO_OK;     
     400    } else { 
    357401        /* there is no more data in the ringbuffer */ 
    358402        // convert the timestamp to SYT format 
     
    392436 
    393437        return RAW1394_ISO_DEFER; 
    394     } else { 
    395         *length = read_size + 8; 
    396  
    397         // process all ports that should be handled on a per-packet base 
    398         // this is MIDI for AMDTP (due to the need of DBC) 
    399         if (!encodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { 
    400             debugWarning("Problem encoding Packet Ports\n"); 
    401         } 
    402  
    403         packet->fdf = m_fdf; 
    404  
    405         // convert the timestamp to SYT format 
    406         uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; 
    407          
    408         // check if it wrapped 
    409         if (ts >= TICKS_PER_SECOND * 128L) { 
    410             ts -= TICKS_PER_SECOND * 128L; 
    411         } 
    412          
    413         unsigned int timestamp_SYT = TICKS_TO_SYT(ts); 
    414         packet->syt = ntohs(timestamp_SYT); 
    415  
    416         // calculate the new buffer head timestamp. this is 
    417         // the previous buffer head timestamp plus 
    418         // the number of frames sent * ticks_per_frame 
    419         timestamp += (int64_t)((float)nevents * ticks_per_frame ); 
    420          
    421         // check if it wrapped 
    422         if (timestamp >= TICKS_PER_SECOND * 128L) { 
    423             timestamp -= TICKS_PER_SECOND * 128L; 
    424         } 
    425          
    426         // update the frame counter such that it reflects the new value 
    427         // also update the buffer head timestamp 
    428         // done in the SP base class 
    429         if (!StreamProcessor::getFrames(nevents, timestamp)) { 
    430             debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nevents, timestamp); 
    431              return RAW1394_ISO_ERROR; 
    432         } 
    433          
    434         return RAW1394_ISO_OK; 
    435438    } 
    436439 
     
    470473    uint64_t ts; 
    471474    uint64_t fc; 
    472     m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe 
     475    m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe 
    473476 
    474477    // update the frame counter such that it reflects the buffer content, 
     
    476479    // done in the SP base class 
    477480    if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { 
    478         debugError("Could not do StreamProcessor::putFrames(%d, %0)\n", 
    479             m_ringbuffer_size_frames); 
     481        debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n", 
     482            m_ringbuffer_size_frames,ts); 
    480483        return false; 
    481484    } 
     
    488491    debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); 
    489492 
    490     // reset the event buffer, discard all content 
    491     freebob_ringbuffer_reset(m_event_buffer); 
    492      
    493493    // reset the statistics 
    494494    m_PeriodStat.reset(); 
     
    566566        m_syt_interval); 
    567567 
     568    // prepare the framerate estimate 
     569    m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); 
     570     
    568571    // allocate the event buffer 
    569572    m_ringbuffer_size_frames=m_nb_buffers * m_period; 
    570      
    571     // prepare the framerate estimate 
    572     m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); 
    573      
     573 
    574574    // add the receive processing delay 
    575575//     m_ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); 
    576      
    577     if( !(m_event_buffer=freebob_ringbuffer_create( 
    578             (m_dimension * m_ringbuffer_size_frames) * sizeof(quadlet_t)))) { 
    579         debugFatal("Could not allocate memory event ringbuffer"); 
    580         return false; 
    581     } 
    582  
    583     // allocate the temporary cluster buffer 
    584     if( !(m_cluster_buffer=(char *)calloc(m_dimension,sizeof(quadlet_t)))) { 
    585         debugFatal("Could not allocate temporary cluster buffer"); 
    586         freebob_ringbuffer_free(m_event_buffer); 
    587         return false; 
    588     } 
     576 
     577    assert(m_data_buffer);     
     578    m_data_buffer->setBufferSize(m_ringbuffer_size_frames); 
     579    m_data_buffer->setEventSize(sizeof(quadlet_t)); 
     580    m_data_buffer->setEventsPerFrame(m_dimension); 
     581     
     582    m_data_buffer->setUpdatePeriod(m_period); 
     583    m_data_buffer->setNominalRate(m_ticks_per_frame); 
     584     
     585    m_data_buffer->prepare(); 
    589586 
    590587    // set the parameters of ports we can: 
     
    685682     
    686683    // prefill the event buffer 
    687     if (!prefill()) { 
    688         debugFatal("Could not prefill buffers\n"); 
    689         return false;     
    690     } 
     684    // NOTE: do we need to prefill? reset() is called, so everything is prefilled then 
     685//     if (!prefill()) { 
     686//         debugFatal("Could not prefill buffers\n"); 
     687//         return false;     
     688//     } 
    691689     
    692690    debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); 
     
    724722} 
    725723 
    726 bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int size) { 
    727     /* a naive implementation would look like this: */ 
    728      
    729     unsigned int write_size=size*sizeof(quadlet_t)*m_dimension; 
    730     char *dummybuffer=(char *)calloc(sizeof(quadlet_t),size*m_dimension); 
    731     transmitSilenceBlock(dummybuffer, size, 0); 
    732  
    733     if (freebob_ringbuffer_write(m_event_buffer,(char *)(dummybuffer),write_size) < write_size) { 
     724bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int nframes) { 
     725    bool retval; 
     726     
     727    char *dummybuffer=(char *)calloc(sizeof(quadlet_t),nframes*m_dimension); 
     728     
     729    transmitSilenceBlock(dummybuffer, nframes, 0); 
     730 
     731    // add the silence data to the ringbuffer 
     732    if(m_data_buffer->writeFrames(nframes, dummybuffer)) {  
     733        retval=true; 
     734    } else { 
    734735        debugWarning("Could not write to event buffer\n"); 
    735     } 
    736      
     736        retval=false; 
     737    } 
     738 
    737739    free(dummybuffer); 
    738740     
    739     return true
     741    return retval
    740742} 
    741743 
    742744bool AmdtpTransmitStreamProcessor::canClientTransferFrames(unsigned int nbframes) { 
    743745    // there has to be enough space to put the frames in 
    744     return m_ringbuffer_size_frames - getFrameCounter() > nbframes; 
     746    return m_ringbuffer_size_frames - m_data_buffer->getFrameCounter() > nbframes; 
    745747} 
    746748 
    747749bool AmdtpTransmitStreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { 
    748     m_PeriodStat.mark(freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension)); 
    749  
    750     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 
    751     int xrun; 
    752     unsigned int offset=0; 
     750    m_PeriodStat.mark(m_data_buffer->getBufferFill()); 
    753751     
    754752    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); 
    755753     
    756     freebob_ringbuffer_data_t vec[2]; 
    757     // we received one period of frames 
    758     // this is period_size*dimension of events 
    759     unsigned int events2write=nbframes*m_dimension; 
    760     unsigned int bytes2write=events2write*sizeof(quadlet_t); 
    761  
    762     /* write events2write bytes to the ringbuffer  
    763     *  first see if it can be done in one read. 
    764     *  if so, ok.  
    765     *  otherwise write up to a multiple of clusters directly to the buffer 
    766     *  then do the buffer wrap around using ringbuffer_write 
    767     *  then write the remaining data directly to the buffer in a third pass  
    768     *  Make sure that we cannot end up on a non-cluster aligned position! 
    769     */ 
    770     unsigned int cluster_size=m_dimension*sizeof(quadlet_t); 
    771  
    772     while(bytes2write>0) { 
    773         int byteswritten=0; 
    774          
    775         unsigned int frameswritten=(nbframes*cluster_size-bytes2write)/cluster_size; 
    776         offset=frameswritten; 
    777          
    778         freebob_ringbuffer_get_write_vector(m_event_buffer, vec); 
    779              
    780         if(vec[0].len==0) { // this indicates a full event buffer 
    781             debugError("XMT: Event buffer overrun in processor %p\n",this); 
    782             break; 
    783         } 
    784              
    785         /* if we don't take care we will get stuck in an infinite loop 
    786         * because we align to a cluster boundary later 
    787         * the remaining nb of bytes in one write operation can be  
    788         * smaller than one cluster 
    789         * this can happen because the ringbuffer size is always a power of 2 
    790         */ 
    791         if(vec[0].len<cluster_size) { 
    792              
    793             // encode to the temporary buffer 
    794             xrun = transmitBlock(m_cluster_buffer, 1, offset); 
    795              
    796             if(xrun<0) { 
    797                 // xrun detected 
    798                 debugError("XMT: Frame buffer underrun in processor %p\n",this); 
    799                 break; 
    800             } 
    801                  
    802             // use the ringbuffer function to write one cluster  
    803             // the write function handles the wrap around. 
    804             freebob_ringbuffer_write(m_event_buffer, 
    805                          m_cluster_buffer, 
    806                          cluster_size); 
    807                  
    808             // we advanced one cluster_size 
    809             bytes2write-=cluster_size; 
    810                  
    811         } else { //  
    812              
    813             if(bytes2write>vec[0].len) { 
    814                 // align to a cluster boundary 
    815                 byteswritten=vec[0].len-(vec[0].len%cluster_size); 
    816             } else { 
    817                 byteswritten=bytes2write; 
    818             } 
    819                  
    820             xrun = transmitBlock(vec[0].buf, 
    821                          byteswritten/cluster_size, 
    822                          offset); 
    823              
    824             if(xrun<0) { 
    825                     // xrun detected 
    826                 debugError("XMT: Frame buffer underrun in processor %p\n",this); 
    827                 break; // FIXME: return false ? 
    828             } 
    829  
    830             freebob_ringbuffer_write_advance(m_event_buffer, byteswritten); 
    831             bytes2write -= byteswritten; 
    832         } 
    833  
    834         // the bytes2write should always be cluster aligned 
    835         assert(bytes2write%cluster_size==0); 
    836  
    837     } 
     754    m_data_buffer->blockProcessWriteFrames(nbframes, ts); 
    838755     
    839756    // recalculate the buffer tail timestamp 
     
    878795 */ 
    879796 
    880 int AmdtpTransmitStreamProcessor::transmitBlock(char *data,  
     797bool AmdtpTransmitStreamProcessor::processWriteBlock(char *data,  
    881798                       unsigned int nevents, unsigned int offset) 
    882799{ 
    883     int problem=0
     800    bool no_problem=true
    884801 
    885802    for ( PortVectorIterator it = m_PeriodPorts.begin(); 
     
    899816            if(encodePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents)) { 
    900817                debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str()); 
    901                 problem=1
     818                no_problem=false
    902819            } 
    903820            break; 
     
    908825        } 
    909826    } 
    910     return problem; 
     827    return no_problem; 
    911828 
    912829} 
     
    10901007    : ReceiveStreamProcessor(port, framerate), m_dimension(dimension), m_last_timestamp(0), m_last_timestamp2(0) { 
    10911008 
    1092  
    10931009} 
    10941010 
    10951011AmdtpReceiveStreamProcessor::~AmdtpReceiveStreamProcessor() { 
    1096     freebob_ringbuffer_free(m_event_buffer); 
    1097     free(m_cluster_buffer); 
    10981012 
    10991013} 
    11001014 
    11011015bool AmdtpReceiveStreamProcessor::init() { 
     1016 
    11021017    // call the parent init 
    11031018    // this has to be done before allocating the buffers,  
     
    11171032     
    11181033    enum raw1394_iso_disposition retval=RAW1394_ISO_OK; 
     1034    m_last_cycle=cycle; 
    11191035     
    11201036    struct iec61883_packet *packet = (struct iec61883_packet *) data; 
     
    11441060            channel, cycle,syt_timestamp,   
    11451061            CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp), 
    1146             getFrameCounter(), m_is_disabled); 
     1062            m_data_buffer->getFrameCounter(), m_is_disabled); 
    11471063         
    11481064        // reconstruct the full cycle 
     
    11991115        // we have to keep in mind that there are also 
    12001116        // some packets buffered by the ISO layer 
    1201         // at most x=m_handler->getNbBuffers() 
     1117        // at most x=m_handler->getWakeupInterval() 
    12021118        // these contain at most x*syt_interval 
    12031119        // frames, meaning that we might receive 
    12041120        // this packet x*syt_interval*ticks_per_frame 
    12051121        // later than expected (the real receive time) 
    1206         m_last_timestamp += (uint64_t)(((float)m_handler->getNbBuffers()) 
    1207                                        * m_syt_interval * m_ticks_per_frame); 
     1122        debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", 
     1123            m_last_timestamp, m_handler->getWakeupInterval(),m_syt_interval,m_ticks_per_frame); 
     1124         
     1125        m_last_timestamp += (uint64_t)(((float)m_handler->getWakeupInterval()) 
     1126                                       * ((float)m_syt_interval) * m_ticks_per_frame); 
     1127        debugOutput(DEBUG_LEVEL_VERY_VERBOSE," ==> %lluticks\n", m_last_timestamp); 
    12081128         
    12091129        // the receive processing delay indicates how much 
    12101130        // extra time we need as slack 
    12111131        m_last_timestamp += RECEIVE_PROCESSING_DELAY; 
    1212  
     1132         
     1133        // wrap if nescessary 
     1134        if (m_last_timestamp >= TICKS_PER_SECOND * 128L) { 
     1135            m_last_timestamp -= TICKS_PER_SECOND * 128L; 
     1136        } 
     1137         
    12131138        //=> now estimate the device frame rate 
    12141139        if (m_last_timestamp2 && m_last_timestamp) { 
     
    12711196                m_is_disabled=false; 
    12721197                debugOutput(DEBUG_LEVEL_VERBOSE,"enabling StreamProcessor %p at %d\n", this, cycle); 
     1198                // the previous timestamp is the one we need to start with 
     1199                // because we're going to update the buffer again this loop 
     1200                m_data_buffer->setBufferTailTimestamp(m_last_timestamp2); 
     1201                 
    12731202            } else { 
    12741203                debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); 
     
    12961225            } 
    12971226            // set the timestamps 
    1298             StreamProcessor::setBufferTimestamps(ts,ts); 
     1227            m_data_buffer->setBufferTailTimestamp(ts); 
    12991228             
    13001229            return RAW1394_ISO_DEFER; 
     
    13021231         
    13031232        //=> process the packet 
    1304         unsigned int write_size=nevents*sizeof(quadlet_t)*m_dimension; 
    1305          
    13061233        // add the data payload to the ringbuffer 
    1307         if (freebob_ringbuffer_write(m_event_buffer,(char *)(data+8),write_size) < write_size)  
    1308         { 
    1309             debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n",  
    1310                  cycle, getFrameCounter(), m_handler->getPacketCount()); 
    1311              
    1312             m_xruns++; 
    1313              
    1314             // disable the processing, will be re-enabled when 
    1315             // the xrun is handled 
    1316             m_disabled=true; 
    1317             m_is_disabled=true; 
    1318  
    1319             retval=RAW1394_ISO_DEFER; 
    1320         } else { 
     1234        if(m_data_buffer->writeFrames(nevents, (char *)(data+8))) {  
    13211235            retval=RAW1394_ISO_OK; 
     1236             
    13221237            // process all ports that should be handled on a per-packet base 
    13231238            // this is MIDI for AMDTP (due to the need of DBC) 
     
    13261241                retval=RAW1394_ISO_DEFER; 
    13271242            } 
     1243             
     1244        } else { 
     1245         
     1246            debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n",  
     1247                 cycle, m_data_buffer->getFrameCounter(), m_handler->getPacketCount()); 
     1248             
     1249            m_xruns++; 
     1250             
     1251            // disable the processing, will be re-enabled when 
     1252            // the xrun is handled 
     1253            m_disabled=true; 
     1254            m_is_disabled=true; 
     1255 
     1256            retval=RAW1394_ISO_DEFER; 
     1257             
    13281258        } 
    13291259 
     
    14341364    // currently this is in ticks 
    14351365     
    1436     int64_t fc=getFrameCounter(); 
     1366    int64_t fc=m_data_buffer->getFrameCounter(); 
    14371367     
    14381368    int64_t next_period_boundary =  m_last_timestamp; 
     
    14831413} 
    14841414 
    1485  
    14861415bool AmdtpReceiveStreamProcessor::reset() { 
    14871416 
    14881417    debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); 
    14891418 
    1490     // reset the event buffer, discard all content 
    1491     freebob_ringbuffer_reset(m_event_buffer); 
    1492      
    14931419    m_PeriodStat.reset(); 
    14941420    m_PacketStat.reset(); 
     
    15621488    ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); 
    15631489     
    1564     if( !(m_event_buffer=freebob_ringbuffer_create( 
    1565             (m_dimension * ringbuffer_size_frames) * sizeof(quadlet_t)))) { 
    1566                 debugFatal("Could not allocate memory event ringbuffer"); 
    1567                 return false; 
    1568         } 
    1569  
    1570         // allocate the temporary cluster buffer 
    1571         if( !(m_cluster_buffer=(char *)calloc(m_dimension,sizeof(quadlet_t)))) { 
    1572                 debugFatal("Could not allocate temporary cluster buffer"); 
    1573                 freebob_ringbuffer_free(m_event_buffer); 
    1574                 return false; 
    1575         } 
     1490    assert(m_data_buffer);     
     1491    m_data_buffer->setBufferSize(ringbuffer_size_frames); 
     1492    m_data_buffer->setEventSize(sizeof(quadlet_t)); 
     1493    m_data_buffer->setEventsPerFrame(m_dimension); 
     1494         
     1495    // the buffer is written every syt_interval 
     1496    m_data_buffer->setUpdatePeriod(m_syt_interval); 
     1497    m_data_buffer->setNominalRate(m_ticks_per_frame); 
     1498     
     1499    m_data_buffer->prepare(); 
    15761500 
    15771501        // set the parameters of ports we can: 
     
    16711595 
    16721596bool AmdtpReceiveStreamProcessor::canClientTransferFrames(unsigned int nbframes) { 
    1673     return getFrameCounter() >= (int) nbframes; 
    1674 
    1675  
    1676 bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 
    1677  
    1678     m_PeriodStat.mark(freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension)); 
    1679  
    1680         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 
    1681          
    1682         int xrun; 
    1683         unsigned int offset=0; 
    1684          
    1685         freebob_ringbuffer_data_t vec[2]; 
    1686         // we received one period of frames on each connection 
    1687         // this is period_size*dimension of events 
    1688  
    1689         unsigned int events2read=nbframes*m_dimension; 
    1690         unsigned int bytes2read=events2read*sizeof(quadlet_t); 
    1691         /* read events2read bytes from the ringbuffer  
    1692         *  first see if it can be done in one read.  
    1693         *  if so, ok.  
    1694         *  otherwise read up to a multiple of clusters directly from the buffer 
    1695         *  then do the buffer wrap around using ringbuffer_read 
    1696         *  then read the remaining data directly from the buffer in a third pass  
    1697         *  Make sure that we cannot end up on a non-cluster aligned position! 
    1698         */ 
    1699         unsigned int cluster_size=m_dimension*sizeof(quadlet_t); 
    1700          
    1701         while(bytes2read>0) { 
    1702                 unsigned int framesread=(nbframes*cluster_size-bytes2read)/cluster_size; 
    1703                 offset=framesread; 
    1704                  
    1705                 int bytesread=0; 
    1706  
    1707                 freebob_ringbuffer_get_read_vector(m_event_buffer, vec); 
    1708                          
    1709                 if(vec[0].len==0) { // this indicates an empty event buffer 
    1710                         debugError("RCV: Event buffer underrun in processor %p\n",this); 
    1711                         break; 
    1712                 } 
    1713                          
    1714                 /* if we don't take care we will get stuck in an infinite loop 
    1715                 * because we align to a cluster boundary later 
    1716                 * the remaining nb of bytes in one read operation can be smaller than one cluster 
    1717                 * this can happen because the ringbuffer size is always a power of 2 
    1718                         */ 
    1719                 if(vec[0].len<cluster_size) { 
    1720                         // use the ringbuffer function to read one cluster  
    1721                         // the read function handles wrap around 
    1722                         freebob_ringbuffer_read(m_event_buffer,m_cluster_buffer,cluster_size); 
    1723  
    1724                         xrun = receiveBlock(m_cluster_buffer, 1, offset); 
    1725                                  
    1726                         if(xrun<0) { 
    1727                                 // xrun detected 
    1728                                 debugError("RCV: Frame buffer overrun in processor %p\n",this); 
    1729                                 break; 
    1730                         } 
    1731                                  
    1732                                 // we advanced one cluster_size 
    1733                         bytes2read-=cluster_size; 
    1734                                  
    1735                 } else { //  
    1736                          
    1737                         if(bytes2read>vec[0].len) { 
    1738                                         // align to a cluster boundary 
    1739                                 bytesread=vec[0].len-(vec[0].len%cluster_size); 
    1740                         } else { 
    1741                                 bytesread=bytes2read; 
    1742                         } 
    1743                                  
    1744                         xrun = receiveBlock(vec[0].buf, bytesread/cluster_size, offset); 
    1745                                  
    1746                         if(xrun<0) { 
    1747                                         // xrun detected 
    1748                                 debugError("RCV: Frame buffer overrun in processor %p\n",this); 
    1749                                 break; 
    1750                         } 
    1751  
    1752                         freebob_ringbuffer_read_advance(m_event_buffer, bytesread); 
    1753                         bytes2read -= bytesread; 
    1754                 } 
    1755                          
    1756                 // the bytes2read should always be cluster aligned 
    1757                 assert(bytes2read%cluster_size==0); 
    1758         } 
     1597    return m_data_buffer->getFrameCounter() >= (int) nbframes; 
     1598
     1599 
     1600bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes) { 
     1601 
     1602    m_PeriodStat.mark(m_data_buffer->getBufferFill()); 
     1603 
     1604    // ask the buffer to process nbframes of frames 
     1605    // using it's registered client's processReadBlock(), 
     1606    // which should be ours 
     1607    m_data_buffer->blockProcessReadFrames(nbframes); 
    17591608         
    17601609    // update the frame counter such that it reflects the new value, 
    1761     // and also update the buffer head timestamp as we pull frames 
    17621610    // done in the SP base class 
    1763  
    1764     // wrap the timestamp if nescessary 
    1765     if (ts < 0) { 
    1766         ts += TICKS_PER_SECOND * 128L; 
    1767     } else if (ts >= TICKS_PER_SECOND * 128L) { 
    1768         ts -= TICKS_PER_SECOND * 128L; 
    1769     } 
    1770      
    1771     if (!StreamProcessor::getFrames(nbframes, ts)) { 
    1772         debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n", nbframes, ts); 
     1611     
     1612    if (!StreamProcessor::getFrames(nbframes)) { 
     1613        debugError("Could not do StreamProcessor::getFrames(%d)\n", nbframes); 
    17731614        return false; 
    17741615    } 
     
    17801621 * \brief write received events to the stream ringbuffers. 
    17811622 */ 
    1782 int AmdtpReceiveStreamProcessor::receiveBlock(char *data,  
     1623bool AmdtpReceiveStreamProcessor::processReadBlock(char *data,  
    17831624                                           unsigned int nevents, unsigned int offset) 
    17841625{ 
    1785         int problem=0; 
     1626        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "(%p)->processReadBlock(%u, %u)\n",this,nevents,offset); 
     1627         
     1628        bool no_problem=true; 
    17861629 
    17871630        for ( PortVectorIterator it = m_PeriodPorts.begin(); 
     
    18011644                        if(decodeMBLAEventsToPort(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents)) { 
    18021645                                debugWarning("Could not decode packet MBLA to port %s",(*it)->getName().c_str()); 
    1803                                 problem=1
     1646                                no_problem=false
    18041647                        } 
    18051648                        break; 
     
    18131656                } 
    18141657    } 
    1815        return problem; 
     1658    return no_problem; 
    18161659 
    18171660} 
  • branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h

    r386 r391  
    3535#include "../debugmodule/debugmodule.h" 
    3636#include "StreamProcessor.h" 
     37 
    3738#include "cip.h" 
    3839#include <libiec61883/iec61883.h> 
    39 #include "libutil/ringbuffer.h" 
    4040#include <pthread.h> 
    4141 
     
    117117     
    118118protected: 
     119    bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset); 
    119120 
    120121    struct iec61883_cip m_cip_status; 
    121  
    122     freebob_ringbuffer_t * m_event_buffer; 
    123     char* m_cluster_buffer; 
     122     
    124123    int m_dimension; 
    125124    unsigned int m_syt_interval; 
     
    184183         
    185184    bool canClientTransferFrames(unsigned int nbframes); 
    186     bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client 
     185    bool getFrames(unsigned int nbframes); ///< transfer the buffer contents to the client 
    187186 
    188187    // We have 1 period of samples = m_period 
     
    208207protected: 
    209208 
    210     int receiveBlock(char *data, unsigned int nevents, unsigned int offset); 
     209    bool processReadBlock(char *data, unsigned int nevents, unsigned int offset); 
     210 
    211211    bool decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); 
    212212     
    213213    int decodeMBLAEventsToPort(AmdtpAudioPort *, quadlet_t *data, unsigned int offset, unsigned int nevents); 
    214214 
    215     freebob_ringbuffer_t * m_event_buffer; 
    216     char* m_cluster_buffer; 
    217215    int m_dimension; 
    218216    unsigned int m_syt_interval; 
  • branches/streaming-rework/src/libstreaming/IsoHandler.cpp

    r390 r391  
    134134     
    135135    if (m_TimeSource) delete m_TimeSource; 
     136} 
     137 
     138bool IsoHandler::iterate() { 
     139    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "IsoHandler (%p) iterate...\n",this); 
     140 
     141    if(m_handle) { 
     142        if(raw1394_loop_iterate(m_handle)) { 
     143            debugOutput( DEBUG_LEVEL_VERBOSE,  
     144                 "IsoHandler (%p): Failed to iterate handler: %s\n", 
     145                 this,strerror(errno)); 
     146            return false; 
     147        } else { 
     148            return true; 
     149        } 
     150    } else { 
     151        return false;  
     152    } 
    136153} 
    137154 
  • branches/streaming-rework/src/libstreaming/IsoHandler.h

    r390 r391  
    7272        virtual bool stop(); 
    7373         
    74         int iterate() { if(m_handle) return raw1394_loop_iterate(m_handle); else return -1; }
     74        bool iterate()
    7575         
    7676        void setVerboseLevel(int l); 
  • branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp

    r390 r391  
    4545IsoHandlerManager::IsoHandlerManager() : 
    4646   m_State(E_Created), 
    47    m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0), 
     47   m_poll_timeout(100), m_poll_fds(0), m_poll_nfds(0), 
    4848   m_realtime(false), m_priority(0) 
    4949{ 
     
    323323                unsigned int packets_per_period=stream->getPacketsPerPeriod(); 
    324324                 
    325 #if 1 
     325#if 0 
    326326                // hardware interrupts occur when one DMA block is full, and the size of one DMA 
    327327                // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq is  
  • branches/streaming-rework/src/libstreaming/StreamProcessor.cpp

    r390 r391  
    5252        , m_is_disabled(true) 
    5353        , m_cycle_to_enable_at(0) 
    54         , m_framecounter(0) 
    5554        , m_SyncSource(NULL) 
    5655        , m_ticks_per_frame(0) 
    5756{ 
     57    // create the timestamped buffer and register ourselves as its client 
     58    m_data_buffer=new FreebobUtil::TimestampedBuffer(this); 
    5859 
    5960} 
    6061 
    6162StreamProcessor::~StreamProcessor() { 
    62  
     63    if (m_data_buffer) delete m_data_buffer; 
    6364} 
    6465 
    6566void StreamProcessor::dumpInfo() 
    6667{ 
    67     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; 
    68  
    6968    debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); 
    7069    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n"); 
     
    7271    IsoStream::dumpInfo(); 
    7372    debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n"); 
    74     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Frame counter         : %d\n", m_framecounter); 
    75     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); 
    76     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); 
    77     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Head - Tail           : %011lld\n",diff); 
    7873    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
    7974    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %d\n", m_xruns); 
     
    8277    debugOutputShort( DEBUG_LEVEL_NORMAL, "   enable status        : %s\n", m_is_disabled ? "No" : "Yes"); 
    8378     
     79    m_data_buffer->dumpInfo(); 
     80     
    8481//     m_PeriodStat.dumpInfo(); 
    8582//     m_PacketStat.dumpInfo(); 
     
    9289    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); 
    9390     
    94     pthread_mutex_init(&m_framecounter_lock, NULL); 
    95  
     91    m_data_buffer->init(); 
     92     
    9693    return IsoStream::init(); 
    9794} 
     
    105102    debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); 
    106103 
    107     resetFrameCounter(); 
    108  
     104    // reset the event buffer, discard all content 
     105    if (!m_data_buffer->reset()) { 
     106        debugFatal("Could not reset data buffer\n"); 
     107        return false; 
     108    } 
     109     
    109110    resetXrunCounter(); 
    110111 
     
    125126     
    126127bool StreamProcessor::prepareForEnable() { 
    127     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; 
    128      
    129128    debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this); 
    130     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter); 
    131     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); 
    132     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); 
    133     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff); 
    134129    debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
     130    m_data_buffer->dumpInfo(); 
    135131    return true; 
    136132} 
    137133 
    138134bool StreamProcessor::prepareForDisable() { 
    139     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; 
    140      
    141135    debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this); 
    142     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter); 
    143     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); 
    144     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); 
    145     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff); 
    146136    debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
    147     return true
    148  
     137    m_data_buffer->dumpInfo()
     138    return true; 
    149139} 
    150140 
     
    152142 
    153143        debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n"); 
    154 // TODO: implement 
    155  
     144         
    156145        // init the ports 
    157146         
     
    191180 
    192181        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts); 
    193         incrementFrameCounter(nbframes, ts); 
     182        m_data_buffer->incrementFrameCounter(nbframes, ts); 
    194183        return true; 
    195184} 
     
    202191 * 
    203192 * @param nbframes the number of frames that are read from the internal buffers 
    204  * @param ts the new timestamp of the 'head' of the buffer, i.e. the first sample 
    205  *           present in the buffer. 
    206193 * @return true if successful 
    207194 */ 
    208 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 
    209  
    210         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer at (%011lld)...\n", nbframes, ts); 
    211         decrementFrameCounter(nbframes, ts); 
     195bool StreamProcessor::getFrames(unsigned int nbframes) { 
     196 
     197        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); 
     198        m_data_buffer->decrementFrameCounter(nbframes); 
    212199        return true; 
    213200} 
     
    266253 
    267254/** 
    268  * Decrements the frame counter, in a atomic way. This 
    269  * also sets the buffer head timestamp 
    270  * is thread safe. 
    271  */ 
    272 void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) { 
    273     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", 
    274                 this, new_timestamp); 
    275                  
    276     pthread_mutex_lock(&m_framecounter_lock); 
    277     m_framecounter -= nbframes; 
    278     m_buffer_head_timestamp = new_timestamp; 
    279     pthread_mutex_unlock(&m_framecounter_lock); 
    280 } 
    281  
    282 /** 
    283  * Increments the frame counter, in a atomic way. 
    284  * also sets the buffer tail timestamp 
    285  * This is thread safe. 
    286  */ 
    287 void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { 
    288     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", 
    289                 this, new_timestamp); 
    290      
    291     pthread_mutex_lock(&m_framecounter_lock); 
    292     m_framecounter += nbframes; 
    293     m_buffer_tail_timestamp = new_timestamp; 
    294     pthread_mutex_unlock(&m_framecounter_lock); 
    295      
    296 } 
    297  
    298 /** 
    299  * Sets the buffer tail timestamp (in usecs) 
    300  * This is thread safe. 
    301  */ 
    302 void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) { 
    303     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", 
    304                 this, new_timestamp); 
    305      
    306     pthread_mutex_lock(&m_framecounter_lock); 
    307     m_buffer_tail_timestamp = new_timestamp; 
    308     pthread_mutex_unlock(&m_framecounter_lock); 
    309 } 
    310  
    311 /** 
    312  * Sets the buffer head timestamp (in usecs) 
    313  * This is thread safe. 
    314  */ 
    315 void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) { 
    316     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", 
    317                 this, new_timestamp); 
    318  
    319     pthread_mutex_lock(&m_framecounter_lock); 
    320     m_buffer_head_timestamp = new_timestamp; 
    321     pthread_mutex_unlock(&m_framecounter_lock); 
    322 } 
    323  
    324 /** 
    325  * Sets both the buffer head and tail timestamps (in usecs) 
    326  * (avoids multiple mutex lock/unlock's) 
    327  * This is thread safe. 
    328  */ 
    329 void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) { 
    330     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", 
    331                 this, new_head); 
    332     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    and buffer tail timestamp for (%p) to %11llu\n", 
    333                 this, new_tail); 
    334     
    335     pthread_mutex_lock(&m_framecounter_lock); 
    336     m_buffer_head_timestamp = new_head; 
    337     m_buffer_tail_timestamp = new_tail; 
    338     pthread_mutex_unlock(&m_framecounter_lock); 
    339 } 
    340 /** 
    341  * \brief return the timestamp of the first frame in the buffer 
    342  *  
    343  * This function returns the timestamp of the very first sample in 
    344  * the StreamProcessor's buffer. This is useful for slave StreamProcessors  
    345  * to find out what the base for their timestamp generation should 
    346  * be. It also returns the framecounter value for which this timestamp 
    347  * is valid. 
    348  * 
    349  * The system is built in such a way that we assume that the processing 
    350  * of the buffers doesn't take any time. Assume we have a buffer transfer at  
    351  * time T1, meaning that the last sample of this buffer occurs at T1. As  
    352  * processing does not take time, we don't have to add anything to T1. When 
    353  * transferring the processed buffer to the xmit processor, the timestamp 
    354  * of the last sample is still T1. 
    355  * 
    356  * When starting the streams, we don't have any information on this last 
    357  * timestamp. We prefill the buffer at the xmit side, and we should find 
    358  * out what the timestamp for the last sample in the buffer is. If we sync 
    359  * on a receive SP, we know that the last prefilled sample corresponds with 
    360  * the first sample received - 1 sample duration. This is the same as if the last 
    361  * transfer from iso to client would have emptied the receive buffer. 
    362  * 
    363  * 
    364  * @param ts address to store the timestamp in 
    365  * @param fc address to store the associated framecounter in 
    366  */ 
    367 void StreamProcessor::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { 
    368     pthread_mutex_lock(&m_framecounter_lock); 
    369     *fc = m_framecounter; 
    370     *ts = m_buffer_head_timestamp; 
    371     pthread_mutex_unlock(&m_framecounter_lock); 
    372 } 
    373          
    374 /** 
    375  * \brief return the timestamp of the last frame in the buffer 
    376  *  
    377  * This function returns the timestamp of the last frame in 
    378  * the StreamProcessor's buffer. It also returns the framecounter  
    379  * value for which this timestamp is valid. 
    380  * 
    381  * @param ts address to store the timestamp in 
    382  * @param fc address to store the associated framecounter in 
    383  */ 
    384 void StreamProcessor::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { 
    385     pthread_mutex_lock(&m_framecounter_lock); 
    386     *fc = m_framecounter; 
    387     *ts = m_buffer_tail_timestamp; 
    388     pthread_mutex_unlock(&m_framecounter_lock); 
    389 } 
    390  
    391 /** 
    392  * Resets the frame counter, in a atomic way. This 
    393  * is thread safe. 
    394  */ 
    395 void StreamProcessor::resetFrameCounter() { 
    396     pthread_mutex_lock(&m_framecounter_lock); 
    397     m_framecounter = 0; 
    398     pthread_mutex_unlock(&m_framecounter_lock); 
    399 } 
    400  
    401 /** 
    402255 * Resets the xrun counter, in a atomic way. This 
    403256 * is thread safe. 
  • branches/streaming-rework/src/libstreaming/StreamProcessor.h

    r390 r391  
    3737 
    3838#include "libutil/StreamStatistics.h" 
     39 
     40#include "libutil/TimestampedBuffer.h" 
    3941 
    4042namespace FreebobStreaming { 
     
    5153*/ 
    5254class StreamProcessor : public IsoStream,  
    53                         public PortManager { 
     55                        public PortManager,  
     56                        public FreebobUtil::TimestampedBufferClient { 
    5457 
    5558    friend class StreamProcessorManager; 
     
    8689 
    8790    virtual bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from client 
    88     virtual bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client 
     91    virtual bool getFrames(unsigned int nbframes); ///< transfer the buffer contents to the client 
    8992 
    9093    virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) 
     
    104107    virtual bool prepareForDisable(); 
    105108 
     109public: 
     110    FreebobUtil::TimestampedBuffer *m_data_buffer; 
     111 
    106112protected: 
    107          
    108113 
    109114    void setManager(StreamProcessorManager *manager) {m_manager=manager;}; 
     
    145150         */ 
    146151        virtual bool canClientTransferFrames(unsigned int nframes) {return true;}; 
    147          
    148         int getFrameCounter() {return m_framecounter;}; 
    149      
    150         void decrementFrameCounter(int nbframes, uint64_t new_timestamp); 
    151         void incrementFrameCounter(int nbframes, uint64_t new_timestamp); 
    152         void resetFrameCounter(); 
    153152         
    154153        /** 
     
    188187        uint64_t getTimeNow(); 
    189188         
    190         void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); 
    191         void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); 
    192          
    193         void setBufferTailTimestamp(uint64_t new_timestamp); 
    194         void setBufferHeadTimestamp(uint64_t new_timestamp); 
    195         void setBufferTimestamps(uint64_t new_head, uint64_t new_tail); 
    196          
    197189        bool setSyncSource(StreamProcessor *s); 
    198190        float getTicksPerFrame() {return m_ticks_per_frame;}; 
    199191         
    200         unsigned int getLastCycle() {return m_last_cycle;}; 
    201      
    202     private: 
    203         // the framecounter gives the number of frames in the buffer 
    204         signed int m_framecounter; 
    205          
    206         // the buffer tail timestamp gives the timestamp of the last frame 
    207         // that was put into the buffer 
    208         uint64_t   m_buffer_tail_timestamp; 
    209          
    210         // the buffer head timestamp gives the timestamp of the first frame 
    211         // that was put into the buffer 
    212         uint64_t   m_buffer_head_timestamp; 
     192        int getLastCycle() {return m_last_cycle;}; 
     193     
    213194         
    214195    protected: 
     
    217198        float m_ticks_per_frame; 
    218199         
    219         unsigned int m_last_cycle; 
    220  
    221     private: 
    222         // this mutex protects the access to the framecounter 
    223         // and the buffer head timestamp. 
    224         pthread_mutex_t m_framecounter_lock; 
     200        int m_last_cycle; 
    225201 
    226202}; 
     
    252228 
    253229protected: 
    254  
    255      DECLARE_DEBUG_MODULE; 
     230    bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) {return true;}; 
     231 
     232    DECLARE_DEBUG_MODULE; 
    256233 
    257234}; 
     
    282259 
    283260protected: 
    284  
    285      DECLARE_DEBUG_MODULE; 
     261    bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) {return true;}; 
     262 
     263    DECLARE_DEBUG_MODULE; 
    286264 
    287265 
  • branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

    r390 r391  
    392392     
    393393    // FIXME: this should not be in cycles, but in 'time' 
    394     unsigned int enable_at=TICKS_TO_CYCLES(now)+300; 
    395          
     394    unsigned int enable_at=TICKS_TO_CYCLES(now)+2000; 
     395    if (enable_at > 8000) enable_at -= 8000; 
     396 
    396397    debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); 
    397398    if (!m_SyncSource->prepareForEnable()) { 
     
    835836        if ((*it)->xrunOccurred()) { 
    836837            debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it); 
     838            (*it)->dumpInfo(); 
    837839        } 
    838840        if (!((*it)->canClientTransferFrames(m_period))) { 
    839841            debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); 
     842            (*it)->dumpInfo(); 
    840843        } 
    841844#endif 
     
    927930            #endif 
    928931     
    929             if(!(*it)->getFrames(m_period, (int64_t)m_time_of_transfer)) { 
     932            if(!(*it)->getFrames(m_period)) { 
    930933                    debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)", 
    931934                            m_period, m_time_of_transfer,*it); 
  • branches/streaming-rework/src/Makefile.am

    r386 r391  
    4646        libutil/ringbuffer.h libutil/PacketBuffer.h libutil/StreamStatistics.h \ 
    4747        libutil/serialize.h libutil/SystemTimeSource.h libutil/Thread.h libutil/Time.h \ 
    48         libutil/TimeSource.h 
     48        libutil/TimeSource.h libutil/TimestampedBuffer.h 
    4949 
    5050libfreebob_la_SOURCES = \ 
     
    105105        libutil/SystemTimeSource.cpp \ 
    106106        libutil/Time.c \ 
    107         libutil/TimeSource.cpp 
     107        libutil/TimeSource.cpp \ 
     108        libutil/TimestampedBuffer.cpp 
    108109 
    109110libfreebob_la_LDFLAGS =                                 \