Index: branches/streaming-rework/src/libstreaming/freebob_streaming.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/freebob_streaming.cpp (revision 384) +++ branches/streaming-rework/src/libstreaming/freebob_streaming.cpp (revision 385) @@ -119,5 +119,5 @@ // discover the devices on the bus if(!dev->m_deviceManager->discover(DEBUG_LEVEL_NORMAL)) { - debugOutput(DEBUG_LEVEL_VERBOSE, "Could not discover devices\n"); + debugFatal("Could not discover devices\n"); delete dev->processorManager; delete dev->m_deviceManager; @@ -132,7 +132,12 @@ assert(device); + debugOutput(DEBUG_LEVEL_VERBOSE, "Setting samplerate to %d for (%p)\n", + dev->options.sample_rate, device); // Set the device's sampling rate to that requested // FIXME: does this really belong here? If so we need to handle errors. if (!device->setSamplingFrequency(parseSampleRate(dev->options.sample_rate))) { + debugOutput(DEBUG_LEVEL_VERBOSE, " => Retry setting samplerate to %d for (%p)\n", + dev->options.sample_rate, device); + // try again: if (!device->setSamplingFrequency(parseSampleRate(dev->options.sample_rate))) { @@ -252,10 +257,10 @@ periods++; if(periods>periods_print) { - debugOutput(DEBUG_LEVEL_VERBOSE, "\n"); - debugOutput(DEBUG_LEVEL_VERBOSE, "============================================\n"); - debugOutput(DEBUG_LEVEL_VERBOSE, "Xruns: %d\n",xruns); - debugOutput(DEBUG_LEVEL_VERBOSE, "============================================\n"); + debugOutputShort(DEBUG_LEVEL_VERBOSE, "\nfreebob_streaming_wait\n"); + debugOutputShort(DEBUG_LEVEL_VERBOSE, "============================================\n"); + debugOutputShort(DEBUG_LEVEL_VERBOSE, "Xruns: %d\n",xruns); + debugOutputShort(DEBUG_LEVEL_VERBOSE, "============================================\n"); dev->processorManager->dumpInfo(); - debugOutput(DEBUG_LEVEL_VERBOSE, "\n"); + debugOutputShort(DEBUG_LEVEL_VERBOSE, "\n"); periods_print+=100; } Index: branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 384) +++ branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 385) @@ -38,11 +38,10 @@ #define RECEIVE_DLL_INTEGRATION_COEFFICIENT 0.015 -#define RECEIVE_PROCESSING_DELAY 51200 +#define RECEIVE_PROCESSING_DELAY (10000) // in ticks -#define TRANSMIT_TRANSFER_DELAY 1000 -#define TRANSMIT_ADVANCE_CYCLES 10 - -//#define DO_SYT_SYNC +#define TRANSMIT_TRANSFER_DELAY 9000U +// the number of cycles to send a packet in advance of it's timestamp +#define TRANSMIT_ADVANCE_CYCLES 1U namespace FreebobStreaming { @@ -117,27 +116,23 @@ packet->fmt = IEC61883_FMT_AMDTP; - // signal that we are running (a transmit stream is always 'runnable') - m_running=true; - - // we calculate the timestamp of the next sample in the buffer, which will - // allow us to check if we are to send this sample now, or later - - // FIXME: maybe we should use the buffer head timestamp for this? - + // recalculate the buffer head timestamp float ticks_per_frame=m_SyncSource->getTicksPerFrame(); // the base timestamp is the one of the last sample in the buffer - int64_t timestamp = m_buffer_tail_timestamp; - + uint64_t ts_tail; + uint64_t fc; + + getBufferTailTimestamp(&ts_tail, &fc); // thread safe + + int64_t timestamp = ts_tail; // meaning that the first sample in the buffer lies m_framecounter * rate - // earlier. This would give the next equation: + // earlier. This gives the next equation: // timestamp = m_last_timestamp - m_framecounter * rate - // but to preserve causality, we have to make sure that this timestamp is - // always bigger than m_last_timestamp. this can be done by adding - // m_ringbuffersize_frames * rate. - timestamp += (int64_t)((((int64_t)m_ringbuffer_size_frames) - - ((int64_t)m_framecounter)) - * ticks_per_frame); + timestamp -= (int64_t)((float)fc * ticks_per_frame); + + // FIXME: test + // substract the receive transfer delay + timestamp -= RECEIVE_PROCESSING_DELAY; // this happens if m_buffer_tail_timestamp wraps around while there are @@ -158,26 +153,79 @@ // determine if we want to send a packet or not - uint64_t cycle_timer=m_handler->getCycleTimerTicks(); - - int64_t until_next=timestamp-cycle_timer; + // note that we can't use getCycleTimer directly here, + // because packets are queued in advance. This means that + // we the packet we are constructing will be sent out + // on 'cycle', not 'now'. + unsigned int ctr=m_handler->getCycleTimer(); + int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr); + + // the difference between 'now' and the cycle this + // packet is intended for + int cycle_diff = cycle - now_cycles; + + // detect wraparounds + if(cycle_diff < -(int)(CYCLES_PER_SECOND/2)) { + cycle_diff += CYCLES_PER_SECOND; + } else if(cycle_diff > (int)(CYCLES_PER_SECOND/2)) { + cycle_diff -= CYCLES_PER_SECOND; + } + + // as long as the cycle parameter is not in sync with + // the current time, the stream is considered not + // to be 'running' + if(cycle_diff < 0 || cycle == -1) { + m_running=false; + } else { + m_running=true; + } + +#ifdef DEBUG + if(cycle_diff < 0) { + debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", + cycle, now_cycles); + } +#endif + // if cycle lies cycle_diff cycles in the future, we should + // queue this packet cycle_diff * TICKS_PER_CYCLE earlier than + // we would if it were to be sent immediately. + + // determine the 'now' time in ticks + uint64_t cycle_timer=CYCLE_TIMER_TO_TICKS(ctr); + + // time until the packet is to be sent (if > 0: send packet) + int64_t until_next=timestamp-(int64_t)cycle_timer; + + int64_t utn2=until_next; // debug!! // we send a packet some cycles in advance, to avoid the // following situation: // suppose we are only a few ticks away from - // the moment to send this packet. This means that in - // order to keep causality, we have to make sure that - // the TRANSFER_DELAY is bigger than one cycle, which - // might be a little much. - // this means that we need one cycle of extra buffering. + // the moment to send this packet. therefore we decide + // not to send the packet, but send it in the next cycle. + // This means that the next time point will be 3072 ticks + // later, making that the timestamp will be expired when the + // packet is sent, unless TRANSFER_DELAY > 3072. + // this means that we need at least one cycle of extra buffering. until_next -= TICKS_PER_CYCLE * TRANSMIT_ADVANCE_CYCLES; + // we have to queue it cycle_diff * TICKS_PER_CYCLE earlier + until_next -= cycle_diff * TICKS_PER_CYCLE; + // the maximal difference we can allow (64secs) const int64_t max=TICKS_PER_SECOND*64L; - + +#ifdef DEBUG if(!m_disabled) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TS=%11llu, CTR=%11llu, FC=%5d, TPS=%10.6f, UTN=%11lld\n", - timestamp, cycle_timer, m_framecounter, ticks_per_frame, until_next + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TS=%11llu, CTR=%11llu, FC=%5d\n", + timestamp, cycle_timer, fc ); - } + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " UTN=%11lld, UTN2=%11lld\n", + until_next, utn2 + ); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04lld\n", + now_cycles, cycle, cycle_diff + ); + } +#endif if(until_next > max) { @@ -194,12 +242,17 @@ until_next += TICKS_PER_SECOND*128L; } - + +#ifdef DEBUG if(!m_disabled) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " TS=%11llu, CTR=%11llu, FC=%5d, TPS=%10.6f, UTN=%11lld\n", - timestamp, cycle_timer, m_framecounter, ticks_per_frame, until_next + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " > TS=%11llu, CTR=%11llu, FC=%5d\n", + timestamp, cycle_timer, fc ); - } - - // don't process the stream when it is not enabled, + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " UTN=%11lld, UTN2=%11lld\n", + until_next, utn2 + ); + } +#endif + + // don't process the stream when it is not enabled, not running // or when the next sample is not due yet. @@ -207,5 +260,5 @@ // that means that we'll send NODATA packets. // we don't add payload because DICE devices don't like that. - if((until_next>0) || m_disabled) { + if((until_next>0) || m_disabled || !m_running) { // no-data packets have syt=0xFFFF // and have the usual amount of events as dummy data (?) @@ -224,8 +277,19 @@ *tag = IEC61883_TAG_WITH_CIP; *sy = 0; - + + if(m_disabled) { + // indicate that we are now in a disabled state. + m_is_disabled=true; + } else { + // indicate that we are now in an enabled state. + m_is_disabled=false; + } + return RAW1394_ISO_DEFER; } + // indicate that we are now in an enabled state. + m_is_disabled=false; + // construct the packet nevents = m_syt_interval; @@ -245,5 +309,7 @@ debugWarning("Transmit buffer underrun (cycle %d, FC=%d, PC=%d)\n", - cycle, m_framecounter, m_handler->getPacketCount()); + cycle, getFrameCounter(), m_handler->getPacketCount()); + + nevents=0; // TODO: we have to be a little smarter here @@ -257,5 +323,5 @@ packet->fdf = IEC61883_FDF_NODATA; packet->syt = 0xffff; - + // this means no-data packets with payload (DICE doesn't like that) *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); @@ -277,5 +343,12 @@ // convert the timestamp to SYT format - unsigned int timestamp_SYT = TICKS_TO_SYT(timestamp); + uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; + + // check if it wrapped + if (ts >= TICKS_PER_SECOND * 128L) { + ts -= TICKS_PER_SECOND * 128L; + } + + unsigned int timestamp_SYT = TICKS_TO_SYT(ts); packet->syt = ntohs(timestamp_SYT); @@ -284,8 +357,7 @@ // calculate the new buffer head timestamp. this is - // the timestamp of the current packet plus - // SYT_INTERVAL * rate - - timestamp += (int64_t)((float)m_syt_interval * ticks_per_frame ); + // the previous buffer head timestamp plus + // the number of frames sent * ticks_per_frame + timestamp += (int64_t)((float)nevents * ticks_per_frame ); // check if it wrapped @@ -336,5 +408,5 @@ uint64_t ts; uint64_t fc; - m_SyncSource->getBufferHeadTimestamp(&ts, &fc); + m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe // update the frame counter such that it reflects the buffer content, @@ -440,5 +512,5 @@ // add the receive processing delay - m_ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); +// m_ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); if( !(m_event_buffer=freebob_ringbuffer_create( @@ -583,8 +655,43 @@ uint64_t fc; - m_SyncSource->getBufferHeadTimestamp(&ts, &fc); - - setBufferTailTimestamp(ts); - + debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); + + m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe + + // recalculate the buffer head timestamp + float ticks_per_frame=m_SyncSource->getTicksPerFrame(); + + // set buffer head timestamp + // this makes that the next sample to be sent out + // has the same timestamp as the last one received + // plus one frame + ts += (uint64_t)ticks_per_frame; + setBufferHeadTimestamp(ts); + int64_t timestamp = ts; + + // since we have a full buffer, we know that the buffer tail lies + // m_ringbuffer_size_frames * rate earlier + timestamp += (int64_t)((float)m_ringbuffer_size_frames * ticks_per_frame); + + // this happens when the last timestamp is near wrapping, and + // m_framecounter is low. + // this means: m_last_timestamp is near wrapping and have just had + // a getPackets() from the client side. the projected next_period + // boundary lies beyond the wrap value. + // the action is to wrap the value. + if (timestamp >= TICKS_PER_SECOND * 128L) { + timestamp -= TICKS_PER_SECOND * 128L; + } + + StreamProcessor::setBufferTailTimestamp(timestamp); + + debugOutput(DEBUG_LEVEL_VERBOSE,"TS=%10lld, TSTMP=%10llu, %f\n", + ts, timestamp, ticks_per_frame); + + if (!StreamProcessor::prepareForEnable()) { + debugError("StreamProcessor::prepareForEnable failed\n"); + return false; + } + return true; } @@ -617,4 +724,6 @@ int xrun; unsigned int offset=0; + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); freebob_ringbuffer_data_t vec[2]; @@ -700,12 +809,37 @@ } + + // recalculate the buffer tail timestamp + float ticks_per_frame=m_SyncSource->getTicksPerFrame(); + + // this makes that the last sample to be sent out on ISO + // has the same timestamp as the last one transfered + // to the client + // plus one frame + ts += (uint64_t)ticks_per_frame; + int64_t timestamp = ts; + + // however we have to preserve causality, meaning that we have to make + // sure that the worst-case buffer head timestamp still lies in the future. + // this worst case timestamp occurs when the xmit buffer is completely full. + // therefore we add m_ringbuffer_size_frames * ticks_per_frame to the timestamp. + // this will make sure that the buffer head timestamp lies in the future. + // the netto effect of this is that the system works as if the buffer processing + // by the client doesn't take time. + + timestamp += (int64_t)((float)m_ringbuffer_size_frames * ticks_per_frame); + + // wrap the timestamp if nescessary + if (timestamp >= TICKS_PER_SECOND * 128L) { + timestamp -= TICKS_PER_SECOND * 128L; + } + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp); // update the frame counter such that it reflects the new value, // and also update the buffer tail timestamp // done in the SP base class - debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); - - if (!StreamProcessor::putFrames(nbframes, ts)) { - debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); + if (!StreamProcessor::putFrames(nbframes, timestamp)) { + debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, timestamp); return false; } @@ -1029,4 +1163,8 @@ m_last_timestamp += CYCLE_TIMER_GET_OFFSET(syt_timestamp); m_last_timestamp += cc_seconds * TICKS_PER_SECOND; + + // the receive processing delay indicates how much + // extra time we need as slack + m_last_timestamp += RECEIVE_PROCESSING_DELAY; //=> now estimate the device frame rate @@ -1083,4 +1221,5 @@ //=> don't process the stream samples when it is not enabled. if(m_disabled) { + // we keep track of the timestamp here // this makes sure that we will have a somewhat accurate @@ -1091,8 +1230,20 @@ // SYT_INTERVAL * rate later uint64_t ts=m_last_timestamp+(uint64_t)((float)m_syt_interval * m_ticks_per_frame); + + // wrap if nescessary + if (ts >= TICKS_PER_SECOND * 128L) { + ts -= TICKS_PER_SECOND * 128L; + } + // set the timestamps StreamProcessor::setBufferTimestamps(ts,ts); + + // indicate that we are now in a disabled state. + m_is_disabled=true; return RAW1394_ISO_DEFER; } + + // indicate that we are now in an enabled state. + m_is_disabled=false; //=> process the packet @@ -1103,5 +1254,5 @@ { debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n", - cycle, m_framecounter, m_handler->getPacketCount()); + cycle, getFrameCounter(), m_handler->getPacketCount()); m_xruns++; @@ -1224,10 +1375,13 @@ // currently this is in ticks + int64_t fc=getFrameCounter(); + int64_t next_period_boundary = m_last_timestamp; - next_period_boundary += (int64_t)(((int64_t)m_period-(int64_t)m_framecounter) * m_ticks_per_frame); - next_period_boundary += RECEIVE_PROCESSING_DELAY; + next_period_boundary += (int64_t)(((int64_t)m_period + - fc) * m_ticks_per_frame); +// next_period_boundary += RECEIVE_PROCESSING_DELAY; debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", - next_period_boundary, m_last_timestamp, m_framecounter, m_ticks_per_frame + next_period_boundary, m_last_timestamp, fc, m_ticks_per_frame ); @@ -1249,5 +1403,5 @@ debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", - next_period_boundary, m_last_timestamp, m_framecounter, m_ticks_per_frame + next_period_boundary, m_last_timestamp, fc, m_ticks_per_frame ); @@ -1283,5 +1437,5 @@ m_WakeupStat.reset(); - m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); +// m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); // reset all non-device specific stuff @@ -1554,5 +1708,5 @@ if (!StreamProcessor::getFrames(nbframes, ts)) { - debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nbframes, ts); + debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n", nbframes, ts); return false; } Index: branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp (revision 384) +++ branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp (revision 385) @@ -31,4 +31,7 @@ #include "IsoStream.h" #include + +#define MINIMUM_INTERRUPTS_PER_PERIOD 4U +#define PACKETS_PER_INTERRUPT 4U namespace FreebobStreaming @@ -285,10 +288,12 @@ unsigned int packets_per_period=stream->getPacketsPerPeriod(); +#if 0 // hardware interrupts occur when one DMA block is full, and the size of one DMA // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq is // occurs at a period boundary (optimal CPU use) - // NOTE: try and use 4 hardware interrupts per period for better latency. - unsigned int max_packet_size=4 * getpagesize() / packets_per_period; + // NOTE: try and use MINIMUM_INTERRUPTS_PER_PERIOD hardware interrupts + // per period for better latency. + unsigned int max_packet_size=MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize() / packets_per_period; if (max_packet_size < stream->getMaxPacketSize()) { max_packet_size=stream->getMaxPacketSize(); @@ -300,53 +305,25 @@ max_packet_size = getpagesize(); - int irq_interval=packets_per_period / 4; - if(irq_interval <= 0) irq_interval=1; - - /* the receive buffer size doesn't matter for the latency, - but it has a minimal value in order for libraw to operate correctly (300) */ - int buffers=400; - - // create the actual handler - IsoRecvHandler *h = new IsoRecvHandler(stream->getPort(), buffers, - max_packet_size, irq_interval); - - debugOutput( DEBUG_LEVEL_VERBOSE, " registering IsoRecvHandler\n"); - - if(!h) { - debugFatal("Could not create IsoRecvHandler\n"); - return false; - } - - h->setVerboseLevel(getDebugLevel()); - - // init the handler - if(!h->init()) { - debugFatal("Could not initialize receive handler\n"); - return false; - } - - // register the stream with the handler - if(!h->registerStream(stream)) { - debugFatal("Could not register receive stream with handler\n"); - return false; - } - - // register the handler with the manager - if(!registerHandler(h)) { - debugFatal("Could not register receive handler with manager\n"); - return false; - } - debugOutput( DEBUG_LEVEL_VERBOSE, " registered stream (%p) with handler (%p)\n",stream,h); - } - - if (stream->getType()==IsoStream::EST_Transmit) { - - // setup the optimal parameters for the raw1394 ISO buffering - unsigned int packets_per_period=stream->getPacketsPerPeriod(); + unsigned int irq_interval=packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD; + if(irq_interval <= 0) irq_interval=1; +#else // hardware interrupts occur when one DMA block is full, and the size of one DMA - // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq - // occurs at a period boundary (optimal CPU use) - // NOTE: try and use 4 interrupts per period for better latency. - unsigned int max_packet_size=4 * getpagesize() / packets_per_period; + // block = PAGE_SIZE. Setting the max_packet_size enables control over the IRQ + // frequency, as the controller uses max_packet_size, and not the effective size + // when writing to the DMA buffer. + + // configure it such that we have an irq for every PACKETS_PER_INTERRUPT packets + unsigned int irq_interval=PACKETS_PER_INTERRUPT; + + // unless the period size doesn't allow this + if ((packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD) < irq_interval) { + irq_interval=1; + } + + // FIXME: test + irq_interval=1; + + unsigned int max_packet_size=getpagesize() / irq_interval; + if (max_packet_size < stream->getMaxPacketSize()) { max_packet_size=stream->getMaxPacketSize(); @@ -358,12 +335,104 @@ max_packet_size = getpagesize(); - int irq_interval=packets_per_period / 4; +#endif + /* the receive buffer size doesn't matter for the latency, + but it has a minimal value in order for libraw to operate correctly (300) */ + int buffers=400; + + // create the actual handler + IsoRecvHandler *h = new IsoRecvHandler(stream->getPort(), buffers, + max_packet_size, irq_interval); + + debugOutput( DEBUG_LEVEL_VERBOSE, " registering IsoRecvHandler\n"); + + if(!h) { + debugFatal("Could not create IsoRecvHandler\n"); + return false; + } + + h->setVerboseLevel(getDebugLevel()); + + // init the handler + if(!h->init()) { + debugFatal("Could not initialize receive handler\n"); + return false; + } + + // register the stream with the handler + if(!h->registerStream(stream)) { + debugFatal("Could not register receive stream with handler\n"); + return false; + } + + // register the handler with the manager + if(!registerHandler(h)) { + debugFatal("Could not register receive handler with manager\n"); + return false; + } + debugOutput( DEBUG_LEVEL_VERBOSE, " registered stream (%p) with handler (%p)\n",stream,h); + } + + if (stream->getType()==IsoStream::EST_Transmit) { + // setup the optimal parameters for the raw1394 ISO buffering + unsigned int packets_per_period=stream->getPacketsPerPeriod(); + +#if 0 + // hardware interrupts occur when one DMA block is full, and the size of one DMA + // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq + // occurs at a period boundary (optimal CPU use) + // NOTE: try and use MINIMUM_INTERRUPTS_PER_PERIOD interrupts per period + // for better latency. + unsigned int max_packet_size=MINIMUM_INTERRUPTS_PER_PERIOD * getpagesize() / packets_per_period; + if (max_packet_size < stream->getMaxPacketSize()) { + max_packet_size=stream->getMaxPacketSize(); + } + + // Ensure we don't request a packet size bigger than the + // kernel-enforced maximum which is currently 1 page. + if (max_packet_size > (unsigned int)getpagesize()) + max_packet_size = getpagesize(); + + unsigned int irq_interval=packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD; if(irq_interval <= 0) irq_interval=1; - +#else + // hardware interrupts occur when one DMA block is full, and the size of one DMA + // block = PAGE_SIZE. Setting the max_packet_size enables control over the IRQ + // frequency, as the controller uses max_packet_size, and not the effective size + // when writing to the DMA buffer. + + // configure it such that we have an irq for every PACKETS_PER_INTERRUPT packets + unsigned int irq_interval=PACKETS_PER_INTERRUPT; + + // unless the period size doesn't allow this + if ((packets_per_period/MINIMUM_INTERRUPTS_PER_PERIOD) < irq_interval) { + irq_interval=1; + } + + // FIXME: test + irq_interval=1; + + unsigned int max_packet_size=getpagesize() / irq_interval; + + if (max_packet_size < stream->getMaxPacketSize()) { + max_packet_size=stream->getMaxPacketSize(); + } + + // Ensure we don't request a packet size bigger than the + // kernel-enforced maximum which is currently 1 page. + if (max_packet_size > (unsigned int)getpagesize()) + max_packet_size = getpagesize(); +#endif // the transmit buffer size should be as low as possible for latency. // note however that the raw1394 subsystem tries to keep this buffer // full, so we have to make sure that we have enough events in our // event buffers - int buffers=packets_per_period; + + // every irq_interval packets an interrupt will occur. that is when + // buffers get transfered, meaning that we should have at least some + // margin here +// int buffers=irq_interval * 2; + + // half a period. the xmit handler will take care of this + int buffers=packets_per_period/2; // NOTE: this is dangerous: what if there is not enough prefill? Index: branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (revision 384) +++ branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (revision 385) @@ -33,4 +33,7 @@ #include +#include "libstreaming/cycletimer.h" + +#define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 50 namespace FreebobStreaming { @@ -403,6 +406,10 @@ } + // we want to make sure that everything is running well, + // so wait for a while + usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL); + debugOutput( DEBUG_LEVEL_VERBOSE, "StreamProcessors running...\n"); - debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting frame counters...\n"); + debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting counters...\n"); // now we reset the frame counters @@ -411,9 +418,13 @@ ++it ) { + debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); + if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { (*it)->dumpInfo(); } - (*it)->reset(); + (*it)->reset(); + + debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { @@ -427,4 +438,6 @@ ++it ) { + debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); + if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { (*it)->dumpInfo(); @@ -432,4 +445,6 @@ (*it)->reset(); + + debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { @@ -666,5 +681,6 @@ bool StreamProcessorManager::waitForPeriod() { int time_till_next_period; - + bool xrun_occurred=false; + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); @@ -679,9 +695,24 @@ usleep(time_till_next_period); - // check if it is still true + // check for underruns on the ISO side, + // those should make us bail out of the wait loop + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + // a xrun has occurred on the Iso side + xrun_occurred |= (*it)->xrunOccurred(); + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + // a xrun has occurred on the Iso side + xrun_occurred |= (*it)->xrunOccurred(); + } + + // check if we were waked up too soon time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs(); } - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", -time_till_next_period); // this is to notify the client of the delay @@ -696,10 +727,11 @@ // and the receive processors should have done their transfer. m_time_of_transfer=m_SyncSource->getTimeAtPeriod(); - debugOutput( DEBUG_LEVEL_VERBOSE, "transfer at %llu ticks...\n", + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n", m_time_of_transfer); + + xrun_occurred=false; // check if xruns occurred on the Iso side. // also check if xruns will occur should we transfer() now - bool xrun_occurred=false; for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); @@ -741,4 +773,6 @@ } + m_nbperiods++; + // now we can signal the client that we are (should be) ready return !xrun_occurred; @@ -772,13 +806,5 @@ bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) { - int64_t time_of_transfer; debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); - - // first we should find out on what time this transfer is - // supposed to be happening. this time will become the time - // stamp for the transmitted buffer. - // NOTE: maybe we should include the transfer delay here, that - // would make it equal for all types of SP's - time_of_transfer=m_time_of_transfer; // a static cast could make sure that there is no performance @@ -788,22 +814,7 @@ it != m_ReceiveProcessors.end(); ++it ) { - uint64_t buffer_tail_ts; - uint64_t fc; - int64_t ts; - - (*it)->getBufferTailTimestamp(&buffer_tail_ts,&fc); - ts = buffer_tail_ts; - ts += (int64_t)((-(int64_t)fc) * m_SyncSource->getTicksPerFrame()); - // NOTE: ts can be negative due to wraparound, it is the responsability of the - // SP to deal with that. - float tmp=m_SyncSource->getTicksPerFrame(); - - debugOutput(DEBUG_LEVEL_VERBOSE, "=> TS=%11lld, BLT=%11llu, FC=%5d, TPF=%f\n", - ts, buffer_tail_ts, fc, tmp - ); - debugOutput(DEBUG_LEVEL_VERBOSE, " TPF=%f\n", tmp); - - #ifdef DEBUG + //#ifdef DEBUG + #if 0 { uint64_t ts_tail=0; @@ -823,16 +834,17 @@ } - debugOutput(DEBUG_LEVEL_VERBOSE,"R * HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", - ts_tail, fc_tail, ts_head, fc_head, cnt); + debugOutput(DEBUG_LEVEL_VERBOSE,"R => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n", + ts_head, fc_head, ts_tail, fc_tail, cnt); } #endif - if(!(*it)->getFrames(m_period, ts)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u) from stream processor (%p)", - m_period,*it); + if(!(*it)->getFrames(m_period, (int64_t)m_time_of_transfer)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)", + m_period, m_time_of_transfer,*it); return false; // buffer underrun } - #ifdef DEBUG + //#ifdef DEBUG + #if 0 { uint64_t ts_tail=0; @@ -852,6 +864,6 @@ } - debugOutput(DEBUG_LEVEL_VERBOSE,"R > HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", - ts_tail, fc_tail, ts_head, fc_head, cnt); + debugOutput(DEBUG_LEVEL_VERBOSE,"R > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n", + ts_head, fc_head, ts_tail, fc_tail, cnt); } #endif @@ -863,5 +875,6 @@ ++it ) { - #ifdef DEBUG + //#ifdef DEBUG + #if 0 { uint64_t ts_tail=0; @@ -881,16 +894,17 @@ } - debugOutput(DEBUG_LEVEL_VERBOSE,"T * HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", - ts_tail, fc_tail, ts_head, fc_head, cnt); + debugOutput(DEBUG_LEVEL_VERBOSE,"T => HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n", + ts_head, fc_head, ts_tail, fc_tail, cnt); } #endif - if(!(*it)->putFrames(m_period,time_of_transfer)) { + if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) { debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)", - m_period, time_of_transfer, *it); + m_period, m_time_of_transfer, *it); return false; // buffer overrun } - #ifdef DEBUG + //#ifdef DEBUG + #if 0 { uint64_t ts_tail=0; @@ -910,6 +924,6 @@ } - debugOutput(DEBUG_LEVEL_VERBOSE,"T > HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", - ts_tail, fc_tail, ts_head, fc_head, cnt); + debugOutput(DEBUG_LEVEL_VERBOSE,"T > HEAD: TS=%11llu, FC=%5llu | TAIL: TS=%11llu, FC=%5llu, %d\n", + ts_head, fc_head, ts_tail, fc_tail, cnt); } #endif @@ -923,5 +937,5 @@ debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n"); debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n"); - debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %d\n", m_nbperiods); + debugOutputShort( DEBUG_LEVEL_NORMAL, "Period count: %6d\n", m_nbperiods); debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n"); Index: branches/streaming-rework/src/libstreaming/IsoHandler.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/IsoHandler.cpp (revision 384) +++ branches/streaming-rework/src/libstreaming/IsoHandler.cpp (revision 385) @@ -877,15 +877,30 @@ debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval : %d \n",m_irq_interval); - if(raw1394_iso_recv_init(m_handle, - iso_receive_handler, - m_buf_packets, - m_max_packet_size, - m_Client->getChannel(), - RAW1394_DMA_BUFFERFILL, - m_irq_interval)) { - debugFatal("Could not do receive initialisation!\n" ); - debugFatal(" %s\n",strerror(errno)); - - return false; + if(m_irq_interval > 1) { + if(raw1394_iso_recv_init(m_handle, + iso_receive_handler, + m_buf_packets, + m_max_packet_size, + m_Client->getChannel(), + RAW1394_DMA_BUFFERFILL, + m_irq_interval)) { + debugFatal("Could not do receive initialisation!\n" ); + debugFatal(" %s\n",strerror(errno)); + + return false; + } + } else { + if(raw1394_iso_recv_init(m_handle, + iso_receive_handler, + m_buf_packets, + m_max_packet_size, + m_Client->getChannel(), + RAW1394_DMA_PACKET_PER_BUFFER, + m_irq_interval)) { + debugFatal("Could not do receive initialisation!\n" ); + debugFatal(" %s\n",strerror(errno)); + + return false; + } } return true; Index: branches/streaming-rework/src/libstreaming/StreamProcessor.h =================================================================== --- branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 384) +++ branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 385) @@ -80,7 +80,7 @@ bool isRunning(); ///< returns true if there is some stream data processed - void enable(); ///< enable the stream processing - void disable() {m_disabled=true;}; ///< disable the stream processing - bool isEnabled() {return !m_disabled;}; + bool enable(); ///< enable the stream processing + bool disable(); ///< disable the stream processing + bool isEnabled() {return !m_is_disabled;}; virtual bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from client @@ -100,6 +100,6 @@ virtual bool prepareForStart() {return true;}; - virtual bool prepareForEnable() {return true;}; - virtual bool prepareForDisable() {return true;}; + virtual bool prepareForEnable(); + virtual bool prepareForDisable(); protected: @@ -120,5 +120,6 @@ bool m_running; bool m_disabled; - + bool m_is_disabled; + StreamStatistics m_PacketStat; StreamStatistics m_PeriodStat; @@ -147,5 +148,4 @@ void decrementFrameCounter(int nbframes, uint64_t new_timestamp); void incrementFrameCounter(int nbframes, uint64_t new_timestamp); - void setFrameCounter(int new_framecounter, uint64_t new_timestamp); void resetFrameCounter(); @@ -193,5 +193,5 @@ float getTicksPerFrame() {return m_ticks_per_frame;}; - protected: + private: // the framecounter gives the number of frames in the buffer signed int m_framecounter; @@ -205,5 +205,5 @@ uint64_t m_buffer_head_timestamp; - + protected: StreamProcessor *m_SyncSource; Index: branches/streaming-rework/src/libstreaming/cycletimer.h =================================================================== --- branches/streaming-rework/src/libstreaming/cycletimer.h (revision 384) +++ branches/streaming-rework/src/libstreaming/cycletimer.h (revision 385) @@ -38,7 +38,8 @@ #define TICKS_PER_CYCLE 3072U #define TICKS_PER_SECOND 24576000UL -#define TICKS_PER_USEC (TICKS_PER_SECOND/1000000.0) +#define TICKS_PER_USEC (24.576000) #define USECS_PER_TICK (1.0/TICKS_PER_USEC) +#define USECS_PER_CYCLE (125U) #define CYCLE_TIMER_GET_SECS(x) ((((x) & 0xFE000000U) >> 25)) Index: branches/streaming-rework/src/libstreaming/StreamProcessor.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 384) +++ branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 385) @@ -35,7 +35,7 @@ namespace FreebobStreaming { -IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_NORMAL ); -IMPL_DEBUG_MODULE( ReceiveStreamProcessor, ReceiveStreamProcessor, DEBUG_LEVEL_NORMAL ); -IMPL_DEBUG_MODULE( TransmitStreamProcessor, TransmitStreamProcessor, DEBUG_LEVEL_NORMAL ); +IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE ); +IMPL_DEBUG_MODULE( ReceiveStreamProcessor, ReceiveStreamProcessor, DEBUG_LEVEL_VERBOSE ); +IMPL_DEBUG_MODULE( TransmitStreamProcessor, TransmitStreamProcessor, DEBUG_LEVEL_VERBOSE ); StreamProcessor::StreamProcessor(enum IsoStream::EStreamType type, int port, int framerate) @@ -51,4 +51,5 @@ , m_running(false) , m_disabled(true) + , m_is_disabled(true) { @@ -61,4 +62,5 @@ void StreamProcessor::dumpInfo() { + int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); @@ -66,8 +68,14 @@ IsoStream::dumpInfo(); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Running : %d\n", m_running); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Enabled : %d\n", !m_disabled); + debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor info:\n"); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Head - Tail : %011lld\n",diff); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011u\n",m_handler->getCycleTimerTicks()); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Running : %d\n", m_running); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Enabled : %s\n", m_disabled ? "No" : "Yes"); + debugOutputShort( DEBUG_LEVEL_NORMAL, " enable status : %s\n", m_is_disabled ? "No" : "Yes"); // m_PeriodStat.dumpInfo(); @@ -111,4 +119,29 @@ return true; +} + +bool StreamProcessor::prepareForEnable() { + int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; + + debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter : %05d\n",m_framecounter); + debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); + debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); + debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail : %011lld\n",diff); + debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks()); + return true; +} + +bool StreamProcessor::prepareForDisable() { + int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp; + + debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter : %05d\n",m_framecounter); + debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp); + debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp); + debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail : %011lld\n",diff); + debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011u\n",m_handler->getCycleTimerTicks()); + return true; + } @@ -172,5 +205,5 @@ bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer at (%011lld)...\n", nbframes, ts); decrementFrameCounter(nbframes, ts); return true; @@ -181,9 +214,47 @@ } -void StreamProcessor::enable() { - if(!m_running) { - debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n"); - } - m_disabled=false; +bool StreamProcessor::enable() { + int cnt=0; + + if(!m_running) { + debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n"); + } + + m_disabled=false; + + // now wait until it is effectively enabled + // time-out at 100ms + while(m_is_disabled && cnt++ < 1000) { + usleep(100); + } + + // check if the operation timed out + if(cnt==1000) { + debugWarning("Timeout when enabling StreamProcessor (%p)\n",this); + return false; + } + + return true; +} + +bool StreamProcessor::disable() { + int cnt=0; + + m_disabled=true; + + // now wait until it is effectively disabled + // time-out at + while(!m_is_disabled && cnt++ < 1000) { + usleep(100); + } + + // check if the operation timed out (100ms) + if(cnt==1000) { + debugWarning("Timeout when disabling StreamProcessor (%p)\n",this); + return false; + } + + return true; + } @@ -195,8 +266,11 @@ /** * Decrements the frame counter, in a atomic way. This - * also sets the buffer tail timestamp + * also sets the buffer head timestamp * is thread safe. */ void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", + this, new_timestamp); + pthread_mutex_lock(&m_framecounter_lock); m_framecounter -= nbframes; @@ -207,8 +281,11 @@ /** * Increments the frame counter, in a atomic way. - * also sets the buffer head timestamp + * also sets the buffer tail timestamp * This is thread safe. */ void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", + this, new_timestamp); + pthread_mutex_lock(&m_framecounter_lock); m_framecounter += nbframes; @@ -219,20 +296,11 @@ /** - * Sets the frame counter, in a atomic way. - * also sets the buffer head timestamp - * This is thread safe. - */ -void StreamProcessor::setFrameCounter(int new_framecounter, uint64_t new_timestamp) { - pthread_mutex_lock(&m_framecounter_lock); - m_framecounter = new_framecounter; - m_buffer_tail_timestamp = new_timestamp; - pthread_mutex_unlock(&m_framecounter_lock); -} - -/** * Sets the buffer tail timestamp (in usecs) * This is thread safe. */ void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n", + this, new_timestamp); + pthread_mutex_lock(&m_framecounter_lock); m_buffer_tail_timestamp = new_timestamp; @@ -245,4 +313,7 @@ */ void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", + this, new_timestamp); + pthread_mutex_lock(&m_framecounter_lock); m_buffer_head_timestamp = new_timestamp; @@ -256,4 +327,9 @@ */ void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n", + this, new_head); + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " and buffer tail timestamp for (%p) to %11llu\n", + this, new_tail); + pthread_mutex_lock(&m_framecounter_lock); m_buffer_head_timestamp = new_head;