Index: /branches/streaming-rework/src/libstreaming/StreamProcessor.h =================================================================== --- /branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 390) +++ /branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 391) @@ -37,4 +37,6 @@ #include "libutil/StreamStatistics.h" + +#include "libutil/TimestampedBuffer.h" namespace FreebobStreaming { @@ -51,5 +53,6 @@ */ class StreamProcessor : public IsoStream, - public PortManager { + public PortManager, + public FreebobUtil::TimestampedBufferClient { friend class StreamProcessorManager; @@ -86,5 +89,5 @@ virtual bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from client - virtual bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client + virtual bool getFrames(unsigned int nbframes); ///< transfer the buffer contents to the client virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) @@ -104,6 +107,8 @@ virtual bool prepareForDisable(); +public: + FreebobUtil::TimestampedBuffer *m_data_buffer; + protected: - void setManager(StreamProcessorManager *manager) {m_manager=manager;}; @@ -145,10 +150,4 @@ */ virtual bool canClientTransferFrames(unsigned int nframes) {return true;}; - - int getFrameCounter() {return m_framecounter;}; - - void decrementFrameCounter(int nbframes, uint64_t new_timestamp); - void incrementFrameCounter(int nbframes, uint64_t new_timestamp); - void resetFrameCounter(); /** @@ -188,27 +187,9 @@ uint64_t getTimeNow(); - void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); - void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); - - void setBufferTailTimestamp(uint64_t new_timestamp); - void setBufferHeadTimestamp(uint64_t new_timestamp); - void setBufferTimestamps(uint64_t new_head, uint64_t new_tail); - bool setSyncSource(StreamProcessor *s); float getTicksPerFrame() {return m_ticks_per_frame;}; - unsigned int getLastCycle() {return m_last_cycle;}; - - private: - // the framecounter gives the number of frames in the buffer - signed int m_framecounter; - - // the buffer tail timestamp gives the timestamp of the last frame - // that was put into the buffer - uint64_t m_buffer_tail_timestamp; - - // the buffer head timestamp gives the timestamp of the first frame - // that was put into the buffer - uint64_t m_buffer_head_timestamp; + int getLastCycle() {return m_last_cycle;}; + protected: @@ -217,10 +198,5 @@ float m_ticks_per_frame; - unsigned int m_last_cycle; - - private: - // this mutex protects the access to the framecounter - // and the buffer head timestamp. - pthread_mutex_t m_framecounter_lock; + int m_last_cycle; }; @@ -252,6 +228,7 @@ protected: - - DECLARE_DEBUG_MODULE; + bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) {return true;}; + + DECLARE_DEBUG_MODULE; }; @@ -282,6 +259,7 @@ protected: - - DECLARE_DEBUG_MODULE; + bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) {return true;}; + + DECLARE_DEBUG_MODULE; Index: /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp =================================================================== --- /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 390) +++ /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 391) @@ -53,14 +53,12 @@ /* transmit */ AmdtpTransmitStreamProcessor::AmdtpTransmitStreamProcessor(int port, int framerate, int dimension) - : TransmitStreamProcessor(port, framerate), m_dimension(dimension) - , m_last_timestamp(0), m_dbc(0), m_ringbuffer_size_frames(0) - { - + : TransmitStreamProcessor(port, framerate), m_dimension(dimension) + , m_last_timestamp(0), m_dbc(0), m_ringbuffer_size_frames(0) +{ } AmdtpTransmitStreamProcessor::~AmdtpTransmitStreamProcessor() { - freebob_ringbuffer_free(m_event_buffer); - free(m_cluster_buffer); + } @@ -95,4 +93,6 @@ unsigned int nevents=0; + m_last_cycle=cycle; + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d, (running=%d, enabled=%d,%d)\n", cycle, m_running, m_disabled, m_is_disabled); @@ -124,5 +124,5 @@ uint64_t fc; - getBufferTailTimestamp(&ts_tail, &fc); // thread safe + m_data_buffer->getBufferTailTimestamp(&ts_tail, &fc); // thread safe int64_t timestamp = ts_tail; @@ -134,6 +134,9 @@ // FIXME: test - // substract the receive transfer delay - timestamp -= RECEIVE_PROCESSING_DELAY; +// timestamp -= (uint64_t)(((float)m_handler->getWakeupInterval()) +// * ((float)m_syt_interval) * ticks_per_frame); +// +// // substract the receive transfer delay +// timestamp -= RECEIVE_PROCESSING_DELAY; // this happens if m_buffer_tail_timestamp wraps around while there are @@ -268,6 +271,16 @@ debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); - m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe - + m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe + + // the number of cycles the sync source lags + // or leads (< 0) + int sync_lag_cycles=cycle-m_SyncSource->getLastCycle()-1; + if(sync_lag_cycles > (int)(CYCLES_PER_SECOND/2)) { + sync_lag_cycles -= CYCLES_PER_SECOND/2; + } + if (sync_lag_cycles < -((int)CYCLES_PER_SECOND/2)) { + sync_lag_cycles += CYCLES_PER_SECOND/2; + } + // recalculate the buffer head timestamp float ticks_per_frame=m_SyncSource->getTicksPerFrame(); @@ -278,9 +291,13 @@ // plus one frame ts += (uint64_t)ticks_per_frame; + + // account for the cycle lag between sync SP and this SP + ts += sync_lag_cycles * TICKS_PER_CYCLE; + if (ts >= TICKS_PER_SECOND * 128L) { ts -= TICKS_PER_SECOND * 128L; } - - setBufferHeadTimestamp(ts); + +// m_data_buffer->setBufferHeadTimestamp(ts); int64_t timestamp = ts; @@ -289,5 +306,5 @@ // frames_in_buffer * rate // later - int frames_in_buffer=getFrameCounter(); + int frames_in_buffer=m_data_buffer->getFrameCounter(); timestamp += (int64_t)((float)frames_in_buffer * ticks_per_frame); @@ -302,5 +319,5 @@ } - StreamProcessor::setBufferTailTimestamp(timestamp); + m_data_buffer->setBufferTailTimestamp(timestamp); debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, TSTMP=%10llu, FC=%4d, %f\n", @@ -349,10 +366,37 @@ *tag = IEC61883_TAG_WITH_CIP; *sy = 0; - - unsigned int read_size=nevents*sizeof(quadlet_t)*m_dimension; - - if ((freebob_ringbuffer_read(m_event_buffer,(char *)(data+8),read_size)) < - read_size) + + if (m_data_buffer->readFrames(nevents, (char *)(data + 8))) { + *length = nevents*sizeof(quadlet_t)*m_dimension + 8; + + // process all ports that should be handled on a per-packet base + // this is MIDI for AMDTP (due to the need of DBC) + if (!encodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { + debugWarning("Problem encoding Packet Ports\n"); + } + + packet->fdf = m_fdf; + + // convert the timestamp to SYT format + uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; + + // check if it wrapped + if (ts >= TICKS_PER_SECOND * 128L) { + ts -= TICKS_PER_SECOND * 128L; + } + + unsigned int timestamp_SYT = TICKS_TO_SYT(ts); + packet->syt = ntohs(timestamp_SYT); + + // update the frame counter such that it reflects the new value + // done in the SP base class + if (!StreamProcessor::getFrames(nevents)) { + debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents); + return RAW1394_ISO_ERROR; + } + + return RAW1394_ISO_OK; + } else { /* there is no more data in the ringbuffer */ // convert the timestamp to SYT format @@ -392,45 +436,4 @@ return RAW1394_ISO_DEFER; - } else { - *length = read_size + 8; - - // process all ports that should be handled on a per-packet base - // this is MIDI for AMDTP (due to the need of DBC) - if (!encodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { - debugWarning("Problem encoding Packet Ports\n"); - } - - packet->fdf = m_fdf; - - // convert the timestamp to SYT format - uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; - - // check if it wrapped - if (ts >= TICKS_PER_SECOND * 128L) { - ts -= TICKS_PER_SECOND * 128L; - } - - unsigned int timestamp_SYT = TICKS_TO_SYT(ts); - packet->syt = ntohs(timestamp_SYT); - - // calculate the new buffer head timestamp. this is - // the previous buffer head timestamp plus - // the number of frames sent * ticks_per_frame - timestamp += (int64_t)((float)nevents * ticks_per_frame ); - - // check if it wrapped - if (timestamp >= TICKS_PER_SECOND * 128L) { - timestamp -= TICKS_PER_SECOND * 128L; - } - - // update the frame counter such that it reflects the new value - // also update the buffer head timestamp - // done in the SP base class - if (!StreamProcessor::getFrames(nevents, timestamp)) { - debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nevents, timestamp); - return RAW1394_ISO_ERROR; - } - - return RAW1394_ISO_OK; } @@ -470,5 +473,5 @@ uint64_t ts; uint64_t fc; - m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe + m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe // update the frame counter such that it reflects the buffer content, @@ -476,6 +479,6 @@ // done in the SP base class if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { - debugError("Could not do StreamProcessor::putFrames(%d, %0)\n", - m_ringbuffer_size_frames); + debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n", + m_ringbuffer_size_frames,ts); return false; } @@ -488,7 +491,4 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); - // reset the event buffer, discard all content - freebob_ringbuffer_reset(m_event_buffer); - // reset the statistics m_PeriodStat.reset(); @@ -566,25 +566,22 @@ m_syt_interval); + // prepare the framerate estimate + m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); + // allocate the event buffer m_ringbuffer_size_frames=m_nb_buffers * m_period; - - // prepare the framerate estimate - m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); - + // add the receive processing delay // m_ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); - - if( !(m_event_buffer=freebob_ringbuffer_create( - (m_dimension * m_ringbuffer_size_frames) * sizeof(quadlet_t)))) { - debugFatal("Could not allocate memory event ringbuffer"); - return false; - } - - // allocate the temporary cluster buffer - if( !(m_cluster_buffer=(char *)calloc(m_dimension,sizeof(quadlet_t)))) { - debugFatal("Could not allocate temporary cluster buffer"); - freebob_ringbuffer_free(m_event_buffer); - return false; - } + + assert(m_data_buffer); + m_data_buffer->setBufferSize(m_ringbuffer_size_frames); + m_data_buffer->setEventSize(sizeof(quadlet_t)); + m_data_buffer->setEventsPerFrame(m_dimension); + + m_data_buffer->setUpdatePeriod(m_period); + m_data_buffer->setNominalRate(m_ticks_per_frame); + + m_data_buffer->prepare(); // set the parameters of ports we can: @@ -685,8 +682,9 @@ // prefill the event buffer - if (!prefill()) { - debugFatal("Could not prefill buffers\n"); - return false; - } + // NOTE: do we need to prefill? reset() is called, so everything is prefilled then +// if (!prefill()) { +// debugFatal("Could not prefill buffers\n"); +// return false; +// } debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); @@ -724,116 +722,35 @@ } -bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int size) { - /* a naive implementation would look like this: */ - - unsigned int write_size=size*sizeof(quadlet_t)*m_dimension; - char *dummybuffer=(char *)calloc(sizeof(quadlet_t),size*m_dimension); - transmitSilenceBlock(dummybuffer, size, 0); - - if (freebob_ringbuffer_write(m_event_buffer,(char *)(dummybuffer),write_size) < write_size) { +bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int nframes) { + bool retval; + + char *dummybuffer=(char *)calloc(sizeof(quadlet_t),nframes*m_dimension); + + transmitSilenceBlock(dummybuffer, nframes, 0); + + // add the silence data to the ringbuffer + if(m_data_buffer->writeFrames(nframes, dummybuffer)) { + retval=true; + } else { debugWarning("Could not write to event buffer\n"); - } - + retval=false; + } + free(dummybuffer); - return true; + return retval; } bool AmdtpTransmitStreamProcessor::canClientTransferFrames(unsigned int nbframes) { // there has to be enough space to put the frames in - return m_ringbuffer_size_frames - getFrameCounter() > nbframes; + return m_ringbuffer_size_frames - m_data_buffer->getFrameCounter() > nbframes; } bool AmdtpTransmitStreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { - m_PeriodStat.mark(freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension)); - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); - int xrun; - unsigned int offset=0; + m_PeriodStat.mark(m_data_buffer->getBufferFill()); debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); - freebob_ringbuffer_data_t vec[2]; - // we received one period of frames - // this is period_size*dimension of events - unsigned int events2write=nbframes*m_dimension; - unsigned int bytes2write=events2write*sizeof(quadlet_t); - - /* write events2write bytes to the ringbuffer - * first see if it can be done in one read. - * if so, ok. - * otherwise write up to a multiple of clusters directly to the buffer - * then do the buffer wrap around using ringbuffer_write - * then write the remaining data directly to the buffer in a third pass - * Make sure that we cannot end up on a non-cluster aligned position! - */ - unsigned int cluster_size=m_dimension*sizeof(quadlet_t); - - while(bytes2write>0) { - int byteswritten=0; - - unsigned int frameswritten=(nbframes*cluster_size-bytes2write)/cluster_size; - offset=frameswritten; - - freebob_ringbuffer_get_write_vector(m_event_buffer, vec); - - if(vec[0].len==0) { // this indicates a full event buffer - debugError("XMT: Event buffer overrun in processor %p\n",this); - break; - } - - /* if we don't take care we will get stuck in an infinite loop - * because we align to a cluster boundary later - * the remaining nb of bytes in one write operation can be - * smaller than one cluster - * this can happen because the ringbuffer size is always a power of 2 - */ - if(vec[0].lenvec[0].len) { - // align to a cluster boundary - byteswritten=vec[0].len-(vec[0].len%cluster_size); - } else { - byteswritten=bytes2write; - } - - xrun = transmitBlock(vec[0].buf, - byteswritten/cluster_size, - offset); - - if(xrun<0) { - // xrun detected - debugError("XMT: Frame buffer underrun in processor %p\n",this); - break; // FIXME: return false ? - } - - freebob_ringbuffer_write_advance(m_event_buffer, byteswritten); - bytes2write -= byteswritten; - } - - // the bytes2write should always be cluster aligned - assert(bytes2write%cluster_size==0); - - } + m_data_buffer->blockProcessWriteFrames(nbframes, ts); // recalculate the buffer tail timestamp @@ -878,8 +795,8 @@ */ -int AmdtpTransmitStreamProcessor::transmitBlock(char *data, +bool AmdtpTransmitStreamProcessor::processWriteBlock(char *data, unsigned int nevents, unsigned int offset) { - int problem=0; + bool no_problem=true; for ( PortVectorIterator it = m_PeriodPorts.begin(); @@ -899,5 +816,5 @@ if(encodePortToMBLAEvents(static_cast(*it), (quadlet_t *)data, offset, nevents)) { debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str()); - problem=1; + no_problem=false; } break; @@ -908,5 +825,5 @@ } } - return problem; + return no_problem; } @@ -1090,14 +1007,12 @@ : ReceiveStreamProcessor(port, framerate), m_dimension(dimension), m_last_timestamp(0), m_last_timestamp2(0) { - } AmdtpReceiveStreamProcessor::~AmdtpReceiveStreamProcessor() { - freebob_ringbuffer_free(m_event_buffer); - free(m_cluster_buffer); } bool AmdtpReceiveStreamProcessor::init() { + // call the parent init // this has to be done before allocating the buffers, @@ -1117,4 +1032,5 @@ enum raw1394_iso_disposition retval=RAW1394_ISO_OK; + m_last_cycle=cycle; struct iec61883_packet *packet = (struct iec61883_packet *) data; @@ -1144,5 +1060,5 @@ channel, cycle,syt_timestamp, CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp), - getFrameCounter(), m_is_disabled); + m_data_buffer->getFrameCounter(), m_is_disabled); // reconstruct the full cycle @@ -1199,16 +1115,25 @@ // we have to keep in mind that there are also // some packets buffered by the ISO layer - // at most x=m_handler->getNbBuffers() + // at most x=m_handler->getWakeupInterval() // these contain at most x*syt_interval // frames, meaning that we might receive // this packet x*syt_interval*ticks_per_frame // later than expected (the real receive time) - m_last_timestamp += (uint64_t)(((float)m_handler->getNbBuffers()) - * m_syt_interval * m_ticks_per_frame); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", + m_last_timestamp, m_handler->getWakeupInterval(),m_syt_interval,m_ticks_per_frame); + + m_last_timestamp += (uint64_t)(((float)m_handler->getWakeupInterval()) + * ((float)m_syt_interval) * m_ticks_per_frame); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE," ==> %lluticks\n", m_last_timestamp); // the receive processing delay indicates how much // extra time we need as slack m_last_timestamp += RECEIVE_PROCESSING_DELAY; - + + // wrap if nescessary + if (m_last_timestamp >= TICKS_PER_SECOND * 128L) { + m_last_timestamp -= TICKS_PER_SECOND * 128L; + } + //=> now estimate the device frame rate if (m_last_timestamp2 && m_last_timestamp) { @@ -1271,4 +1196,8 @@ m_is_disabled=false; debugOutput(DEBUG_LEVEL_VERBOSE,"enabling StreamProcessor %p at %d\n", this, cycle); + // the previous timestamp is the one we need to start with + // because we're going to update the buffer again this loop + m_data_buffer->setBufferTailTimestamp(m_last_timestamp2); + } else { debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); @@ -1296,5 +1225,5 @@ } // set the timestamps - StreamProcessor::setBufferTimestamps(ts,ts); + m_data_buffer->setBufferTailTimestamp(ts); return RAW1394_ISO_DEFER; @@ -1302,22 +1231,8 @@ //=> process the packet - unsigned int write_size=nevents*sizeof(quadlet_t)*m_dimension; - // add the data payload to the ringbuffer - if (freebob_ringbuffer_write(m_event_buffer,(char *)(data+8),write_size) < write_size) - { - debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n", - cycle, getFrameCounter(), m_handler->getPacketCount()); - - m_xruns++; - - // disable the processing, will be re-enabled when - // the xrun is handled - m_disabled=true; - m_is_disabled=true; - - retval=RAW1394_ISO_DEFER; - } else { + if(m_data_buffer->writeFrames(nevents, (char *)(data+8))) { retval=RAW1394_ISO_OK; + // process all ports that should be handled on a per-packet base // this is MIDI for AMDTP (due to the need of DBC) @@ -1326,4 +1241,19 @@ retval=RAW1394_ISO_DEFER; } + + } else { + + debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n", + cycle, m_data_buffer->getFrameCounter(), m_handler->getPacketCount()); + + m_xruns++; + + // disable the processing, will be re-enabled when + // the xrun is handled + m_disabled=true; + m_is_disabled=true; + + retval=RAW1394_ISO_DEFER; + } @@ -1434,5 +1364,5 @@ // currently this is in ticks - int64_t fc=getFrameCounter(); + int64_t fc=m_data_buffer->getFrameCounter(); int64_t next_period_boundary = m_last_timestamp; @@ -1483,12 +1413,8 @@ } - bool AmdtpReceiveStreamProcessor::reset() { debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); - // reset the event buffer, discard all content - freebob_ringbuffer_reset(m_event_buffer); - m_PeriodStat.reset(); m_PacketStat.reset(); @@ -1562,16 +1488,14 @@ ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); - if( !(m_event_buffer=freebob_ringbuffer_create( - (m_dimension * ringbuffer_size_frames) * sizeof(quadlet_t)))) { - debugFatal("Could not allocate memory event ringbuffer"); - return false; - } - - // allocate the temporary cluster buffer - if( !(m_cluster_buffer=(char *)calloc(m_dimension,sizeof(quadlet_t)))) { - debugFatal("Could not allocate temporary cluster buffer"); - freebob_ringbuffer_free(m_event_buffer); - return false; - } + assert(m_data_buffer); + m_data_buffer->setBufferSize(ringbuffer_size_frames); + m_data_buffer->setEventSize(sizeof(quadlet_t)); + m_data_buffer->setEventsPerFrame(m_dimension); + + // the buffer is written every syt_interval + m_data_buffer->setUpdatePeriod(m_syt_interval); + m_data_buffer->setNominalRate(m_ticks_per_frame); + + m_data_buffer->prepare(); // set the parameters of ports we can: @@ -1671,104 +1595,21 @@ bool AmdtpReceiveStreamProcessor::canClientTransferFrames(unsigned int nbframes) { - return getFrameCounter() >= (int) nbframes; -} - -bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { - - m_PeriodStat.mark(freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension)); - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); - - int xrun; - unsigned int offset=0; - - freebob_ringbuffer_data_t vec[2]; - // we received one period of frames on each connection - // this is period_size*dimension of events - - unsigned int events2read=nbframes*m_dimension; - unsigned int bytes2read=events2read*sizeof(quadlet_t); - /* read events2read bytes from the ringbuffer - * first see if it can be done in one read. - * if so, ok. - * otherwise read up to a multiple of clusters directly from the buffer - * then do the buffer wrap around using ringbuffer_read - * then read the remaining data directly from the buffer in a third pass - * Make sure that we cannot end up on a non-cluster aligned position! - */ - unsigned int cluster_size=m_dimension*sizeof(quadlet_t); - - while(bytes2read>0) { - unsigned int framesread=(nbframes*cluster_size-bytes2read)/cluster_size; - offset=framesread; - - int bytesread=0; - - freebob_ringbuffer_get_read_vector(m_event_buffer, vec); - - if(vec[0].len==0) { // this indicates an empty event buffer - debugError("RCV: Event buffer underrun in processor %p\n",this); - break; - } - - /* if we don't take care we will get stuck in an infinite loop - * because we align to a cluster boundary later - * the remaining nb of bytes in one read operation can be smaller than one cluster - * this can happen because the ringbuffer size is always a power of 2 - */ - if(vec[0].lenvec[0].len) { - // align to a cluster boundary - bytesread=vec[0].len-(vec[0].len%cluster_size); - } else { - bytesread=bytes2read; - } - - xrun = receiveBlock(vec[0].buf, bytesread/cluster_size, offset); - - if(xrun<0) { - // xrun detected - debugError("RCV: Frame buffer overrun in processor %p\n",this); - break; - } - - freebob_ringbuffer_read_advance(m_event_buffer, bytesread); - bytes2read -= bytesread; - } - - // the bytes2read should always be cluster aligned - assert(bytes2read%cluster_size==0); - } + return m_data_buffer->getFrameCounter() >= (int) nbframes; +} + +bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes) { + + m_PeriodStat.mark(m_data_buffer->getBufferFill()); + + // ask the buffer to process nbframes of frames + // using it's registered client's processReadBlock(), + // which should be ours + m_data_buffer->blockProcessReadFrames(nbframes); // update the frame counter such that it reflects the new value, - // and also update the buffer head timestamp as we pull frames // done in the SP base class - - // wrap the timestamp if nescessary - if (ts < 0) { - ts += TICKS_PER_SECOND * 128L; - } else if (ts >= TICKS_PER_SECOND * 128L) { - ts -= TICKS_PER_SECOND * 128L; - } - - if (!StreamProcessor::getFrames(nbframes, ts)) { - debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n", nbframes, ts); + + if (!StreamProcessor::getFrames(nbframes)) { + debugError("Could not do StreamProcessor::getFrames(%d)\n", nbframes); return false; } @@ -1780,8 +1621,10 @@ * \brief write received events to the stream ringbuffers. */ -int AmdtpReceiveStreamProcessor::receiveBlock(char *data, +bool AmdtpReceiveStreamProcessor::processReadBlock(char *data, unsigned int nevents, unsigned int offset) { - int problem=0; + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "(%p)->processReadBlock(%u, %u)\n",this,nevents,offset); + + bool no_problem=true; for ( PortVectorIterator it = m_PeriodPorts.begin(); @@ -1801,5 +1644,5 @@ if(decodeMBLAEventsToPort(static_cast(*it), (quadlet_t *)data, offset, nevents)) { debugWarning("Could not decode packet MBLA to port %s",(*it)->getName().c_str()); - problem=1; + no_problem=false; } break; @@ -1813,5 +1656,5 @@ } } - return problem; + return no_problem; } Index: /branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp =================================================================== --- /branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp (revision 390) +++ /branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp (revision 391) @@ -45,5 +45,5 @@ IsoHandlerManager::IsoHandlerManager() : m_State(E_Created), - m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0), + m_poll_timeout(100), m_poll_fds(0), m_poll_nfds(0), m_realtime(false), m_priority(0) { @@ -323,5 +323,5 @@ unsigned int packets_per_period=stream->getPacketsPerPeriod(); -#if 1 +#if 0 // hardware interrupts occur when one DMA block is full, and the size of one DMA // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq is Index: /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h =================================================================== --- /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h (revision 386) +++ /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h (revision 391) @@ -35,7 +35,7 @@ #include "../debugmodule/debugmodule.h" #include "StreamProcessor.h" + #include "cip.h" #include -#include "libutil/ringbuffer.h" #include @@ -117,9 +117,8 @@ protected: + bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset); struct iec61883_cip m_cip_status; - - freebob_ringbuffer_t * m_event_buffer; - char* m_cluster_buffer; + int m_dimension; unsigned int m_syt_interval; @@ -184,5 +183,5 @@ bool canClientTransferFrames(unsigned int nbframes); - bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client + bool getFrames(unsigned int nbframes); ///< transfer the buffer contents to the client // We have 1 period of samples = m_period @@ -208,11 +207,10 @@ protected: - int receiveBlock(char *data, unsigned int nevents, unsigned int offset); + bool processReadBlock(char *data, unsigned int nevents, unsigned int offset); + bool decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); int decodeMBLAEventsToPort(AmdtpAudioPort *, quadlet_t *data, unsigned int offset, unsigned int nevents); - freebob_ringbuffer_t * m_event_buffer; - char* m_cluster_buffer; int m_dimension; unsigned int m_syt_interval; Index: /branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (revision 390) +++ /branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (revision 391) @@ -392,6 +392,7 @@ // FIXME: this should not be in cycles, but in 'time' - unsigned int enable_at=TICKS_TO_CYCLES(now)+300; - + unsigned int enable_at=TICKS_TO_CYCLES(now)+2000; + if (enable_at > 8000) enable_at -= 8000; + debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); if (!m_SyncSource->prepareForEnable()) { @@ -835,7 +836,9 @@ if ((*it)->xrunOccurred()) { debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it); + (*it)->dumpInfo(); } if (!((*it)->canClientTransferFrames(m_period))) { debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); + (*it)->dumpInfo(); } #endif @@ -927,5 +930,5 @@ #endif - if(!(*it)->getFrames(m_period, (int64_t)m_time_of_transfer)) { + if(!(*it)->getFrames(m_period)) { debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)", m_period, m_time_of_transfer,*it); Index: /branches/streaming-rework/src/libstreaming/IsoHandler.cpp =================================================================== --- /branches/streaming-rework/src/libstreaming/IsoHandler.cpp (revision 390) +++ /branches/streaming-rework/src/libstreaming/IsoHandler.cpp (revision 391) @@ -134,4 +134,21 @@ if (m_TimeSource) delete m_TimeSource; +} + +bool IsoHandler::iterate() { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "IsoHandler (%p) iterate...\n",this); + + if(m_handle) { + if(raw1394_loop_iterate(m_handle)) { + debugOutput( DEBUG_LEVEL_VERBOSE, + "IsoHandler (%p): Failed to iterate handler: %s\n", + this,strerror(errno)); + return false; + } else { + return true; + } + } else { + return false; + } } Index: /branches/streaming-rework/src/libstreaming/StreamProcessor.cpp =================================================================== --- /branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 390) +++ /branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 391) @@ -52,19 +52,18 @@ , m_is_disabled(true) , m_cycle_to_enable_at(0) - , m_framecounter(0) , m_SyncSource(NULL) , m_ticks_per_frame(0) { + // create the timestamped buffer and register ourselves as its client + m_data_buffer=new FreebobUtil::TimestampedBuffer(this); } StreamProcessor::~StreamProcessor() { - + if (m_data_buffer) delete m_data_buffer; } void StreamProcessor::dumpInfo() { - int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; - debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); debugOutputShort( DEBUG_LEVEL_NORMAL, " Iso stream info:\n"); @@ -72,8 +71,4 @@ IsoStream::dumpInfo(); debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor info:\n"); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011u\n",m_handler->getCycleTimerTicks()); debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); @@ -82,4 +77,6 @@ debugOutputShort( DEBUG_LEVEL_NORMAL, " enable status : %s\n", m_is_disabled ? "No" : "Yes"); + m_data_buffer->dumpInfo(); + // m_PeriodStat.dumpInfo(); // m_PacketStat.dumpInfo(); @@ -92,6 +89,6 @@ debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); - pthread_mutex_init(&m_framecounter_lock, NULL); - + m_data_buffer->init(); + return IsoStream::init(); } @@ -105,6 +102,10 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); - resetFrameCounter(); - + // reset the event buffer, discard all content + if (!m_data_buffer->reset()) { + debugFatal("Could not reset data buffer\n"); + return false; + } + resetXrunCounter(); @@ -125,26 +126,15 @@ bool StreamProcessor::prepareForEnable() { - int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this); - debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter : %05d\n",m_framecounter); - debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); - debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); - debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail : %011lld\n",diff); debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks()); + m_data_buffer->dumpInfo(); return true; } bool StreamProcessor::prepareForDisable() { - int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this); - debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter : %05d\n",m_framecounter); - debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); - debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); - debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail : %011lld\n",diff); debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks()); - return true; - + m_data_buffer->dumpInfo(); + return true; } @@ -152,6 +142,5 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n"); -// TODO: implement - + // init the ports @@ -191,5 +180,5 @@ debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts); - incrementFrameCounter(nbframes, ts); + m_data_buffer->incrementFrameCounter(nbframes, ts); return true; } @@ -202,12 +191,10 @@ * * @param nbframes the number of frames that are read from the internal buffers - * @param ts the new timestamp of the 'head' of the buffer, i.e. the first sample - * present in the buffer. * @return true if successful */ -bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer at (%011lld)...\n", nbframes, ts); - decrementFrameCounter(nbframes, ts); +bool StreamProcessor::getFrames(unsigned int nbframes) { + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); + m_data_buffer->decrementFrameCounter(nbframes); return true; } @@ -266,138 +253,4 @@ /** - * Decrements the frame counter, in a atomic way. This - * also sets the buffer head timestamp - * is thread safe. - */ -void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", - this, new_timestamp); - - pthread_mutex_lock(&m_framecounter_lock); - m_framecounter -= nbframes; - m_buffer_head_timestamp = new_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** - * Increments the frame counter, in a atomic way. - * also sets the buffer tail timestamp - * This is thread safe. - */ -void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", - this, new_timestamp); - - pthread_mutex_lock(&m_framecounter_lock); - m_framecounter += nbframes; - m_buffer_tail_timestamp = new_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); - -} - -/** - * Sets the buffer tail timestamp (in usecs) - * This is thread safe. - */ -void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", - this, new_timestamp); - - pthread_mutex_lock(&m_framecounter_lock); - m_buffer_tail_timestamp = new_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** - * Sets the buffer head timestamp (in usecs) - * This is thread safe. - */ -void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", - this, new_timestamp); - - pthread_mutex_lock(&m_framecounter_lock); - m_buffer_head_timestamp = new_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** - * Sets both the buffer head and tail timestamps (in usecs) - * (avoids multiple mutex lock/unlock's) - * This is thread safe. - */ -void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", - this, new_head); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " and buffer tail timestamp for (%p) to %11llu\n", - this, new_tail); - - pthread_mutex_lock(&m_framecounter_lock); - m_buffer_head_timestamp = new_head; - m_buffer_tail_timestamp = new_tail; - pthread_mutex_unlock(&m_framecounter_lock); -} -/** - * \brief return the timestamp of the first frame in the buffer - * - * This function returns the timestamp of the very first sample in - * the StreamProcessor's buffer. This is useful for slave StreamProcessors - * to find out what the base for their timestamp generation should - * be. It also returns the framecounter value for which this timestamp - * is valid. - * - * The system is built in such a way that we assume that the processing - * of the buffers doesn't take any time. Assume we have a buffer transfer at - * time T1, meaning that the last sample of this buffer occurs at T1. As - * processing does not take time, we don't have to add anything to T1. When - * transferring the processed buffer to the xmit processor, the timestamp - * of the last sample is still T1. - * - * When starting the streams, we don't have any information on this last - * timestamp. We prefill the buffer at the xmit side, and we should find - * out what the timestamp for the last sample in the buffer is. If we sync - * on a receive SP, we know that the last prefilled sample corresponds with - * the first sample received - 1 sample duration. This is the same as if the last - * transfer from iso to client would have emptied the receive buffer. - * - * - * @param ts address to store the timestamp in - * @param fc address to store the associated framecounter in - */ -void StreamProcessor::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { - pthread_mutex_lock(&m_framecounter_lock); - *fc = m_framecounter; - *ts = m_buffer_head_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** - * \brief return the timestamp of the last frame in the buffer - * - * This function returns the timestamp of the last frame in - * the StreamProcessor's buffer. It also returns the framecounter - * value for which this timestamp is valid. - * - * @param ts address to store the timestamp in - * @param fc address to store the associated framecounter in - */ -void StreamProcessor::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { - pthread_mutex_lock(&m_framecounter_lock); - *fc = m_framecounter; - *ts = m_buffer_tail_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** - * Resets the frame counter, in a atomic way. This - * is thread safe. - */ -void StreamProcessor::resetFrameCounter() { - pthread_mutex_lock(&m_framecounter_lock); - m_framecounter = 0; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** * Resets the xrun counter, in a atomic way. This * is thread safe. Index: /branches/streaming-rework/src/libstreaming/IsoHandler.h =================================================================== --- /branches/streaming-rework/src/libstreaming/IsoHandler.h (revision 390) +++ /branches/streaming-rework/src/libstreaming/IsoHandler.h (revision 391) @@ -72,5 +72,5 @@ virtual bool stop(); - int iterate() { if(m_handle) return raw1394_loop_iterate(m_handle); else return -1; }; + bool iterate(); void setVerboseLevel(int l); Index: /branches/streaming-rework/src/libutil/TimestampedBuffer.cpp =================================================================== --- /branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (revision 391) +++ /branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (revision 391) @@ -0,0 +1,521 @@ +/* $Id$ */ + +/* + * FreeBob Streaming API + * FreeBob = Firewire (pro-)audio for linux + * + * http://freebob.sf.net + * + * Copyright (C) 2005,2006,2007 Pieter Palmers + * + * This program is free software {} you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation {} either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY {} without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program {} if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * + * + */ + +#include "libutil/Atomic.h" +#include "libstreaming/cycletimer.h" + +#include "TimestampedBuffer.h" +#include "assert.h" + +namespace FreebobUtil { + +IMPL_DEBUG_MODULE( TimestampedBuffer, TimestampedBuffer, DEBUG_LEVEL_VERBOSE ); + +TimestampedBuffer::TimestampedBuffer(TimestampedBufferClient *c) + : m_event_buffer(NULL), m_cluster_buffer(NULL), + m_event_size(0), m_events_per_frame(0), m_buffer_size(0), + m_bytes_per_frame(0), m_bytes_per_buffer(0), + m_Client(c), m_framecounter(0), m_buffer_tail_timestamp(0), + m_dll_e2(0.0), m_dll_b(0.877), m_dll_c(0.384), + m_nominal_rate(0.0), m_update_period(0) +{ + +} + +TimestampedBuffer::~TimestampedBuffer() { + freebob_ringbuffer_free(m_event_buffer); + free(m_cluster_buffer); +} + +bool TimestampedBuffer::setEventSize(unsigned int s) { + m_event_size=s; + + m_bytes_per_frame=m_event_size*m_events_per_frame; + m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size; + + return true; +} + +bool TimestampedBuffer::setEventsPerFrame(unsigned int s) { + m_events_per_frame=s; + + m_bytes_per_frame=m_event_size*m_events_per_frame; + m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size; + + return true; +} + +bool TimestampedBuffer::setBufferSize(unsigned int s) { + m_buffer_size=s; + + m_bytes_per_frame=m_event_size*m_events_per_frame; + m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size; + + return true; +} + +unsigned int TimestampedBuffer::getBufferFill() { + return freebob_ringbuffer_read_space(m_event_buffer)/(m_bytes_per_frame); +} + +bool TimestampedBuffer::init() { + + pthread_mutex_init(&m_framecounter_lock, NULL); + + return true; +} + +bool TimestampedBuffer::reset() { + freebob_ringbuffer_reset(m_event_buffer); + + resetFrameCounter(); + + return true; +} + +void TimestampedBuffer::dumpInfo() { + + uint64_t ts_head, fc; + getBufferHeadTimestamp(&ts_head,&fc); + + int64_t diff=(int64_t)ts_head - (int64_t)m_buffer_tail_timestamp; + + debugOutputShort( DEBUG_LEVEL_NORMAL, " TimestampedBuffer (%p) info:\n",this); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : %011llu\n",ts_head); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); + debugOutputShort( DEBUG_LEVEL_NORMAL, " rate : %lf (%f)\n",m_dll_e2,m_dll_e2/m_update_period); +} + +bool TimestampedBuffer::prepare() { + debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing buffer (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE," Size=%u events, events/frame=%u, event size=%ubytes\n", + m_buffer_size,m_events_per_frame,m_event_size); + + assert(m_buffer_size); + assert(m_events_per_frame); + assert(m_event_size); + + if( !(m_event_buffer=freebob_ringbuffer_create( + (m_events_per_frame * m_buffer_size) * m_event_size))) { + + debugFatal("Could not allocate memory event ringbuffer\n"); + return false; + } + + // allocate the temporary cluster buffer + if( !(m_cluster_buffer=(char *)calloc(m_events_per_frame,m_event_size))) { + debugFatal("Could not allocate temporary cluster buffer\n"); + freebob_ringbuffer_free(m_event_buffer); + return false; + } + + assert(m_nominal_rate != 0.0); + assert(m_update_period != 0); + + // init the DLL + m_dll_e2=m_nominal_rate * (double)m_update_period; + + m_dll_b=((double)(0.877)); + m_dll_c=((double)(0.384)); + + return true; +} + +bool TimestampedBuffer::writeFrames(unsigned int nevents, char *data) { + + unsigned int write_size=nevents*m_event_size*m_events_per_frame; + + // add the data payload to the ringbuffer + if (freebob_ringbuffer_write(m_event_buffer,data,write_size) < write_size) + { + debugWarning("writeFrames buffer overrun\n"); + return false; + } + return true; + +} + +bool TimestampedBuffer::readFrames(unsigned int nevents, char *data) { + + unsigned int read_size=nevents*m_event_size*m_events_per_frame; + + // get the data payload to the ringbuffer + if ((freebob_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) + { + debugWarning("readFrames buffer underrun\n"); + return false; + } + return true; + +} + +bool TimestampedBuffer::blockProcessWriteFrames(unsigned int nbframes, int64_t ts) { + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); + int xrun; + unsigned int offset=0; + + freebob_ringbuffer_data_t vec[2]; + // we received one period of frames + // this is period_size*dimension of events + unsigned int events2write=nbframes*m_events_per_frame; + unsigned int bytes2write=events2write*m_event_size; + + /* write events2write bytes to the ringbuffer + * first see if it can be done in one read. + * if so, ok. + * otherwise write up to a multiple of clusters directly to the buffer + * then do the buffer wrap around using ringbuffer_write + * then write the remaining data directly to the buffer in a third pass + * Make sure that we cannot end up on a non-cluster aligned position! + */ + unsigned int cluster_size=m_events_per_frame*m_event_size; + + while(bytes2write>0) { + int byteswritten=0; + + unsigned int frameswritten=(nbframes*cluster_size-bytes2write)/cluster_size; + offset=frameswritten; + + freebob_ringbuffer_get_write_vector(m_event_buffer, vec); + + if(vec[0].len==0) { // this indicates a full event buffer + debugError("Event buffer overrun in buffer %p\n",this); + break; + } + + /* if we don't take care we will get stuck in an infinite loop + * because we align to a cluster boundary later + * the remaining nb of bytes in one write operation can be + * smaller than one cluster + * this can happen because the ringbuffer size is always a power of 2 + */ + if(vec[0].lenprocessWriteBlock(m_cluster_buffer, 1, offset); + + if(xrun<0) { + // xrun detected + debugError("Frame buffer underrun in buffer %p\n",this); + return false; + } + + // use the ringbuffer function to write one cluster + // the write function handles the wrap around. + freebob_ringbuffer_write(m_event_buffer, + m_cluster_buffer, + cluster_size); + + // we advanced one cluster_size + bytes2write-=cluster_size; + + } else { // + + if(bytes2write>vec[0].len) { + // align to a cluster boundary + byteswritten=vec[0].len-(vec[0].len%cluster_size); + } else { + byteswritten=bytes2write; + } + + xrun = m_Client->processWriteBlock(vec[0].buf, + byteswritten/cluster_size, + offset); + + if(xrun<0) { + // xrun detected + debugError("Frame buffer underrun in buffer %p\n",this); + return false; // FIXME: return false ? + } + + freebob_ringbuffer_write_advance(m_event_buffer, byteswritten); + bytes2write -= byteswritten; + } + + // the bytes2write should always be cluster aligned + assert(bytes2write%cluster_size==0); + + } + + return true; + +} + +bool TimestampedBuffer::blockProcessReadFrames(unsigned int nbframes) { + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading %u from buffer (%p)...\n", nbframes, this); + + int xrun; + unsigned int offset=0; + + freebob_ringbuffer_data_t vec[2]; + // we received one period of frames on each connection + // this is period_size*dimension of events + + unsigned int events2read=nbframes*m_events_per_frame; + unsigned int bytes2read=events2read*m_event_size; + /* read events2read bytes from the ringbuffer + * first see if it can be done in one read. + * if so, ok. + * otherwise read up to a multiple of clusters directly from the buffer + * then do the buffer wrap around using ringbuffer_read + * then read the remaining data directly from the buffer in a third pass + * Make sure that we cannot end up on a non-cluster aligned position! + */ + unsigned int cluster_size=m_events_per_frame*m_event_size; + + while(bytes2read>0) { + unsigned int framesread=(nbframes*cluster_size-bytes2read)/cluster_size; + offset=framesread; + + int bytesread=0; + + freebob_ringbuffer_get_read_vector(m_event_buffer, vec); + + if(vec[0].len==0) { // this indicates an empty event buffer + debugError("RCV: Event buffer underrun in processor %p\n",this); + return false; + } + + /* if we don't take care we will get stuck in an infinite loop + * because we align to a cluster boundary later + * the remaining nb of bytes in one read operation can be smaller than one cluster + * this can happen because the ringbuffer size is always a power of 2 + */ + if(vec[0].lenprocessReadBlock(m_cluster_buffer, 1, offset); + + if(xrun<0) { + // xrun detected + debugError("RCV: Frame buffer overrun in processor %p\n",this); + return false; + } + + // we advanced one cluster_size + bytes2read-=cluster_size; + + } else { // + + if(bytes2read>vec[0].len) { + // align to a cluster boundary + bytesread=vec[0].len-(vec[0].len%cluster_size); + } else { + bytesread=bytes2read; + } + + assert(m_Client); + xrun = m_Client->processReadBlock(vec[0].buf, bytesread/cluster_size, offset); + + if(xrun<0) { + // xrun detected + debugError("RCV: Frame buffer overrun in processor %p\n",this); + return false; + } + + freebob_ringbuffer_read_advance(m_event_buffer, bytesread); + bytes2read -= bytesread; + } + + // the bytes2read should always be cluster aligned + assert(bytes2read%cluster_size==0); + } + + return true; +} + +/** + * Decrements the frame counter, in a atomic way. + * is thread safe. + */ +void TimestampedBuffer::decrementFrameCounter(int nbframes) { + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter -= nbframes; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Increments the frame counter, in a atomic way. + * also sets the buffer tail timestamp + * This is thread safe. + */ +void TimestampedBuffer::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", + this, new_timestamp); + + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter += nbframes; + + // update the DLL + int64_t diff = (int64_t)new_timestamp-(int64_t)m_buffer_next_tail_timestamp; + + // idea to implement it for nbframes values that differ from m_update_period: + // diff = diff * nbframes/m_update_period + // m_buffer_next_tail_timestamp = m_buffer_tail_timestamp + diff + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p): diff=%lld ", + this, diff); + + // the maximal difference we can allow (64secs) + const int64_t max=TICKS_PER_SECOND*64L; + + if(diff > max) { + diff -= TICKS_PER_SECOND*128L; + } else if (diff < -max) { + diff += TICKS_PER_SECOND*128L; + } + + double err=diff; + + debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "diff2=%lld err=%f\n", + diff, err); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "FC=%10u, TS=%011llu\n",m_framecounter, m_buffer_tail_timestamp); + + m_buffer_tail_timestamp=m_buffer_next_tail_timestamp; + m_buffer_next_tail_timestamp += (uint64_t)(m_dll_b * err + m_dll_e2); + + if (m_buffer_next_tail_timestamp > TICKS_PER_SECOND*128L) { + m_buffer_next_tail_timestamp -= TICKS_PER_SECOND*128L; + } + + m_dll_e2 += m_dll_c*err; + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "TS=%011llu, NTS=%011llu, DLLe2=%f\n", + m_buffer_tail_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); + + pthread_mutex_unlock(&m_framecounter_lock); + + // this DLL allows the calculation of any sample timestamp relative to the buffer tail, + // to the next period and beyond (through extrapolation) + // + // ts(x) = m_buffer_tail_timestamp + + // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x + +} + +/** + * Sets the buffer tail timestamp (in usecs) + * This is thread safe. + */ +void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) { + + pthread_mutex_lock(&m_framecounter_lock); + + m_buffer_tail_timestamp = new_timestamp; + + m_dll_e2=m_update_period * m_nominal_rate; + m_buffer_next_tail_timestamp = (uint64_t)((double)m_buffer_tail_timestamp + m_dll_e2); + + pthread_mutex_unlock(&m_framecounter_lock); + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n", + this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); + +} + +/** + * \brief return the timestamp of the first frame in the buffer + * + * This function returns the timestamp of the very first sample in + * the StreamProcessor's buffer. This is useful for slave StreamProcessors + * to find out what the base for their timestamp generation should + * be. It also returns the framecounter value for which this timestamp + * is valid. + * + * The system is built in such a way that we assume that the processing + * of the buffers doesn't take any time. Assume we have a buffer transfer at + * time T1, meaning that the last sample of this buffer occurs at T1. As + * processing does not take time, we don't have to add anything to T1. When + * transferring the processed buffer to the xmit processor, the timestamp + * of the last sample is still T1. + * + * When starting the streams, we don't have any information on this last + * timestamp. We prefill the buffer at the xmit side, and we should find + * out what the timestamp for the last sample in the buffer is. If we sync + * on a receive SP, we know that the last prefilled sample corresponds with + * the first sample received - 1 sample duration. This is the same as if the last + * transfer from iso to client would have emptied the receive buffer. + * + * + * @param ts address to store the timestamp in + * @param fc address to store the associated framecounter in + */ +void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { + double rate=(double)m_buffer_next_tail_timestamp - (double)m_buffer_tail_timestamp; + rate /= (double)m_update_period; + + pthread_mutex_lock(&m_framecounter_lock); + *fc = m_framecounter; + + // ts(x) = m_buffer_tail_timestamp + + // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x + + *ts=m_buffer_tail_timestamp + (uint64_t)(m_framecounter * rate); + + pthread_mutex_unlock(&m_framecounter_lock); + if(*ts > TICKS_PER_SECOND*128L) { + *ts -= TICKS_PER_SECOND*128L; + } +} + +/** + * \brief return the timestamp of the last frame in the buffer + * + * This function returns the timestamp of the last frame in + * the StreamProcessor's buffer. It also returns the framecounter + * value for which this timestamp is valid. + * + * @param ts address to store the timestamp in + * @param fc address to store the associated framecounter in + */ +void TimestampedBuffer::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { + pthread_mutex_lock(&m_framecounter_lock); + *fc = m_framecounter; + *ts = m_buffer_tail_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Resets the frame counter, in a atomic way. This + * is thread safe. + */ +void TimestampedBuffer::resetFrameCounter() { + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter = 0; + pthread_mutex_unlock(&m_framecounter_lock); +} + + +} // end of namespace FreebobUtil Index: /branches/streaming-rework/src/libutil/TimestampedBuffer.h =================================================================== --- /branches/streaming-rework/src/libutil/TimestampedBuffer.h (revision 391) +++ /branches/streaming-rework/src/libutil/TimestampedBuffer.h (revision 391) @@ -0,0 +1,137 @@ +/* $Id$ */ + +/* + * FreeBob Streaming API + * FreeBob = Firewire (pro-)audio for linux + * + * http://freebob.sf.net + * + * Copyright (C) 2005,2006,2007 Pieter Palmers + * + * This program is free software {} you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation {} either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY {} without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program {} if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * + * + */ +#ifndef __FREEBOB_TIMESTAMPEDBUFFER__ +#define __FREEBOB_TIMESTAMPEDBUFFER__ + +#include "../debugmodule/debugmodule.h" +#include "libutil/ringbuffer.h" + +namespace FreebobUtil { + +class TimestampedBufferClient; + +class TimestampedBuffer { + +public: + + TimestampedBuffer(TimestampedBufferClient *); + virtual ~TimestampedBuffer(); + + bool writeFrames(unsigned int nbframes, char *data); + bool readFrames(unsigned int nbframes, char *data); + + bool blockProcessWriteFrames(unsigned int nbframes, int64_t ts); + bool blockProcessReadFrames(unsigned int nbframes); + + bool init(); + bool prepare(); + bool reset(); + + bool setEventSize(unsigned int s); + bool setEventsPerFrame(unsigned int s); + bool setBufferSize(unsigned int s); + + unsigned int getBufferFill(); + + // timestamp stuff + int getFrameCounter() {return m_framecounter;}; + + void decrementFrameCounter(int nbframes); + void incrementFrameCounter(int nbframes, uint64_t new_timestamp); + void resetFrameCounter(); + + void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); + void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); + + void setBufferTailTimestamp(uint64_t new_timestamp); + + // dll stuff + void setNominalRate(double r) {m_nominal_rate=r;}; + double getRate() {return m_dll_e2;}; + + void setUpdatePeriod(unsigned int t) {m_update_period=t;}; + + // misc stuff + void dumpInfo(); + + +protected: + + freebob_ringbuffer_t * m_event_buffer; + char* m_cluster_buffer; + + unsigned int m_event_size; // the size of one event + unsigned int m_events_per_frame; // the number of events in a frame + unsigned int m_buffer_size; // the number of frames in the buffer + unsigned int m_bytes_per_frame; + unsigned int m_bytes_per_buffer; + + TimestampedBufferClient *m_Client; + + DECLARE_DEBUG_MODULE; + +private: + // the framecounter gives the number of frames in the buffer + signed int m_framecounter; + + // the buffer tail timestamp gives the timestamp of the last frame + // that was put into the buffer + uint64_t m_buffer_tail_timestamp; + uint64_t m_buffer_next_tail_timestamp; + + // the buffer head timestamp gives the timestamp of the first frame + // that was put into the buffer +// uint64_t m_buffer_head_timestamp; + // this mutex protects the access to the framecounter + // and the buffer head timestamp. + pthread_mutex_t m_framecounter_lock; + + // tracking DLL variables + double m_dll_e2; + double m_dll_b; + double m_dll_c; + + double m_nominal_rate; + unsigned int m_update_period; +}; + +class TimestampedBufferClient { + public: + TimestampedBufferClient() {}; + virtual ~TimestampedBufferClient() {}; + + virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset)=0; + virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset)=0; + +}; + +} // end of namespace FreebobUtil + +#endif /* __FREEBOB_TIMESTAMPEDBUFFER__ */ + + Index: /branches/streaming-rework/src/Makefile.am =================================================================== --- /branches/streaming-rework/src/Makefile.am (revision 386) +++ /branches/streaming-rework/src/Makefile.am (revision 391) @@ -46,5 +46,5 @@ libutil/ringbuffer.h libutil/PacketBuffer.h libutil/StreamStatistics.h \ libutil/serialize.h libutil/SystemTimeSource.h libutil/Thread.h libutil/Time.h \ - libutil/TimeSource.h + libutil/TimeSource.h libutil/TimestampedBuffer.h libfreebob_la_SOURCES = \ @@ -105,5 +105,6 @@ libutil/SystemTimeSource.cpp \ libutil/Time.c \ - libutil/TimeSource.cpp + libutil/TimeSource.cpp \ + libutil/TimestampedBuffer.cpp libfreebob_la_LDFLAGS = \