Index: /branches/streaming-rework/tests/test-timestampedbuffer.cpp =================================================================== --- /branches/streaming-rework/tests/test-timestampedbuffer.cpp (revision 392) +++ /branches/streaming-rework/tests/test-timestampedbuffer.cpp (revision 392) @@ -0,0 +1,343 @@ +/*************************************************************************** +Copyright (C) 2007 by Pieter Palmers * + * +This program is free software; you can redistribute it and/or modify * +it under the terms of the GNU General Public License as published by * +the Free Software Foundation; either version 2 of the License, or * +(at your option) any later version. * + * +This program is distributed in the hope that it will be useful, * +but WITHOUT ANY WARRANTY; without even the implied warranty of * +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * +GNU General Public License for more details. * + * +You should have received a copy of the GNU General Public License * +along with this program; if not, write to the * +Free Software Foundation, Inc., * +59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * +***************************************************************************/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include + +#include +#include "src/debugmodule/debugmodule.h" + +#include + +#include "src/libstreaming/cycletimer.h" + +#include "src/libutil/TimestampedBuffer.h" + +#include + +using namespace FreebobUtil; + +class TimestampedBufferTestClient + : public TimestampedBufferClient { +public: + bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) {return true;}; + bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) {return true;}; + + void setVerboseLevel(int l) {setDebugLevel(l);}; +private: + DECLARE_DEBUG_MODULE; +}; + +IMPL_DEBUG_MODULE( TimestampedBufferTestClient, TimestampedBufferTestClient, DEBUG_LEVEL_VERBOSE ); + +DECLARE_GLOBAL_DEBUG_MODULE; + +int run; +// Program documentation. +static char doc[] = "FreeBoB -- Timestamped buffer test\n\n"; + +// A description of the arguments we accept. +static char args_doc[] = ""; + + +struct arguments +{ + short verbose; + uint64_t wrap_at; + uint64_t frames_per_packet; + uint64_t events_per_frame; + float rate; + uint64_t total_cycles; + uint64_t buffersize; +}; + +// The options we understand. +static struct argp_option options[] = { + {"verbose", 'v', "n", 0, "Verbose level" }, + {"wrap", 'w', "n", 0, "Wrap at (ticks) (3072000)" }, + {"fpp", 'f', "n", 0, "Frames per packet (8)" }, + {"epf", 'e', "n", 0, "Events per frame (10)" }, + {"rate", 'r', "n", 0, "Rate (ticks/frame) (512.0)" }, + {"cycles", 'c', "n", 0, "Total cycles to run (2000)" }, + {"buffersize", 'b', "n", 0, "Buffer size (in frames) (1024)" }, + { 0 } +}; + +//------------------------------------------------------------- + +// Parse a single option. +static error_t +parse_opt( int key, char* arg, struct argp_state* state ) +{ + // Get the input argument from `argp_parse', which we + // know is a pointer to our arguments structure. + struct arguments* arguments = ( struct arguments* ) state->input; + char* tail; + + switch (key) { + case 'v': + if (arg) { + arguments->verbose = strtol( arg, &tail, 0 ); + if ( errno ) { + fprintf( stderr, "Could not parse 'verbose' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } else { + if ( errno ) { + fprintf( stderr, "Could not parse 'verbose' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } + break; + case 'w': + if (arg) { + arguments->wrap_at = strtol( arg, &tail, 0 ); + if ( errno ) { + fprintf( stderr, "Could not parse 'wrap' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } else { + if ( errno ) { + fprintf( stderr, "Could not parse 'wrap' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } + break; + case 'f': + if (arg) { + arguments->frames_per_packet = strtol( arg, &tail, 0 ); + if ( errno ) { + fprintf( stderr, "Could not parse 'fpp' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } else { + if ( errno ) { + fprintf( stderr, "Could not parse 'fpp' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } + break; + case 'e': + if (arg) { + arguments->events_per_frame = strtol( arg, &tail, 0 ); + if ( errno ) { + fprintf( stderr, "Could not parse 'epf' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } else { + if ( errno ) { + fprintf( stderr, "Could not parse 'epf' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } + break; + case 'c': + if (arg) { + arguments->total_cycles = strtol( arg, &tail, 0 ); + if ( errno ) { + fprintf( stderr, "Could not parse 'cycles' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } else { + if ( errno ) { + fprintf( stderr, "Could not parse 'cycles' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } + break; + case 'b': + if (arg) { + arguments->buffersize = strtol( arg, &tail, 0 ); + if ( errno ) { + fprintf( stderr, "Could not parse 'buffersize' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } else { + if ( errno ) { + fprintf( stderr, "Could not parse 'buffersize' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } + break; + case 'r': + if (arg) { + arguments->rate = strtof( arg, &tail ); + if ( errno ) { + fprintf( stderr, "Could not parse 'rate' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } else { + if ( errno ) { + fprintf( stderr, "Could not parse 'rate' argument\n" ); + return ARGP_ERR_UNKNOWN; + } + } + break; + default: + return ARGP_ERR_UNKNOWN; + } + return 0; +} + +// Our argp parser. +static struct argp argp = { options, parse_opt, args_doc, doc }; + + +static void sighandler (int sig) +{ + run = 0; +} + +int main(int argc, char *argv[]) +{ + + TimestampedBuffer *t=NULL; + TimestampedBufferTestClient *c=NULL; + + struct arguments arguments; + + // Default values. + arguments.verbose = 0; + arguments.wrap_at = 3072000LLU; // 1000 cycles + arguments.frames_per_packet = 8; + arguments.events_per_frame = 10; + arguments.rate = 512.0; + arguments.total_cycles = 2000; + arguments.buffersize = 1024; + + // Parse our arguments; every option seen by `parse_opt' will + // be reflected in `arguments'. + if ( argp_parse ( &argp, argc, argv, 0, 0, &arguments ) ) { + fprintf( stderr, "Could not parse command line\n" ); + exit(1); + } + + setDebugLevel(arguments.verbose); + + run=1; + + signal (SIGINT, sighandler); + signal (SIGPIPE, sighandler); + + c=new TimestampedBufferTestClient(); + + if(!c) { + debugOutput(DEBUG_LEVEL_NORMAL, "Could not create TimestampedBufferTestClient\n"); + exit(1); + } + c->setVerboseLevel(arguments.verbose); + + t=new TimestampedBuffer(c); + + if(!t) { + debugOutput(DEBUG_LEVEL_NORMAL, "Could not create TimestampedBuffer\n"); + delete c; + exit(1); + } + t->setVerboseLevel(arguments.verbose); + + t->init(); + + // Setup the buffer + t->setBufferSize(arguments.buffersize); + t->setEventSize(sizeof(int)); + t->setEventsPerFrame(arguments.events_per_frame); + + t->setUpdatePeriod(arguments.frames_per_packet); + t->setNominalRate(arguments.rate); + + t->setWrapValue(arguments.wrap_at); + + t->prepare(); + + usleep(1000); + + int dummyframe_in[arguments.events_per_frame*arguments.frames_per_packet]; + int dummyframe_out[arguments.events_per_frame*arguments.frames_per_packet]; + + for (unsigned int i=0;isetBufferTailTimestamp(timestamp); + timestamp += (uint64_t)(arguments.rate * arguments.frames_per_packet); + + uint64_t time=0; + + for(unsigned int cycle=0;cycle < arguments.total_cycles; cycle++) { + // simulate the rate adaptation + int64_t diff=time-timestamp; + + if (diff>(int64_t)arguments.wrap_at/2) { + diff -= arguments.wrap_at; + } else if (diff<(-(int64_t)arguments.wrap_at)/2){ + diff += arguments.wrap_at; + } + + debugOutput(DEBUG_LEVEL_NORMAL, "Simulating cycle %d @ time=%011llu, diff=%lld\n",cycle,time,diff); + + if(diff>0) { + // write one packet + t->writeFrames(arguments.frames_per_packet, (char *)&dummyframe_in, timestamp); + + // read one packet + t->readFrames(arguments.frames_per_packet, (char *)&dummyframe_out); + + // check + bool pass=true; + for (unsigned int i=0;i= arguments.wrap_at) { + timestamp -= arguments.wrap_at; + } + } + + // simulate the cycle timer clock in ticks + time += 3072; + if (time >= arguments.wrap_at) { + time -= arguments.wrap_at; + } + + // allow for the messagebuffer thread to catch up + usleep(200); + + } + + delete t; + delete c; + + return EXIT_SUCCESS; +} + + Index: /branches/streaming-rework/tests/test-sytmonitor.cpp =================================================================== --- /branches/streaming-rework/tests/test-sytmonitor.cpp (revision 390) +++ /branches/streaming-rework/tests/test-sytmonitor.cpp (revision 392) @@ -1,4 +1,4 @@ /*************************************************************************** -Copyright (C) 2005 by Pieter Palmers * +Copyright (C) 2007 by Pieter Palmers * * This program is free software; you can redistribute it and/or modify * @@ -41,5 +41,5 @@ #include "src/libutil/SystemTimeSource.h" -#include "pthread.h" +#include using namespace FreebobStreaming; Index: /branches/streaming-rework/tests/Makefile.am =================================================================== --- /branches/streaming-rework/tests/Makefile.am (revision 384) +++ /branches/streaming-rework/tests/Makefile.am (revision 392) @@ -22,5 +22,6 @@ noinst_PROGRAMS = test-freebob test-extplugcmd test-fw410 freebob-server \ - test-volume test-mixer test-cycletimer test-sytmonitor + test-volume test-mixer test-cycletimer test-sytmonitor \ + test-timestampedbuffer noinst_HEADERS = @@ -49,5 +50,4 @@ $(LIBAVC1394_LIBS) $(LIBIEC61883_LIBS) -lrom1394 - #TESTS_ENVIRONMENT TEST = test-freebob @@ -61,2 +61,6 @@ test_sytmonitor_SOURCES = test-sytmonitor.cpp SytMonitor.cpp \ SytMonitor.h + +test_timestampedbuffer_LDADD = $(top_builddir)/src/libfreebob.la $(LIBIEC61883_LIBS) \ + $(LIBRAW1394_LIBS) $(LIBAVC1394_LIBS) +test_timestampedbuffer_SOURCES = test-timestampedbuffer.cpp Index: /branches/streaming-rework/tests/SytMonitor.h =================================================================== --- /branches/streaming-rework/tests/SytMonitor.h (revision 386) +++ /branches/streaming-rework/tests/SytMonitor.h (revision 392) @@ -7,5 +7,5 @@ * http://freebob.sf.net * - * Copyright (C) 2005,2006 Pieter Palmers + * Copyright (C) 2007 Pieter Palmers * * This program is free software {} you can redistribute it and/or modify Index: /branches/streaming-rework/src/libstreaming/StreamProcessor.h =================================================================== --- /branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 391) +++ /branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 392) @@ -88,6 +88,6 @@ bool isEnabled() {return !m_is_disabled;}; - virtual bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from client - virtual bool getFrames(unsigned int nbframes); ///< transfer the buffer contents to the client + virtual bool putFrames(unsigned int nbframes, int64_t ts) = 0; ///< transfer the buffer contents from client + virtual bool getFrames(unsigned int nbframes) = 0; ///< transfer the buffer contents to the client virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) @@ -175,5 +175,5 @@ */ virtual uint64_t getTimeAtPeriodUsecs() = 0; - + /** * \brief return the time of the next period boundary (in internal units) @@ -184,18 +184,17 @@ */ virtual uint64_t getTimeAtPeriod() = 0; - + uint64_t getTimeNow(); - + bool setSyncSource(StreamProcessor *s); float getTicksPerFrame() {return m_ticks_per_frame;}; - + int getLastCycle() {return m_last_cycle;}; - - + protected: StreamProcessor *m_SyncSource; - + float m_ticks_per_frame; - + int m_last_cycle; @@ -221,6 +220,7 @@ int cycle, unsigned int dropped, unsigned int max_length) {return RAW1394_ISO_STOP;}; - - virtual enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, + virtual bool putFrames(unsigned int nbframes, int64_t ts) {return false;}; + + virtual enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped) = 0; @@ -251,5 +251,6 @@ unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped) {return RAW1394_ISO_STOP;}; - + virtual bool getFrames(unsigned int nbframes) {return false;}; + virtual enum raw1394_iso_disposition getPacket(unsigned char *data, unsigned int *length, Index: /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp =================================================================== --- /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 391) +++ /branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 392) @@ -392,8 +392,8 @@ // update the frame counter such that it reflects the new value // done in the SP base class - if (!StreamProcessor::getFrames(nevents)) { - debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents); - return RAW1394_ISO_ERROR; - } +// if (!StreamProcessor::getFrames(nevents)) { +// debugError("Could not do StreamProcessor::getFrames(%d)\n",nevents); +// return RAW1394_ISO_ERROR; +// } return RAW1394_ISO_OK; @@ -471,16 +471,16 @@ // this base timestamp is the timestamp of the // last buffer transfer. - uint64_t ts; - uint64_t fc; - m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe +// uint64_t ts; +// uint64_t fc; +// m_SyncSource->m_data_buffer->getBufferHeadTimestamp(&ts, &fc); // thread safe // update the frame counter such that it reflects the buffer content, // the buffer tail timestamp is initialized when the SP is enabled // done in the SP base class - if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { - debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n", - m_ringbuffer_size_frames,ts); - return false; - } +// if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { +// debugError("Could not do StreamProcessor::putFrames(%d, %011llu)\n", +// m_ringbuffer_size_frames,ts); +// return false; +// } return true; @@ -582,4 +582,6 @@ m_data_buffer->setUpdatePeriod(m_period); m_data_buffer->setNominalRate(m_ticks_per_frame); + + m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); m_data_buffer->prepare(); @@ -730,5 +732,5 @@ // add the silence data to the ringbuffer - if(m_data_buffer->writeFrames(nframes, dummybuffer)) { + if(m_data_buffer->writeFrames(nframes, dummybuffer, 0)) { retval=true; } else { @@ -784,8 +786,8 @@ // and also update the buffer tail timestamp // done in the SP base class - if (!StreamProcessor::putFrames(nbframes, timestamp)) { - debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp); - return false; - } +// if (!StreamProcessor::putFrames(nbframes, timestamp)) { +// debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp); +// return false; +// } return true; @@ -1232,5 +1234,5 @@ //=> process the packet // add the data payload to the ringbuffer - if(m_data_buffer->writeFrames(nevents, (char *)(data+8))) { + if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { retval=RAW1394_ISO_OK; @@ -1275,8 +1277,8 @@ // and also update the buffer tail timestamp, as we add new frames // done in the SP base class - if (!StreamProcessor::putFrames(nevents, m_last_timestamp)) { - debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nevents, m_last_timestamp); - return RAW1394_ISO_ERROR; - } +// if (!StreamProcessor::putFrames(nevents, m_last_timestamp)) { +// debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nevents, m_last_timestamp); +// return RAW1394_ISO_ERROR; +// } } @@ -1496,4 +1498,6 @@ m_data_buffer->setUpdatePeriod(m_syt_interval); m_data_buffer->setNominalRate(m_ticks_per_frame); + + m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); m_data_buffer->prepare(); @@ -1606,13 +1610,13 @@ // which should be ours m_data_buffer->blockProcessReadFrames(nbframes); - + // update the frame counter such that it reflects the new value, // done in the SP base class - +/* if (!StreamProcessor::getFrames(nbframes)) { debugError("Could not do StreamProcessor::getFrames(%d)\n", nbframes); return false; - } - + }*/ + return true; } Index: /branches/streaming-rework/src/libstreaming/StreamProcessor.cpp =================================================================== --- /branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 391) +++ /branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 392) @@ -166,37 +166,37 @@ } -/** - * @brief Notify the StreamProcessor that frames were written - * - * This notifies the StreamProcessor of the fact that frames were written to the internal - * buffer. This is for framecounter & timestamp bookkeeping. - * - * @param nbframes the number of frames that are written to the internal buffers - * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample - * present in the buffer. - * @return true if successful - */ -bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts); - m_data_buffer->incrementFrameCounter(nbframes, ts); - return true; -} - -/** - * @brief Notify the StreamProcessor that frames were read - * - * This notifies the StreamProcessor of the fact that frames were read from the internal - * buffer. This is for framecounter & timestamp bookkeeping. - * - * @param nbframes the number of frames that are read from the internal buffers - * @return true if successful - */ -bool StreamProcessor::getFrames(unsigned int nbframes) { - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); - m_data_buffer->decrementFrameCounter(nbframes); - return true; -} +// /** +// * @brief Notify the StreamProcessor that frames were written +// * +// * This notifies the StreamProcessor of the fact that frames were written to the internal +// * buffer. This is for framecounter & timestamp bookkeeping. +// * +// * @param nbframes the number of frames that are written to the internal buffers +// * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample +// * present in the buffer. +// * @return true if successful +// */ +// bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { +// +// debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts); +// // m_data_buffer->incrementFrameCounter(nbframes, ts); +// return true; +// } + +// /** +// * @brief Notify the StreamProcessor that frames were read +// * +// * This notifies the StreamProcessor of the fact that frames were read from the internal +// * buffer. This is for framecounter & timestamp bookkeeping. +// * +// * @param nbframes the number of frames that are read from the internal buffers +// * @return true if successful +// */ +// bool StreamProcessor::getFrames(unsigned int nbframes) { +// +// debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); +// // m_data_buffer->decrementFrameCounter(nbframes); +// return true; +// } uint64_t StreamProcessor::getTimeNow() { Index: /branches/streaming-rework/src/debugmodule/debugmodule.h =================================================================== --- /branches/streaming-rework/src/debugmodule/debugmodule.h (revision 390) +++ /branches/streaming-rework/src/debugmodule/debugmodule.h (revision 392) @@ -199,4 +199,6 @@ bool setMgrDebugLevel( std::string name, debug_level_t level ); + void sync(); + protected: bool registerModule( DebugModule& debugModule ); Index: /branches/streaming-rework/src/debugmodule/debugmodule.cpp =================================================================== --- /branches/streaming-rework/src/debugmodule/debugmodule.cpp (revision 385) +++ /branches/streaming-rework/src/debugmodule/debugmodule.cpp (revision 392) @@ -270,4 +270,9 @@ } +void +DebugModuleManager::sync() +{ + mb_flush(); +} void Index: /branches/streaming-rework/src/libutil/TimestampedBuffer.cpp =================================================================== --- /branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (revision 391) +++ /branches/streaming-rework/src/libutil/TimestampedBuffer.cpp (revision 392) @@ -41,8 +41,11 @@ m_event_size(0), m_events_per_frame(0), m_buffer_size(0), m_bytes_per_frame(0), m_bytes_per_buffer(0), + m_wrap_at(0xFFFFFFFFFFFFFFFFLLU), m_Client(c), m_framecounter(0), m_buffer_tail_timestamp(0), + m_buffer_next_tail_timestamp(0), m_dll_e2(0.0), m_dll_b(0.877), m_dll_c(0.384), m_nominal_rate(0.0), m_update_period(0) { + pthread_mutex_init(&m_framecounter_lock, NULL); } @@ -53,4 +56,61 @@ } +/** + * \brief Set the nominal rate in frames/timeunit + * + * Sets the nominal rate in frames per time unit. This rate is used + * to initialize the DLL that will extract the effective rate based + * upon the timestamps it gets fed. + * + * @param r rate + * @return true if successful + */ +bool TimestampedBuffer::setNominalRate(float r) { + m_nominal_rate=r; + debugOutput(DEBUG_LEVEL_VERBOSE," nominal rate=%e set to %e\n", + m_nominal_rate, r); + return true; +} + +/** + * \brief Set the nominal update period (in frames) + * + * Sets the nominal update period. This period is the number of frames + * between two timestamp updates (hence buffer writes) + * + * @param n period in frames + * @return true if successful + */ +bool TimestampedBuffer::setUpdatePeriod(unsigned int n) { + m_update_period=n; + return true; +} + +/** + * \brief set the value at which timestamps should wrap around + * @param w value to wrap at + * @return true if successful + */ +bool TimestampedBuffer::setWrapValue(uint64_t w) { + m_wrap_at=w; + return true; +} + +/** + * \brief return the effective rate + * + * Returns the effective rate calculated by the DLL. + * + * @return rate (in frames/timeunit) + */ +float TimestampedBuffer::getRate() { + return ((float) m_update_period)/m_dll_e2; +} + +/** + * \brief Sets the size of the events + * @param s event size in bytes + * @return true if successful + */ bool TimestampedBuffer::setEventSize(unsigned int s) { m_event_size=s; @@ -62,6 +122,11 @@ } -bool TimestampedBuffer::setEventsPerFrame(unsigned int s) { - m_events_per_frame=s; +/** + * \brief Sets the number of events per frame + * @param n number of events per frame + * @return true if successful + */ +bool TimestampedBuffer::setEventsPerFrame(unsigned int n) { + m_events_per_frame=n; m_bytes_per_frame=m_event_size*m_events_per_frame; @@ -70,25 +135,58 @@ return true; } - -bool TimestampedBuffer::setBufferSize(unsigned int s) { - m_buffer_size=s; - +/** + * \brief Sets the buffer size in frames + * @param n number frames + * @return true if successful + */ +bool TimestampedBuffer::setBufferSize(unsigned int n) { + m_buffer_size=n; + m_bytes_per_frame=m_event_size*m_events_per_frame; m_bytes_per_buffer=m_bytes_per_frame*m_buffer_size; - - return true; -} - + + return true; +} + +/** + * \brief Returns the current fill of the buffer + * + * This returns the buffer fill of the internal ringbuffer. This + * can only be used as an indication because it's state is not + * guaranteed to be consistent at all times due to threading issues. + * + * In order to get the number of frames in the buffer, use the + * getFrameCounter, getBufferHeadTimestamp, getBufferTailTimestamp + * functions + * + * @return the internal buffer fill in frames + */ unsigned int TimestampedBuffer::getBufferFill() { return freebob_ringbuffer_read_space(m_event_buffer)/(m_bytes_per_frame); } +/** + * \brief Initializes the TimestampedBuffer + * + * Initializes the TimestampedBuffer, should be called before anything else + * is done. + * + * @return true if successful + */ bool TimestampedBuffer::init() { - - pthread_mutex_init(&m_framecounter_lock, NULL); - - return true; -} - + return true; +} + +/** + * \brief Resets the TimestampedBuffer + * + * Resets the TimestampedBuffer, clearing the buffers and counters. + * (not true yet: Also resets the DLL to the nominal values.) + * + * \note when this is called, you should make sure that the buffer + * tail timestamp gets set before continuing + * + * @return true if successful + */ bool TimestampedBuffer::reset() { freebob_ringbuffer_reset(m_event_buffer); @@ -99,23 +197,26 @@ } -void TimestampedBuffer::dumpInfo() { - - uint64_t ts_head, fc; - getBufferHeadTimestamp(&ts_head,&fc); - - int64_t diff=(int64_t)ts_head - (int64_t)m_buffer_tail_timestamp; - - debugOutputShort( DEBUG_LEVEL_NORMAL, " TimestampedBuffer (%p) info:\n",this); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : %011llu\n",ts_head); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); - debugOutputShort( DEBUG_LEVEL_NORMAL, " rate : %lf (%f)\n",m_dll_e2,m_dll_e2/m_update_period); -} - +/** + * \brief Perpares the TimestampedBuffer + * + * Prepare the TimestampedBuffer. This allocates all internal buffers and + * initializes all data structures. + * + * This should be called after parameters such as buffer size, event size etc.. are set, + * and before any read/write operations are performed. + * + * @return true if successful + */ bool TimestampedBuffer::prepare() { debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing buffer (%p)\n",this); debugOutput(DEBUG_LEVEL_VERBOSE," Size=%u events, events/frame=%u, event size=%ubytes\n", m_buffer_size,m_events_per_frame,m_event_size); + + debugOutput(DEBUG_LEVEL_VERBOSE," update period %u\n", + m_update_period); + debugOutput(DEBUG_LEVEL_VERBOSE," nominal rate=%f\n", + m_nominal_rate); + + debugOutput(DEBUG_LEVEL_VERBOSE," wrapping at %llu\n",m_wrap_at); assert(m_buffer_size); @@ -123,7 +224,9 @@ assert(m_event_size); + assert(m_nominal_rate != 0.0L); + assert(m_update_period != 0); + if( !(m_event_buffer=freebob_ringbuffer_create( (m_events_per_frame * m_buffer_size) * m_event_size))) { - debugFatal("Could not allocate memory event ringbuffer\n"); return false; @@ -136,7 +239,4 @@ return false; } - - assert(m_nominal_rate != 0.0); - assert(m_update_period != 0); // init the DLL @@ -149,7 +249,18 @@ } -bool TimestampedBuffer::writeFrames(unsigned int nevents, char *data) { - - unsigned int write_size=nevents*m_event_size*m_events_per_frame; +/** + * @brief Write frames to the buffer + * + * Copies \ref nframes of frames from the buffer pointed to by \ref data to the + * internal ringbuffer. The time of the last frame in the buffer is set to \ref ts. + * + * @param nframes number of frames to copy + * @param data pointer to the frame buffer + * @param ts timestamp of the last frame copied + * @return true if successful + */ +bool TimestampedBuffer::writeFrames(unsigned int nframes, char *data, uint64_t ts) { + + unsigned int write_size=nframes*m_event_size*m_events_per_frame; // add the data payload to the ringbuffer @@ -159,11 +270,23 @@ return false; } - return true; - -} - -bool TimestampedBuffer::readFrames(unsigned int nevents, char *data) { - - unsigned int read_size=nevents*m_event_size*m_events_per_frame; + + incrementFrameCounter(nframes,ts); + + return true; + +} +/** + * @brief Read frames from the buffer + * + * Copies \ref nframes of frames from the internal buffer to the data buffer pointed + * to by \ref data. + * + * @param nframes number of frames to copy + * @param data pointer to the frame buffer + * @return true if successful + */ +bool TimestampedBuffer::readFrames(unsigned int nframes, char *data) { + + unsigned int read_size=nframes*m_event_size*m_events_per_frame; // get the data payload to the ringbuffer @@ -173,8 +296,24 @@ return false; } - return true; - -} - + + decrementFrameCounter(nframes); + + return true; + +} + +/** + * @brief Performs block processing write of frames + * + * This function allows for zero-copy writing into the ringbuffer. + * It calls the client's processWriteBlock function to write frames + * into the internal buffer's data area, in a thread safe fashion. + * + * It also updates the timestamp. + * + * @param nbframes number of frames to process + * @param ts timestamp of the last frame written to the buffer + * @return true if successful + */ bool TimestampedBuffer::blockProcessWriteFrames(unsigned int nbframes, int64_t ts) { @@ -266,8 +405,21 @@ } - return true; - -} - + incrementFrameCounter(nbframes,ts); + + return true; + +} + +/** + * @brief Performs block processing read of frames + * + * This function allows for zero-copy reading from the ringbuffer. + * It calls the client's processReadBlock function to read frames + * directly from the internal buffer's data area, in a thread safe + * fashion. + * + * @param nbframes number of frames to process + * @return true if successful + */ bool TimestampedBuffer::blockProcessReadFrames(unsigned int nbframes) { @@ -302,5 +454,5 @@ if(vec[0].len==0) { // this indicates an empty event buffer - debugError("RCV: Event buffer underrun in processor %p\n",this); + debugError("Event buffer underrun in buffer %p\n",this); return false; } @@ -321,5 +473,5 @@ if(xrun<0) { // xrun detected - debugError("RCV: Frame buffer overrun in processor %p\n",this); + debugError("Frame buffer overrun in buffer %p\n",this); return false; } @@ -342,5 +494,5 @@ if(xrun<0) { // xrun detected - debugError("RCV: Frame buffer overrun in processor %p\n",this); + debugError("Frame buffer overrun in buffer %p\n",this); return false; } @@ -354,10 +506,110 @@ } - return true; -} - -/** - * Decrements the frame counter, in a atomic way. + decrementFrameCounter(nbframes); + + return true; +} + +/** + * @brief Sets the buffer tail timestamp. + * + * Set the buffer tail timestamp to \ref new_timestamp. This will recalculate + * the internal state such that the buffer's timeframe starts at + * \ref new_timestamp. + * + * This is thread safe. + * + * @param new_timestamp + */ +void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) { + + pthread_mutex_lock(&m_framecounter_lock); + + m_buffer_tail_timestamp = new_timestamp; + + m_dll_e2=m_update_period * m_nominal_rate; + m_buffer_next_tail_timestamp = (uint64_t)((double)m_buffer_tail_timestamp + m_dll_e2); + + pthread_mutex_unlock(&m_framecounter_lock); + + debugOutput(DEBUG_LEVEL_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n", + this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); + +} + +/** + * \brief return the timestamp of the first frame in the buffer + * + * This function returns the timestamp of the very first sample in + * the StreamProcessor's buffer. It also returns the framecounter value + * for which this timestamp is valid. + * + * @param ts address to store the timestamp in + * @param fc address to store the associated framecounter in + */ +void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { + double rate=(double)m_buffer_next_tail_timestamp - (double)m_buffer_tail_timestamp; + rate /= (double)m_update_period; + + int64_t timestamp; + + pthread_mutex_lock(&m_framecounter_lock); + *fc = m_framecounter; + + // ts(x) = m_buffer_tail_timestamp - + // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*(x) + + // buffer head is the frame that is framecounter-1 away from the tail + timestamp=(int64_t)m_buffer_tail_timestamp - (int64_t)((m_framecounter) * rate); + + pthread_mutex_unlock(&m_framecounter_lock); + + debugOutput(DEBUG_LEVEL_VERBOSE, "(%p): HTS = %011lld, TTS=%011llu, FC=%05u, RATE=%f\n", + this, timestamp, m_buffer_tail_timestamp, *fc, rate); + + if(timestamp >= (int64_t)m_wrap_at) { + timestamp -= m_wrap_at; + } else if(timestamp < 0) { + timestamp += m_wrap_at; + } + + *ts=timestamp; + + debugOutput(DEBUG_LEVEL_VERBOSE, " HTS = %011lld, FC=%05u\n", + *ts, *fc); + +} + +/** + * \brief return the timestamp of the last frame in the buffer + * + * This function returns the timestamp of the last frame in + * the StreamProcessor's buffer. It also returns the framecounter + * value for which this timestamp is valid. + * + * @param ts address to store the timestamp in + * @param fc address to store the associated framecounter in + */ +void TimestampedBuffer::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { + pthread_mutex_lock(&m_framecounter_lock); + *fc = m_framecounter; + *ts = m_buffer_tail_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Resets the frame counter, in a atomic way. This * is thread safe. + */ +void TimestampedBuffer::resetFrameCounter() { + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter = 0; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Decrements the frame counter in a thread safe way. + * + * @param nbframes number of frames to decrement */ void TimestampedBuffer::decrementFrameCounter(int nbframes) { @@ -368,10 +620,12 @@ /** - * Increments the frame counter, in a atomic way. - * also sets the buffer tail timestamp - * This is thread safe. + * Increments the frame counter in a thread safe way. + * Also updates the timestamp. + * + * @param nbframes the number of frames to add + * @param new_timestamp the new timestamp */ void TimestampedBuffer::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", + debugOutput(DEBUG_LEVEL_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", this, new_timestamp); @@ -390,10 +644,10 @@ // the maximal difference we can allow (64secs) - const int64_t max=TICKS_PER_SECOND*64L; + const int64_t max=m_wrap_at/2; if(diff > max) { - diff -= TICKS_PER_SECOND*128L; + diff -= m_wrap_at; } else if (diff < -max) { - diff += TICKS_PER_SECOND*128L; + diff += m_wrap_at; } @@ -401,20 +655,31 @@ debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "diff2=%lld err=%f\n", - diff, err); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "FC=%10u, TS=%011llu\n",m_framecounter, m_buffer_tail_timestamp); + diff, err); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "B: FC=%10u, TS=%011llu, NTS=%011llu\n", + m_framecounter, m_buffer_tail_timestamp, m_buffer_next_tail_timestamp); m_buffer_tail_timestamp=m_buffer_next_tail_timestamp; m_buffer_next_tail_timestamp += (uint64_t)(m_dll_b * err + m_dll_e2); - if (m_buffer_next_tail_timestamp > TICKS_PER_SECOND*128L) { - m_buffer_next_tail_timestamp -= TICKS_PER_SECOND*128L; + if (m_buffer_next_tail_timestamp >= m_wrap_at) { + m_buffer_next_tail_timestamp -= m_wrap_at; + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Unwrapping next tail timestamp: %11llu\n", + m_buffer_next_tail_timestamp); } m_dll_e2 += m_dll_c*err; - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "TS=%011llu, NTS=%011llu, DLLe2=%f\n", + debugOutput(DEBUG_LEVEL_VERBOSE, "A: TS=%011llu, NTS=%011llu, DLLe2=%f\n", m_buffer_tail_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); pthread_mutex_unlock(&m_framecounter_lock); + + if(m_buffer_tail_timestamp>=m_wrap_at) { + debugError("Wrapping failed for m_buffer_tail_timestamp!\n"); + } + if(m_buffer_next_tail_timestamp>=m_wrap_at) { + debugError("Wrapping failed for m_buffer_next_tail_timestamp!\n"); + } // this DLL allows the calculation of any sample timestamp relative to the buffer tail, @@ -427,95 +692,20 @@ /** - * Sets the buffer tail timestamp (in usecs) - * This is thread safe. - */ -void TimestampedBuffer::setBufferTailTimestamp(uint64_t new_timestamp) { - - pthread_mutex_lock(&m_framecounter_lock); - - m_buffer_tail_timestamp = new_timestamp; - - m_dll_e2=m_update_period * m_nominal_rate; - m_buffer_next_tail_timestamp = (uint64_t)((double)m_buffer_tail_timestamp + m_dll_e2); - - pthread_mutex_unlock(&m_framecounter_lock); - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Set buffer tail timestamp for (%p) to %11llu, NTS=%llu, DLL2=%f\n", - this, new_timestamp, m_buffer_next_tail_timestamp, m_dll_e2); - -} - -/** - * \brief return the timestamp of the first frame in the buffer - * - * This function returns the timestamp of the very first sample in - * the StreamProcessor's buffer. This is useful for slave StreamProcessors - * to find out what the base for their timestamp generation should - * be. It also returns the framecounter value for which this timestamp - * is valid. - * - * The system is built in such a way that we assume that the processing - * of the buffers doesn't take any time. Assume we have a buffer transfer at - * time T1, meaning that the last sample of this buffer occurs at T1. As - * processing does not take time, we don't have to add anything to T1. When - * transferring the processed buffer to the xmit processor, the timestamp - * of the last sample is still T1. - * - * When starting the streams, we don't have any information on this last - * timestamp. We prefill the buffer at the xmit side, and we should find - * out what the timestamp for the last sample in the buffer is. If we sync - * on a receive SP, we know that the last prefilled sample corresponds with - * the first sample received - 1 sample duration. This is the same as if the last - * transfer from iso to client would have emptied the receive buffer. - * - * - * @param ts address to store the timestamp in - * @param fc address to store the associated framecounter in - */ -void TimestampedBuffer::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { - double rate=(double)m_buffer_next_tail_timestamp - (double)m_buffer_tail_timestamp; - rate /= (double)m_update_period; - - pthread_mutex_lock(&m_framecounter_lock); - *fc = m_framecounter; - - // ts(x) = m_buffer_tail_timestamp + - // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x - - *ts=m_buffer_tail_timestamp + (uint64_t)(m_framecounter * rate); - - pthread_mutex_unlock(&m_framecounter_lock); - if(*ts > TICKS_PER_SECOND*128L) { - *ts -= TICKS_PER_SECOND*128L; - } -} - -/** - * \brief return the timestamp of the last frame in the buffer - * - * This function returns the timestamp of the last frame in - * the StreamProcessor's buffer. It also returns the framecounter - * value for which this timestamp is valid. - * - * @param ts address to store the timestamp in - * @param fc address to store the associated framecounter in - */ -void TimestampedBuffer::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { - pthread_mutex_lock(&m_framecounter_lock); - *fc = m_framecounter; - *ts = m_buffer_tail_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** - * Resets the frame counter, in a atomic way. This - * is thread safe. - */ -void TimestampedBuffer::resetFrameCounter() { - pthread_mutex_lock(&m_framecounter_lock); - m_framecounter = 0; - pthread_mutex_unlock(&m_framecounter_lock); -} - + * @brief Print status info. + */ +void TimestampedBuffer::dumpInfo() { + + uint64_t ts_head, fc; + getBufferHeadTimestamp(&ts_head,&fc); + + int64_t diff=(int64_t)ts_head - (int64_t)m_buffer_tail_timestamp; + + debugOutputShort( DEBUG_LEVEL_NORMAL, " TimestampedBuffer (%p) info:\n",this); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : %011llu\n",ts_head); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); + debugOutputShort( DEBUG_LEVEL_NORMAL, " rate : %lf (%f)\n",m_dll_e2,m_dll_e2/m_update_period); +} } // end of namespace FreebobUtil Index: /branches/streaming-rework/src/libutil/TimestampedBuffer.h =================================================================== --- /branches/streaming-rework/src/libutil/TimestampedBuffer.h (revision 391) +++ /branches/streaming-rework/src/libutil/TimestampedBuffer.h (revision 392) @@ -36,12 +36,56 @@ class TimestampedBufferClient; +/** + * \brief Class implementing a frame buffer that is time-aware + * + * This class implements a buffer that is time-aware. Whenever new frames + * are written to the buffer, the timestamp corresponding to the last frame + * in the buffer is updated. This allows to calculate the timestamp of any + * other frame in the buffer. + * + * The buffer is a frame buffer, having the following parameters defining + * it's behaviour: + * - buff_size: buffer size in frames (setBufferSize()) + * - events_per_frame: the number of events per frame (setEventsPerFrame()) + * - event_size: the storage size of the events (in bytes) (setEventSize()) + * + * The total size of the buffer (in bytes) is at least + * buff_size*events_per_frame*event_size. + * + * Timestamp tracking is done by requiring that a timestamp is specified every + * time frames are added to the buffer. In combination with the buffer fill and + * the frame rate (calculated internally), this allows to calculate the timestamp + * of any frame in the buffer. In order to initialize the internal data structures, + * the setNominalRate() and setUpdatePeriod() functions are provided. + * + * \note Currently the class only supports fixed size writes of size update_period. + * This can change in the future, implementation ideas are already in place. + * + * The TimestampedBuffer class is time unit agnostic. It can handle any time unit + * as long as it fits in a 64 bit unsigned integer. The buffer supports wrapped + * timestamps using (...). + * + * There are two methods of reading and writing to the buffer. + * + * The first method uses conventional readFrames() and writeFrames() functions. + * + * The second method makes use of the TimestampedBufferClient interface. When a + * TimestampedBuffer is created, it is required that a TimestampedBufferClient is + * registered. This client implements the processReadBlock and processWriteBlock + * functions. These are block processing 'callbacks' that allow zero-copy processing + * of the buffer contents. In order to initiate block processing, the + * blockProcessWriteFrames and blockProcessReadFrames functions are provided by + * TimestampedBuffer. + * + */ class TimestampedBuffer { public: + TimestampedBuffer(TimestampedBufferClient *); virtual ~TimestampedBuffer(); - bool writeFrames(unsigned int nbframes, char *data); + bool writeFrames(unsigned int nbframes, char *data, uint64_t ts); bool readFrames(unsigned int nbframes, char *data); @@ -57,4 +101,6 @@ bool setBufferSize(unsigned int s); + bool setWrapValue(uint64_t w); + unsigned int getBufferFill(); @@ -62,8 +108,4 @@ int getFrameCounter() {return m_framecounter;}; - void decrementFrameCounter(int nbframes); - void incrementFrameCounter(int nbframes, uint64_t new_timestamp); - void resetFrameCounter(); - void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); @@ -72,13 +114,18 @@ // dll stuff - void setNominalRate(double r) {m_nominal_rate=r;}; - double getRate() {return m_dll_e2;}; + bool setNominalRate(float r); + float getRate(); - void setUpdatePeriod(unsigned int t) {m_update_period=t;}; + bool setUpdatePeriod(unsigned int t); // misc stuff void dumpInfo(); - - + void setVerboseLevel(int l) {setDebugLevel(l);}; + +private: + void decrementFrameCounter(int nbframes); + void incrementFrameCounter(int nbframes, uint64_t new_timestamp); + void resetFrameCounter(); + protected: @@ -91,4 +138,6 @@ unsigned int m_bytes_per_frame; unsigned int m_bytes_per_buffer; + + uint64_t m_wrap_at; // value to wrap at TimestampedBufferClient *m_Client; @@ -117,8 +166,11 @@ double m_dll_c; - double m_nominal_rate; + float m_nominal_rate; unsigned int m_update_period; }; +/** + * \brief Interface to be implemented by TimestampedBuffer clients + */ class TimestampedBufferClient { public: