Index: branches/streaming-rework/src/libstreaming/MotuStreamProcessor.h =================================================================== --- branches/streaming-rework/src/libstreaming/MotuStreamProcessor.h (revision 312) +++ branches/streaming-rework/src/libstreaming/MotuStreamProcessor.h (revision 384) @@ -35,4 +35,6 @@ #include "../libutil/DelayLockedLoop.h" + +#ifdef ENABLE_MOTU namespace FreebobStreaming { @@ -75,6 +77,6 @@ void setTicksPerFrameDLL(float *dll) {m_ticks_per_frame=dll;}; - virtual bool preparedForStop(); - virtual bool preparedForStart(); + virtual bool prepareForStop(); + virtual bool prepareForStart(); protected: @@ -165,6 +167,6 @@ unsigned int getEventSize(void); - virtual bool preparedForStop(); - virtual bool preparedForStart(); + virtual bool prepareForStop(); + virtual bool prepareForStart(); protected: @@ -183,9 +185,4 @@ unsigned int m_event_size; - // The integrator of a Delay-Locked Loop (DLL) used to provide a - // continuously updated estimate of the number of ieee1394 frames - // per audio frame at the current sample rate. - float m_ticks_per_frame; - signed int m_last_cycle_ofs; signed int m_next_cycle; @@ -201,4 +198,6 @@ } // end of namespace FreebobStreaming +#endif /* ENABLE_MOTU */ + #endif /* __FREEBOB_MOTUSTREAMPROCESSOR__ */ Index: branches/streaming-rework/src/libstreaming/freebob_streaming.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/freebob_streaming.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/freebob_streaming.cpp (revision 384) @@ -159,5 +159,4 @@ // we are ready! - debugOutputShort(DEBUG_LEVEL_VERBOSE, "\n\n"); return dev; @@ -191,5 +190,4 @@ debugOutput(DEBUG_LEVEL_VERBOSE,"------------- Start -------------\n"); - // create the connections for all devices // iterate over the found devices @@ -207,7 +205,10 @@ } - dev->processorManager->start(); - - return 0; + if(dev->processorManager->start()) { + return 0; + } else { + freebob_streaming_stop(dev); + return -1; + } } @@ -225,5 +226,4 @@ assert(device); - int j=0; for(j=0; jgetStreamCount();j++) { @@ -250,23 +250,21 @@ static int xruns=0; - periods++; - if(periods>periods_print) { - debugOutput(DEBUG_LEVEL_VERBOSE, "\n"); - debugOutput(DEBUG_LEVEL_VERBOSE, "============================================\n"); - debugOutput(DEBUG_LEVEL_VERBOSE, "Xruns: %d\n",xruns); - debugOutput(DEBUG_LEVEL_VERBOSE, "============================================\n"); - dev->processorManager->dumpInfo(); -// debugOutput(DEBUG_LEVEL_VERBOSE, "--------------------------------------------\n"); -/* quadlet_t *addr=(quadlet_t*)(dev->processorManager->getPortByIndex(0, Port::E_Capture)->getBufferAddress()); - if (addr) hexDumpQuadlets(addr,10);*/ - debugOutput(DEBUG_LEVEL_VERBOSE, "\n"); - periods_print+=100; - } + periods++; + if(periods>periods_print) { + debugOutput(DEBUG_LEVEL_VERBOSE, "\n"); + debugOutput(DEBUG_LEVEL_VERBOSE, "============================================\n"); + debugOutput(DEBUG_LEVEL_VERBOSE, "Xruns: %d\n",xruns); + debugOutput(DEBUG_LEVEL_VERBOSE, "============================================\n"); + dev->processorManager->dumpInfo(); + debugOutput(DEBUG_LEVEL_VERBOSE, "\n"); + periods_print+=100; + } + if(dev->processorManager->waitForPeriod()) { return dev->options.period_size; } else { debugWarning("XRUN detected\n"); + // do xrun recovery - dev->processorManager->handleXrun(); xruns++; Index: branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp (revision 384) @@ -31,16 +31,16 @@ #include "AmdtpPort.h" -#include "cyclecounter.h" +#include "cycletimer.h" #include #include -#define RECEIVE_PROCESSING_DELAY_IN_SAMPLES 100 #define RECEIVE_DLL_INTEGRATION_COEFFICIENT 0.015 -#define RECEIVE_PROCESSING_DELAY (TICKS_PER_SECOND * 2/1000) +#define RECEIVE_PROCESSING_DELAY 51200 // in ticks -#define TRANSMIT_TRANSFER_DELAY 10000 +#define TRANSMIT_TRANSFER_DELAY 1000 +#define TRANSMIT_ADVANCE_CYCLES 10 //#define DO_SYT_SYNC @@ -80,5 +80,4 @@ } - return true; } @@ -97,5 +96,5 @@ unsigned int nevents=0; - packet->eoh0 = 0; + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d\n",cycle); #ifdef DEBUG @@ -121,10 +120,94 @@ m_running=true; - // don't process the stream when it is not enabled. - // however, we do have to generate (semi) valid packets - // that means that we'll send NODATA packets FIXME: check!! - if(m_disabled) { + // we calculate the timestamp of the next sample in the buffer, which will + // allow us to check if we are to send this sample now, or later + + // FIXME: maybe we should use the buffer head timestamp for this? + + float ticks_per_frame=m_SyncSource->getTicksPerFrame(); + + // the base timestamp is the one of the last sample in the buffer + int64_t timestamp = m_buffer_tail_timestamp; + + + // meaning that the first sample in the buffer lies m_framecounter * rate + // earlier. This would give the next equation: + // timestamp = m_last_timestamp - m_framecounter * rate + // but to preserve causality, we have to make sure that this timestamp is + // always bigger than m_last_timestamp. this can be done by adding + // m_ringbuffersize_frames * rate. + timestamp += (int64_t)((((int64_t)m_ringbuffer_size_frames) + - ((int64_t)m_framecounter)) + * ticks_per_frame); + + // this happens if m_buffer_tail_timestamp wraps around while there are + // not much frames in the buffer. We should add the wraparound value of the ticks + // counter + if (timestamp < 0) { + timestamp += TICKS_PER_SECOND * 128L; + } + // this happens when the last timestamp is near wrapping, and + // m_framecounter is low. + // this means: m_last_timestamp is near wrapping and have just had + // a getPackets() from the client side. the projected next_period + // boundary lies beyond the wrap value. + // the action is to wrap the value. + else if (timestamp >= TICKS_PER_SECOND * 128L) { + timestamp -= TICKS_PER_SECOND * 128L; + } + + // determine if we want to send a packet or not + uint64_t cycle_timer=m_handler->getCycleTimerTicks(); + + int64_t until_next=timestamp-cycle_timer; + + // we send a packet some cycles in advance, to avoid the + // following situation: + // suppose we are only a few ticks away from + // the moment to send this packet. This means that in + // order to keep causality, we have to make sure that + // the TRANSFER_DELAY is bigger than one cycle, which + // might be a little much. + // this means that we need one cycle of extra buffering. + until_next -= TICKS_PER_CYCLE * TRANSMIT_ADVANCE_CYCLES; + + // the maximal difference we can allow (64secs) + const int64_t max=TICKS_PER_SECOND*64L; + + if(!m_disabled) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TS=%11llu, CTR=%11llu, FC=%5d, TPS=%10.6f, UTN=%11lld\n", + timestamp, cycle_timer, m_framecounter, ticks_per_frame, until_next + ); + } + + if(until_next > max) { + // this means that cycle_timer has wrapped, but + // timestamp has not. we should unwrap cycle_timer + // by adding TICKS_PER_SECOND*128L, meaning that we should substract + // this value from until_next + until_next -= TICKS_PER_SECOND*128L; + } else if (until_next < -max) { + // this means that timestamp has wrapped, but + // cycle_timer has not. we should unwrap timestamp + // by adding TICKS_PER_SECOND*128L, meaning that we should add + // this value from until_next + until_next += TICKS_PER_SECOND*128L; + } + + if(!m_disabled) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " TS=%11llu, CTR=%11llu, FC=%5d, TPS=%10.6f, UTN=%11lld\n", + timestamp, cycle_timer, m_framecounter, ticks_per_frame, until_next + ); + } + + // don't process the stream when it is not enabled, + // or when the next sample is not due yet. + + // we do have to generate (semi) valid packets + // that means that we'll send NODATA packets. + // we don't add payload because DICE devices don't like that. + if((until_next>0) || m_disabled) { // no-data packets have syt=0xFFFF - // and have the usual amount of events as dummy data + // and have the usual amount of events as dummy data (?) packet->fdf = IEC61883_FDF_NODATA; packet->syt = 0xffff; @@ -133,5 +216,10 @@ m_dbc += m_syt_interval; + // this means no-data packets with payload (DICE doesn't like that) *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); + + // this means no-data packets without payload + //*length = 2*sizeof(quadlet_t); + *tag = IEC61883_TAG_WITH_CIP; *sy = 0; @@ -140,191 +228,44 @@ } - packet->fdf = m_fdf; - -// assert(m_handler->getDroppedCount()<5); - -// debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "get packet...\n"); - - // construct the packet cip - // NOTE: maybe a little outdated - // FIXME: this should be done differently: - // first we should determine the timestamp of the first sample in this block - // this can be done by reading the rate of the compagnion receiver - // this rate will give us the ticks per sample used by the device - // then we should take the previous timestamp, and add m_syt_interval * ticks_per_sample - // to this timestamp (only if we are sending events). This gives us the timestamp - // of the first sample in this packet. - // we should define a transfer delay and add it to this timestamp and then send it to - // the device. - - // NOTE: this will even work when we're only transmitting (no receive stream): - // the ticks_per_sample value is initialized by the receive streamprocessor - // to the nominal value. We will then transmit at our own pace, being at nominal - // rate compared to the cycle counter. - - // NOTE: this scheme might even work when sync'ing on the sync streams of the device - // they are AMDTP streams with SYT timestamps, therefore a decent estimate of - // ticks_per_frame can be found, and we are synced when using it. - -// <> - - // FIXME: if m_last_bufferfill > 0 - float ticks_per_frame=syncmaster->getTicksPerFrame(); - - // m_last_timestamp is the moment upon which the last 'period signal' - // should have been given (note: should have been because - // the timestamp is derrived from the incoming packets, - // not from the moment the signal was actually given) - - // at a period boundary, we expect m_ringbuffer_size_frames frames to - // be in the buffers. 'right after' the transfer(), all of these - // frames should be in the xmit buffers (if transfer() finishes - // before new packets are received) - // therefore the last sample of the xmit buffer lies at - // T1 = timestamp + (m_ringbuffer_size_frames) * ticks_per_frame - - // in reality however life is multithreaded, and we don't know - // exactly how many samples there are in the buffer. but we know - // how many there should be, so we say that the last frame put - // into the buffer (by transfer) has the timestamp T1 - - // this means that the current sample has timestamp - // T2 = T1 - ticks_per_frame * (nb_frames_in_buffer) - // = T1 - ticks_per_frame * (m_ringbuffer_size_frames-m_framecounter) - // = timestamp + ticks_per_frame * - // (m_ringbuffer_size_frames-m_ringbuffer_size_frames+m_framecounter) - // = timestamp + ticks_per_frame * m_framecounter - - int T2 = m_last_timestamp + ticks_per_frame*m_framecounter; - - // we then need to add the transfer delay for the receiving - // device to this time to determine the xmit timestamp - // TSTAMP = T2 + TRANSFER_DELAY - - // we should determine when to 'queue' this sample to - // the ISO xmit layer, based upon the cycle parameter - // we can define the ideal time at which to send the sample as - // TSEND = TSTAMP - TRANSFER_DELAY - // being T2 - int TSEND = T2; - - // the xmit timestamp should then be the TSEND + TRANSMIT_TRANSFER_DELAY - // note that in this setup, TRANSMIT_TRANSFER_DELAY has to incorporate the - // iso buffering - int timestamp = TSEND + TRANSMIT_TRANSFER_DELAY; - - // if we take a look at TSEND we can determine if we are to send - // the sample or not: - // if - // CYCLES(TSEND) < cycle - // then the time at which to send the packet has passed (note: wraparound!) - // we should send the sample - // if it hasn't passed, we should send an empty packet - // - // this should automatically catch up - - // FIXME: wraparound! - int cycle_wo_wraparound=cycle; - - int TSEND_cycle_wo_wraparound = TICKS_TO_CYCLES(TSEND); - - // arbitrary, should be replaced by a better wraparound - // detection - - // if cycles wraps around, and TSEND_cyles doesn't, - // we need to make sure that we compare the right things - // i.e. unwrap the cycle parameter - // if both wrap, this can't be true - if (cycle_wo_wraparound - TSEND_cycle_wo_wraparound < -4000) { - cycle_wo_wraparound += 8000; - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"wraparound on cycle detected: %d %d %d\n", - cycle, cycle_wo_wraparound, - cycle - TSEND_cycle_wo_wraparound); - } - - // if TSEND_cycle wraps around and cycle doesn't, - // TSEND_cycle suddenly becomes a lot smaller than cycle - if (TSEND_cycle_wo_wraparound - cycle_wo_wraparound < -4000) { - TSEND_cycle_wo_wraparound += 8000; - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"wraparound on TSEND detected: %d %d %d\n", - TICKS_TO_CYCLES(TSEND), TSEND_cycle_wo_wraparound, - TSEND_cycle_wo_wraparound - cycle_wo_wraparound); - } - - if (TSEND_cycle_wo_wraparound < cycle_wo_wraparound) { - nevents=m_syt_interval; - m_dbc += m_syt_interval; - - } else { // no-data - - // no-data packets have syt=0xFFFF - // and have the usual amount of events as dummy data + // construct the packet + nevents = m_syt_interval; + m_dbc += m_syt_interval; + + *tag = IEC61883_TAG_WITH_CIP; + *sy = 0; + + enum raw1394_iso_disposition retval; + + unsigned int read_size=nevents*sizeof(quadlet_t)*m_dimension; + + if ((freebob_ringbuffer_read(m_event_buffer,(char *)(data+8),read_size)) < + read_size) + { + /* there is no more data in the ringbuffer */ + + debugWarning("Transmit buffer underrun (cycle %d, FC=%d, PC=%d)\n", + cycle, m_framecounter, m_handler->getPacketCount()); + + // TODO: we have to be a little smarter here + // because we have some slack on the device side (TRANSFER_DELAY) + // we can allow some skipped packets + // signal underrun + m_xruns++; + + // compose a no-data packet, we should always + // send a valid packet packet->fdf = IEC61883_FDF_NODATA; packet->syt = 0xffff; - // the dbc is incremented even with no data packets - m_dbc += m_syt_interval; - - *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); - *tag = IEC61883_TAG_WITH_CIP; - *sy = 0; - -// if(packet->dbs) { -// debugOutput(DEBUG_LEVEL_VERY_VERBOSE, -// "XMT %04d: CH = %d, FDF = %X. SYT = %6d, DBS = %3d, DBC = %3d, FMT = %3d, LEN = %4d (%2d)\n", -// cycle, m_channel, packet->fdf, -// packet->syt, -// packet->dbs, -// packet->dbc, -// packet->fmt, -// *length, -// ((*length / sizeof (quadlet_t)) - 2)/packet->dbs); -// } - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Sending empty packet on cycle %d\n", cycle); - - // FIXME: this is to prevent libraw to do loop over too - // much packets at once (overflowing the receive side) - // but putting it here is a little arbitrary - - - return RAW1394_ISO_DEFER; - } - - -// debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Now=%4d/%8d, Tstamp=%8d, DT=%8d, T2=%8d, T3=%8d, last TS=%d, BF=%d\n", -// cycle,(cycle*3072), -// timestamp, -// timestamp-(cycle*3072), -// T2,T3, -// m_last_timestamp, -// buffer_fill); - - enum raw1394_iso_disposition retval = RAW1394_ISO_OK; - - unsigned int read_size=nevents*sizeof(quadlet_t)*m_dimension; - - if ((freebob_ringbuffer_read(m_event_buffer,(char *)(data+8),read_size)) < - read_size) - { - /* there is no more data in the ringbuffer */ - - debugWarning("Transmit buffer underrun (cycle %d, FC=%d, PC=%d)\n", - cycle, m_framecounter, m_handler->getPacketCount()); - - // signal underrun - m_xruns++; + // this means no-data packets with payload (DICE doesn't like that) + *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); + + // this means no-data packets without payload + //*length = 2*sizeof(quadlet_t); retval=RAW1394_ISO_DEFER; - *length=0; - nevents=0; - - } else { - retval=RAW1394_ISO_OK; *length = read_size + 8; - + // process all ports that should be handled on a per-packet base // this is MIDI for AMDTP (due to the need of DBC) @@ -332,43 +273,32 @@ debugWarning("Problem encoding Packet Ports\n"); } - - // we can forget the seconds for the cycle counter - // because we are masking with 0xFFFF - - unsigned int timestamp_SYT = (TICKS_TO_CYCLES(timestamp) << 12) - | TICKS_TO_OFFSET(timestamp); - - packet->syt = ntohs(timestamp_SYT & 0xffff); - -// debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"XMIT %d EVENTS, SYT %04X for cycle %2d: %08d (%2u cycles + %04u ticks)\n", -// nevents, timestamp_SYT & 0xFFFF, cycle, timestamp_SYT -// CYCLE_COUNTER_GET_CYCLES(timestamp_SYT), -// CYCLE_COUNTER_GET_OFFSET(timestamp_SYT) -// ); - } - - *tag = IEC61883_TAG_WITH_CIP; - *sy = 0; - - // update the frame counter - incrementFrameCounter(nevents); - -/* if(m_framecounter>m_period) { - retval=RAW1394_ISO_DEFER; - }*/ - -#ifdef DEBUG -// if(packet->dbs) { -// debugOutput(DEBUG_LEVEL_VERY_VERBOSE, -// "XMT %04d: CH = %d, FDF = %X. SYT = %6d, DBS = %3d, DBC = %3d, FMT = %3d, LEN = %4d (%2d)\n", -// cycle, m_channel, packet->fdf, -// packet->syt, -// packet->dbs, -// packet->dbc, -// packet->fmt, -// *length, -// ((*length / sizeof (quadlet_t)) - 2)/packet->dbs); -// } -#endif + + packet->fdf = m_fdf; + + // convert the timestamp to SYT format + unsigned int timestamp_SYT = TICKS_TO_SYT(timestamp); + packet->syt = ntohs(timestamp_SYT); + + retval=RAW1394_ISO_OK; + } + + // calculate the new buffer head timestamp. this is + // the timestamp of the current packet plus + // SYT_INTERVAL * rate + + timestamp += (int64_t)((float)m_syt_interval * ticks_per_frame ); + + // check if it wrapped + if (timestamp >= TICKS_PER_SECOND * 128L) { + timestamp -= TICKS_PER_SECOND * 128L; + } + + // update the frame counter such that it reflects the new value + // also update the buffer head timestamp + // done in the SP base class + if (!StreamProcessor::getFrames(nevents, timestamp)) { + debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nevents, timestamp); + retval=RAW1394_ISO_ERROR; + } return retval; @@ -376,35 +306,23 @@ } -void AmdtpTransmitStreamProcessor::decrementFrameCounter() { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "decrement frame counter...\n"); - -#ifdef DEBUG - int xmit_bufferspace=freebob_ringbuffer_read_space(m_event_buffer)/m_dimension/4; - int recv_bufferspace=freebob_ringbuffer_read_space(syncmaster->m_event_buffer)/syncmaster->m_dimension/4; - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"XMT: %5d | RCV: %5d | DIFF: %5d | SUM: %5d \n", xmit_bufferspace, recv_bufferspace, xmit_bufferspace - recv_bufferspace, xmit_bufferspace + recv_bufferspace); -#endif - - // update the timestamp - - m_last_timestamp=syncmaster->getPeriodTimeStamp(); - - StreamProcessor::decrementFrameCounter(); -} - -void AmdtpTransmitStreamProcessor::incrementFrameCounter(int nbframes) { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "increment frame counter by %d...\n", nbframes); - - StreamProcessor::incrementFrameCounter(nbframes); -} - -bool AmdtpTransmitStreamProcessor::isOnePeriodReady() -{ - return true; - //return (m_framecounter > m_period); +int64_t AmdtpTransmitStreamProcessor::getTimeUntilNextPeriodUsecs() { + debugFatal("IMPLEMENT ME!"); + return 0; +} + +uint64_t AmdtpTransmitStreamProcessor::getTimeAtPeriodUsecs() { + // then we should convert this into usecs + // FIXME: we assume that the TimeSource of the IsoHandler is + // in usecs. + return m_handler->mapToTimeSource(getTimeAtPeriod()); +} + +uint64_t AmdtpTransmitStreamProcessor::getTimeAtPeriod() { + debugFatal("IMPLEMENT ME!"); + + return 0; } bool AmdtpTransmitStreamProcessor::prefill() { - if(!transferSilence(m_ringbuffer_size_frames)) { debugFatal("Could not prefill transmit stream\n"); @@ -412,27 +330,21 @@ } -/* int i=m_nb_buffers; - while(i--) { - if(!transferSilence(m_period)) { - debugFatal("Could not prefill transmit stream\n"); - return false; - } - } - - // and we should also provide enough prefill for the - // SYT processing delay - if(!transferSilence(RECEIVE_PROCESSING_DELAY_IN_SAMPLES)) { - debugFatal("Could not prefill transmit stream (2)\n"); + // when the buffer is prefilled, we should + // also initialize the base timestamp + // this base timestamp is the timestamp of the + // last buffer transfer. + uint64_t ts; + uint64_t fc; + m_SyncSource->getBufferHeadTimestamp(&ts, &fc); + + // update the frame counter such that it reflects the buffer content, + // and also update the buffer tail timestamp + // done in the SP base class + if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { + debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n", + m_ringbuffer_size_frames, ts); return false; } -*/ - // the framecounter should be pulled back to - // make sure the ISO buffering is used - // we are using 1 period of iso buffering -// m_framecounter=-m_period; - - // should this also be pre-buffered? - //m_framecounter=-(m_framerate * RECEIVE_PROCESSING_DELAY)/TICKS_PER_SECOND; - + return true; @@ -524,11 +436,13 @@ m_ringbuffer_size_frames=m_nb_buffers * m_period; - // add the processing delay - m_ringbuffer_size_frames+=RECEIVE_PROCESSING_DELAY_IN_SAMPLES; + // prepare the framerate estimate + m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); + + // add the receive processing delay + m_ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); if( !(m_event_buffer=freebob_ringbuffer_create( (m_dimension * m_ringbuffer_size_frames) * sizeof(quadlet_t)))) { debugFatal("Could not allocate memory event ringbuffer"); -// return -ENOMEM; return false; } @@ -539,5 +453,4 @@ freebob_ringbuffer_free(m_event_buffer); return false; -// return -ENOMEM; } @@ -633,6 +546,10 @@ return false; } - - // we should prefill the event buffer + + // prefilling is done in ...() + // because at that point the streams are running, + // while here they are not. + + // prefill the event buffer if (!prefill()) { debugFatal("Could not prefill buffers\n"); @@ -652,4 +569,25 @@ } +bool AmdtpTransmitStreamProcessor::prepareForStart() { + + return true; +} + +bool AmdtpTransmitStreamProcessor::prepareForStop() { + disable(); + return true; +} + +bool AmdtpTransmitStreamProcessor::prepareForEnable() { + uint64_t ts; + uint64_t fc; + + m_SyncSource->getBufferHeadTimestamp(&ts, &fc); + + setBufferTailTimestamp(ts); + + return true; +} + bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int size) { /* a naive implementation would look like this: */ @@ -668,23 +606,13 @@ } -bool AmdtpTransmitStreamProcessor::transfer() { +bool AmdtpTransmitStreamProcessor::canClientTransferFrames(unsigned int nbframes) { + // there has to be enough space to put the frames in + return m_ringbuffer_size_frames - getFrameCounter() > nbframes; +} + +bool AmdtpTransmitStreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { m_PeriodStat.mark(freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension)); debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); - // TODO: improve -/* a naive implementation would look like this: - - unsigned int write_size=m_period*sizeof(quadlet_t)*m_dimension; - char *dummybuffer=(char *)calloc(sizeof(quadlet_t),m_period*m_dimension); - transmitBlock(dummybuffer, m_period, 0, 0); - - if (freebob_ringbuffer_write(m_event_buffer,(char *)(dummybuffer),write_size) < write_size) { - debugWarning("Could not write to event buffer\n"); - } - - - free(dummybuffer); -*/ -/* but we're not that naive anymore... */ int xrun; unsigned int offset=0; @@ -693,5 +621,5 @@ // we received one period of frames // this is period_size*dimension of events - int events2write=m_period*m_dimension; + int events2write=nbframes*m_dimension; int bytes2write=events2write*sizeof(quadlet_t); @@ -709,5 +637,5 @@ int byteswritten=0; - unsigned int frameswritten=(m_period*cluster_size-bytes2write)/cluster_size; + unsigned int frameswritten=(nbframes*cluster_size-bytes2write)/cluster_size; offset=frameswritten; @@ -761,5 +689,5 @@ // xrun detected debugError("XMT: Frame buffer underrun in processor %p\n",this); - break; + break; // FIXME: return false ? } @@ -771,4 +699,14 @@ assert(bytes2write%cluster_size==0); + } + + // update the frame counter such that it reflects the new value, + // and also update the buffer tail timestamp + // done in the SP base class + debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); + + if (!StreamProcessor::putFrames(nbframes, ts)) { + debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nbframes, ts); + return false; } @@ -859,5 +797,5 @@ quadlet_t *target_event=NULL; - int j; + unsigned int j; for ( PortVectorIterator it = m_PacketPorts.begin(); @@ -989,5 +927,5 @@ AmdtpReceiveStreamProcessor::AmdtpReceiveStreamProcessor(int port, int framerate, int dimension) - : ReceiveStreamProcessor(port, framerate), m_dimension(dimension), m_last_timestamp(0), m_last_timestamp2(0), m_one_period_passed(false) { + : ReceiveStreamProcessor(port, framerate), m_dimension(dimension), m_last_timestamp(0), m_last_timestamp2(0) { @@ -1021,5 +959,4 @@ struct iec61883_packet *packet = (struct iec61883_packet *) data; assert(packet); - unsigned long in_time=debugGetCurrentTSC(); #ifdef DEBUG @@ -1029,30 +966,44 @@ #endif - // how are we going to get this right??? -// m_running=true; - if((packet->fmt == 0x10) && (packet->fdf != 0xFF) && (packet->syt != 0xFFFF) && (packet->dbs>0) && (length>=2*sizeof(quadlet_t))) { unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; - // signal that we're running - if(nevents) m_running=true; - - - // do the time stamp processing - // put the last time stamp a variable - // this will allow us to determine the - // actual presentation time later - bool wraparound_occurred=false; - + //=> store the previous timestamp m_last_timestamp2=m_last_timestamp; + //=> convert the SYT to ticks unsigned int syt_timestamp=ntohs(packet->syt); + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%3u secs + %4u cycles + %04u ticks)\n", + channel, cycle,syt_timestamp, CYCLE_TIMER_GET_SECS(syt_timestamp), + CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp)); + + // reconstruct the full cycle + unsigned int cc=m_handler->getCycleTimer(); + unsigned int cc_cycles=CYCLE_TIMER_GET_CYCLES(cc); + unsigned int cc_seconds=CYCLE_TIMER_GET_SECS(cc); + + // the cycletimer has wrapped since this packet was received + // we want cc_seconds to reflect the 'seconds' at the point this + // was received + if (cycle>cc_cycles) { + if (cc_seconds) { + cc_seconds--; + } else { + // seconds has wrapped around, so we'd better not substract 1 + // the good value is 127 + cc_seconds=127; + } + } + // reconstruct the top part of the timestamp using the current cycle number unsigned int now_cycle_masked=cycle & 0xF; - unsigned int syt_cycle=CYCLE_COUNTER_GET_CYCLES(syt_timestamp); + unsigned int syt_cycle=CYCLE_TIMER_GET_CYCLES(syt_timestamp); // if this is true, wraparound has occurred, undo this wraparound if(syt_cycle7999) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Detected wraparound: %d + %d = %d\n",cycle,delta_cycles,new_cycles); + // if the cycles cause a wraparound of the cycle timer, + // perform this wraparound + // and convert the timestamp into ticks + if(new_cycles<8000) { + m_last_timestamp = new_cycles * TICKS_PER_CYCLE; + } else { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, + "Detected wraparound: %d + %d = %d\n", + cycle,delta_cycles,new_cycles); new_cycles-=8000; // wrap around - wraparound_occurred=true; + m_last_timestamp = new_cycles * TICKS_PER_CYCLE; + // add one second due to wraparound + m_last_timestamp += TICKS_PER_SECOND; } - m_last_timestamp = (new_cycles) << 12; - - // now add the offset part on top of that - m_last_timestamp |= (syt_timestamp & 0xFFF); - - // mask off the seconds field - - // m_last_timestamp timestamp now contains all info, - // including cycle number - - if (m_last_timestamp && m_last_timestamp2) { - // try and estimate the frame rate from the device: - int measured_difference=((int)(CYCLE_COUNTER_TO_TICKS(m_last_timestamp))) - -((int)(CYCLE_COUNTER_TO_TICKS(m_last_timestamp2))); + m_last_timestamp += CYCLE_TIMER_GET_OFFSET(syt_timestamp); + m_last_timestamp += cc_seconds * TICKS_PER_SECOND; + + //=> now estimate the device frame rate + if (m_last_timestamp2 && m_last_timestamp) { + // try and estimate the frame rate from the device - // handle wrap around of the cycle variable if nescessary - // it can be that two successive timestamps cause wraparound - // (if the difference between time stamps is larger than 2 cycles), - // thus it isn't always nescessary - if (wraparound_occurred & (m_last_timestamp correcting for timestamp difference wraparound\n"); - measured_difference+=TICKS_PER_SECOND; + // first get the measured difference between both + // timestamps + int64_t measured_difference; + measured_difference=((int64_t)(m_last_timestamp)) + -((int64_t)(m_last_timestamp2)); + // correct for seconds wraparound + if (m_last_timestamp 1.5*((TICKS_PER_SECOND*1.0) / m_framerate)*m_syt_interval) { debugWarning("Timestamp diff more than 50%% of the nominal diff too large!\n"); - debugWarning(" SYT: %08X | STMP: %08X,%08X | DLL: in=%5.0f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err); + debugWarning(" SYT: %08X | STMP: %llu,%llu | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err); + debugWarning(" CC: %08X | CC_CY: %u | CC_SEC: %u | SYT_CY: %u | NEW_CY: %u\n", + cc, cc_cycles, cc_seconds, syt_cycle,new_cycles); + } if(f < 0.5*((TICKS_PER_SECOND*1.0) / m_framerate)*m_syt_interval) { debugWarning("Timestamp diff more than 50%% of the nominal diff too small!\n"); - debugWarning(" SYT: %08X | STMP: %08X,%08X | DLL: in=%5.0f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err); + debugWarning(" SYT: %08X | STMP: %llu,%llu | DLL: in=%f, current=%f, err=%e\n",syt_timestamp, m_last_timestamp, m_last_timestamp2, f,m_ticks_per_frame,err); } #endif - // integrate the error m_ticks_per_frame += RECEIVE_DLL_INTEGRATION_COEFFICIENT*err; @@ -1116,36 +1072,37 @@ debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"R-SYT for cycle (%2d %2d)=>%2d: %5uT (%04uC + %04uT) %04X %04X %d\n", cycle,now_cycle_masked,delta_cycles, - CYCLE_COUNTER_TO_TICKS(m_last_timestamp), - CYCLE_COUNTER_GET_CYCLES(m_last_timestamp), - CYCLE_COUNTER_GET_OFFSET(m_last_timestamp), - ntohs(packet->syt),m_last_timestamp&0xFFFF, dropped + (m_last_timestamp), + TICKS_TO_CYCLES(m_last_timestamp), + TICKS_TO_OFFSET(m_last_timestamp), + ntohs(packet->syt),TICKS_TO_CYCLE_TIMER(m_last_timestamp)&0xFFFF, dropped ); -#ifdef DEBUG - if(m_last_timestamp signal that we're running (if we are) + if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) m_running=true; + + //=> don't process the stream samples when it is not enabled. if(m_disabled) { + // we keep track of the timestamp here + // this makes sure that we will have a somewhat accurate + // estimate as to when a period might be ready. i.e. it will not + // be ready earlier than this timestamp + period time + + // the next (possible) sample is not this one, but lies + // SYT_INTERVAL * rate later + uint64_t ts=m_last_timestamp+(uint64_t)((float)m_syt_interval * m_ticks_per_frame); + StreamProcessor::setBufferTimestamps(ts,ts); + return RAW1394_ISO_DEFER; } - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "put packet...\n"); - + + //=> process the packet unsigned int write_size=nevents*sizeof(quadlet_t)*m_dimension; + // add the data payload to the ringbuffer - if (freebob_ringbuffer_write(m_event_buffer,(char *)(data+8),write_size) < write_size) { debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n", cycle, m_framecounter, m_handler->getPacketCount()); + m_xruns++; @@ -1159,5 +1116,4 @@ retval=RAW1394_ISO_DEFER; } - } @@ -1176,226 +1132,125 @@ #endif - // update the frame counter - incrementFrameCounter(nevents); - if(m_framecounter>(signed int)m_period) { - retval=RAW1394_ISO_DEFER; - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"defer!\n"); + // update the frame counter such that it reflects the new value, + // and also update the buffer tail timestamp, as we add new frames + // done in the SP base class + if (!StreamProcessor::putFrames(nevents, m_last_timestamp)) { + debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n",nevents, m_last_timestamp); + return RAW1394_ISO_ERROR; } - } else { - // discard packet - // can be important for sync though - - // FIXME: this is to prevent libraw to do loop over too - // much packets at once (draining the xmit side) - // but putting it here is a little arbitrary - retval=RAW1394_ISO_DEFER; - } - - m_PacketStat.mark(debugGetCurrentTSC()-in_time); - -// m_PacketStat.mark(freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension)); + } return retval; } -// this uses SYT to determine if one period is ready -bool AmdtpReceiveStreamProcessor::isOnePeriodReady() { - -#ifdef DO_SYT_SYNC - // this code is not ready yet - - // one sample will take a number off cycle counter ticks: - // The number of ticks per second is 24576000 - // The number of samples per second is Fs - // therefore the number of ticks per sample is 24576000 / Fs - // NOTE: this will be rounded!! -// float ticks_per_sample=24576000.0/m_framerate; - float ticks_per_sample=m_ticks_per_frame; - - // we are allowed to add some constant - // processing delay to the transfer delay - // being the period size and some fixed delay -// unsigned int processing_delay=ticks_per_sample*(m_period)+RECEIVE_PROCESSING_DELAY; - unsigned int processing_delay=ticks_per_sample*(m_period+RECEIVE_PROCESSING_DELAY_IN_SAMPLES); - - // the number of events in the buffer is - // m_framecounter - - // we have the timestamp of the last event block: - // m_last_timestamp - - // the time at which the beginning of the buffer should be - // presented to the audio side is: - // m_last_timestamp - (m_framecounter-m_syt_interval)*ticks_per_sample - - // NOTE: in fact, we don't have to check this, because it should always be the case - // - // WAS: however we have to make sure that we can transfer at least one period - // therefore we first check if this is ok - -// if(m_framecounter > (signed int)m_period) { - - unsigned int m_last_timestamp_ticks = CYCLE_COUNTER_TO_TICKS(m_last_timestamp); - - // add the processing delay - int ideal_presentation_time = m_last_timestamp_ticks + processing_delay; - int buffer_content_ticks=((int)m_framecounter)-((int)m_syt_interval); - buffer_content_ticks *= ticks_per_sample; - - // if the ideal_presentation_time is smaller than buffer_content_ticks, wraparound has occurred - // for the cycle part of m_last_timestamp. Therefore add one second worth of ticks - // to the cycle counter, as this is the wraparound point. - if (ideal_presentation_time < buffer_content_ticks) ideal_presentation_time += 24576000; - // we can now safely substract these, it will always be > 0 - ideal_presentation_time -= buffer_content_ticks; - - // FIXME: if we are sure, make ideal_presentation_time an unsigned int -// assert(ideal_presentation_time>=0); - - unsigned int current_time_ticks = (m_handler->getCycleCounter() % TICKS_PER_SECOND ); - -#ifdef DEBUG - if(ideal_presentation_time<0) { - debugWarning("ideal_presentation_time time is negative!\n"); - debugOutput(DEBUG_LEVEL_VERBOSE,"Periods: %d, FC: %d, remote framerate %f\n", - m_PeriodStat.m_count, m_framecounter, m_ticks_per_frame); - debugOutput(DEBUG_LEVEL_VERBOSE,"p-delay: %u, buffer_content: %d\n", - processing_delay, buffer_content_ticks); - debugOutput(DEBUG_LEVEL_VERBOSE,"Timestamp : %10u ticks (%3u secs + %4u cycles + %04u ticks)\n", - m_last_timestamp_ticks, - CYCLE_COUNTER_GET_SECS(m_last_timestamp), - CYCLE_COUNTER_GET_CYCLES(m_last_timestamp), - CYCLE_COUNTER_GET_OFFSET(m_last_timestamp) - ); - debugOutput(DEBUG_LEVEL_VERBOSE,"P-TIME : %10d ticks (%3u secs + %4u cycles + %04u ticks)\n", - ideal_presentation_time, - TICKS_TO_SECS(ideal_presentation_time), - TICKS_TO_CYCLES(ideal_presentation_time), - TICKS_TO_OFFSET(ideal_presentation_time) - ); - debugOutput(DEBUG_LEVEL_VERBOSE,"Now : %10u ticks (%3u secs + %4u cycles + %04u ticks)\n", - current_time_ticks, - TICKS_TO_SECS(current_time_ticks), - TICKS_TO_CYCLES(current_time_ticks), - TICKS_TO_OFFSET(current_time_ticks) - ); - return false; - } - if(ideal_presentation_time 24576000/2) tmp-=24576000; - - if(tmp<0) { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"SYT passed (%d ticks too late)\n",-tmp); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Periods: %d, remote ticks/frame: %f, remote framerate = %f\n", - m_PeriodStat.m_count, m_ticks_per_frame, 24576000.0/m_ticks_per_frame); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Bufferfill %d, framecounter %d\n", - freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension),m_framecounter); - if (-tmp>1000000) debugWarning("SYT VERY LATE: %d!\n",-tmp); - - m_WakeupStat.mark(m_framecounter); - - m_one_period_passed=true; - m_last_timestamp_at_period_ticks=ideal_presentation_time; - - return true; - } else { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Too early wait %d ticks\n",tmp); - return false; - } -// } else { -// return false; -// } -#else - if(m_framecounter > m_period) { - return true; - } else return false; -#endif -} - -unsigned int AmdtpReceiveStreamProcessor::getPeriodTimeStamp() { - if (m_one_period_passed) { - return m_last_timestamp_at_period_ticks; - } else { - // float ticks_per_sample=24576000.0/m_framerate; - float ticks_per_sample=m_ticks_per_frame; - - // we are allowed to add some constant - // processing delay to the transfer delay - // being the period size and some fixed delay - // unsigned int processing_delay=ticks_per_sample*(m_period)+RECEIVE_PROCESSING_DELAY; - unsigned int processing_delay=ticks_per_sample*(m_period+RECEIVE_PROCESSING_DELAY_IN_SAMPLES); - - unsigned int m_last_timestamp_ticks = CYCLE_COUNTER_TO_TICKS(m_last_timestamp); - - // add the processing delay - int ideal_presentation_time = m_last_timestamp_ticks + processing_delay; - unsigned int buffer_content_ticks=(int)((m_framecounter-m_syt_interval)*ticks_per_sample); - - // if the ideal_presentation_time is smaller than buffer_content_ticks, wraparound has occurred - // for the cycle part of m_last_timestamp. Therefore add one second worth of ticks - // to the cycle counter, as this is the wraparound point. - if (ideal_presentation_time < buffer_content_ticks) ideal_presentation_time += 24576000; - // we can now safely substract these, it will always be > 0 - ideal_presentation_time -= buffer_content_ticks; - - return ideal_presentation_time; - } +int64_t AmdtpReceiveStreamProcessor::getTimeUntilNextPeriodUsecs() { + uint64_t time_at_period=getTimeAtPeriod(); + + uint64_t cycle_timer=m_handler->getCycleTimerTicks(); + + int64_t until_next=time_at_period-cycle_timer; + + // the maximal difference we can allow (64secs) + const int64_t max=TICKS_PER_SECOND*64L; + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TAP=%11llu, CTR=%11llu, UTN=%11lld, TPUS=%f\n", + time_at_period, cycle_timer, until_next, m_handler->getTicksPerUsec() + ); + + if(until_next > max) { + // this means that cycle_timer has wrapped, but + // time_at_period has not. we should unwrap cycle_timer + // by adding TICKS_PER_SECOND*128L, meaning that we should substract + // this value from until_next + until_next -= TICKS_PER_SECOND*128L; + } else if (until_next < -max) { + // this means that time_at_period has wrapped, but + // cycle_timer has not. we should unwrap time_at_period + // by adding TICKS_PER_SECOND*128L, meaning that we should add + // this value from until_next + until_next += TICKS_PER_SECOND*128L; + } + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " TAP=%11llu, CTR=%11llu, UTN=%11lld, TPUS=%f\n", + time_at_period, cycle_timer, until_next, m_handler->getTicksPerUsec() + ); + + // now convert to usecs + // don't use the mapping function because it only works + // for absolute times, not the relative time we are + // using here (which can also be negative). + return (int64_t)(((float)until_next) / m_handler->getTicksPerUsec()); +} + +uint64_t AmdtpReceiveStreamProcessor::getTimeAtPeriodUsecs() { + // then we should convert this into usecs + // FIXME: we assume that the TimeSource of the IsoHandler is + // in usecs. + return m_handler->mapToTimeSource(getTimeAtPeriod()); +} + +uint64_t AmdtpReceiveStreamProcessor::getTimeAtPeriod() { + + // every time a packet is received both the framecounter and the base + // timestamp are updated. This means that at any instance of time, the + // front of the buffer (latest sample) timestamp is known. + // As we know the number of frames in the buffer, and we now the rate + // in ticks/frame, we can calculate the back of buffer timestamp as: + // back_of_buffer_time = front_time - nbframes * rate + // the next period boundary time lies m_period frames later: + // next_period_boundary = back_of_buffer_time + m_period * rate + + // NOTE: we should account for the fact that the timestamp is not for + // the latest sample, but for the latest sample minus syt_interval-1 + // because it is the timestamp for the first sample in the packet, + // while the complete packet contains SYT_INTERVAL samples. + // this makes the equation: + // back_of_buffer_time = front_time - (nbframes - (syt_interval - 1)) * rate + // next_period_boundary = back_of_buffer_time + m_period * rate + + // NOTE: where do we add the processing delay? + // if we add it here: + // next_period_boundary += RECEIVE_PROCESSING_DELAY + + // the complete equation now is: + // next_period_boundary = front_time - (nbframes - (syt_interval - 1)) * rate + // + m_period * rate + RECEIVE_PROCESSING_DELAY + // since syt_interval is a constant value, we can equally well ignore it, as + // if it were already included in RECEIVE_PROCESSING_DELAY + // making the equation (simplified: + // next_period_boundary = front_time + (-nbframes + m_period) * rate + // + RECEIVE_PROCESSING_DELAY + // currently this is in ticks + + int64_t next_period_boundary = m_last_timestamp; + next_period_boundary += (int64_t)(((int64_t)m_period-(int64_t)m_framecounter) * m_ticks_per_frame); + next_period_boundary += RECEIVE_PROCESSING_DELAY; + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", + next_period_boundary, m_last_timestamp, m_framecounter, m_ticks_per_frame + ); + + // this happens if the timestamp wraps around while there are a lot of + // frames in the buffer. We should add the wraparound value of the ticks + // counter + if (next_period_boundary < 0) { + next_period_boundary += TICKS_PER_SECOND * 128L; + } + // this happens when the last timestamp is near wrapping, and + // m_framecounter is low. + // this means: m_last_timestamp is near wrapping and have just had + // a getPackets() from the client side. the projected next_period + // boundary lies beyond the wrap value. + // the action is to wrap the value. + else if (next_period_boundary >= TICKS_PER_SECOND * 128L) { + next_period_boundary -= TICKS_PER_SECOND * 128L; + } + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", + next_period_boundary, m_last_timestamp, m_framecounter, m_ticks_per_frame + ); + + return next_period_boundary; } @@ -1424,21 +1279,9 @@ freebob_ringbuffer_reset(m_event_buffer); - // reset the last timestamp - m_last_timestamp=0; - m_PeriodStat.reset(); m_PacketStat.reset(); m_WakeupStat.reset(); - - // reset the framerate estimate - m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / m_framerate; - - debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n",m_ticks_per_frame); - - //reset the timestamps - m_last_timestamp=0; - m_last_timestamp2=0; - - m_one_period_passed=false; + + m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); // reset all non-device specific stuff @@ -1491,5 +1334,5 @@ // prepare the framerate estimate - m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / m_framerate; + m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n",m_ticks_per_frame); @@ -1500,6 +1343,6 @@ // add the processing delay debugOutput(DEBUG_LEVEL_VERBOSE,"Adding %u frames of SYT slack buffering...\n", - RECEIVE_PROCESSING_DELAY_IN_SAMPLES); - ringbuffer_size_frames+=RECEIVE_PROCESSING_DELAY_IN_SAMPLES; + (uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame)); + ringbuffer_size_frames+=(uint)(RECEIVE_PROCESSING_DELAY/m_ticks_per_frame); if( !(m_event_buffer=freebob_ringbuffer_create( @@ -1601,5 +1444,19 @@ } -bool AmdtpReceiveStreamProcessor::transfer() { +bool AmdtpReceiveStreamProcessor::prepareForStart() { + disable(); + return true; +} + +bool AmdtpReceiveStreamProcessor::prepareForStop() { + disable(); + return true; +} + +bool AmdtpReceiveStreamProcessor::canClientTransferFrames(unsigned int nbframes) { + return getFrameCounter() >= (int) nbframes; +} + +bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { m_PeriodStat.mark(freebob_ringbuffer_read_space(m_event_buffer)/(4*m_dimension)); @@ -1607,15 +1464,4 @@ debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); -/* another naive section: - unsigned int read_size=m_period*sizeof(quadlet_t)*m_dimension; - char *dummybuffer=(char *)calloc(sizeof(quadlet_t),m_period*m_dimension); - if (freebob_ringbuffer_read(m_event_buffer,(char *)(dummybuffer),read_size) < read_size) { - debugWarning("Could not read from event buffer\n"); - } - - receiveBlock(dummybuffer, m_period, 0); - - free(dummybuffer); -*/ int xrun; unsigned int offset=0; @@ -1625,5 +1471,5 @@ // this is period_size*dimension of events - int events2read=m_period*m_dimension; + int events2read=nbframes*m_dimension; int bytes2read=events2read*sizeof(quadlet_t); /* read events2read bytes from the ringbuffer @@ -1638,5 +1484,5 @@ while(bytes2read>0) { - unsigned int framesread=(m_period*cluster_size-bytes2read)/cluster_size; + unsigned int framesread=(nbframes*cluster_size-bytes2read)/cluster_size; offset=framesread; @@ -1695,6 +1541,22 @@ assert(bytes2read%cluster_size==0); } - - return true; + + // update the frame counter such that it reflects the new value, + // and also update the buffer head timestamp as we pull frames + // done in the SP base class + + // wrap the timestamp if nescessary + if (ts < 0) { + ts += TICKS_PER_SECOND * 128L; + } else if (ts >= TICKS_PER_SECOND * 128L) { + ts -= TICKS_PER_SECOND * 128L; + } + + if (!StreamProcessor::getFrames(nbframes, ts)) { + debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nbframes, ts); + return false; + } + + return true; } @@ -1752,5 +1614,5 @@ quadlet_t *target_event=NULL; - int j; + unsigned int j; for ( PortVectorIterator it = m_PacketPorts.begin(); Index: branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp (revision 384) @@ -32,5 +32,4 @@ #include - namespace FreebobStreaming { @@ -39,5 +38,6 @@ IsoHandlerManager::IsoHandlerManager() : - m_poll_timeout(100), m_poll_fds(0), m_poll_nfds(0) + m_State(E_Created), + m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0) { @@ -57,14 +57,35 @@ } -// the IsoHandlerManager thread updates the handler caches -// it doesn't iterate them !!! +/** + * the IsoHandlerManager thread execute function iterates the handlers. + * + * This means that once the thread is running, streams are + * transmitted and received (if present on the bus). Make sure + * that the clients are registered & ready before starting the + * thread! + * + * The register and unregister functions are thread unsafe, so + * should not be used when the thread is running. + * + * @return false if the handlers could not be iterated. + */ bool IsoHandlerManager::Execute() { - updateCycleCounters(); - usleep(USLEEP_AFTER_UPDATE); +// updateCycleTimers(); + + if(!iterate()) { + debugFatal("Could not iterate the isoManager\n"); + return false; + } return true; } +/** + * Poll the handlers managed by this manager, and iterate them + * when ready + * + * @return true when successful + */ bool IsoHandlerManager::iterate() { @@ -104,6 +125,6 @@ } -// updates the internal cycle counter caches of the handlers -void IsoHandlerManager::updateCycleCounters() { +// updates the internal cycle timer caches of the handlers +void IsoHandlerManager::updateCycleTimers() { debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); @@ -113,5 +134,5 @@ { int cnt=0; - while (!(*it)->updateCycleCounter() && (cnt++ < MAX_UPDATE_TRIES)) { + while (!(*it)->updateCycleTimer() && (cnt++ < MAX_UPDATE_TRIES)) { usleep(USLEEP_AFTER_UPDATE_FAILURE); } @@ -119,22 +140,4 @@ } - -bool IsoHandlerManager::prepare() -{ - debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); - for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); - it != m_IsoHandlers.end(); - ++it ) - { - if(!(*it)->prepare()) { - debugFatal("Could not prepare handlers\n"); - return false; - } - } - - return true; -} - - bool IsoHandlerManager::registerHandler(IsoHandler *handler) @@ -205,6 +208,8 @@ void IsoHandlerManager::disablePolling(IsoStream *stream) { + int i=0; + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Disable polling on stream %p\n",stream); - int i=0; + for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); it != m_IsoHandlers.end(); @@ -216,12 +221,14 @@ debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "polling disabled\n"); } + i++; } - } void IsoHandlerManager::enablePolling(IsoStream *stream) { + int i=0; + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Enable polling on stream %p\n",stream); - int i=0; + for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); it != m_IsoHandlers.end(); @@ -233,4 +240,5 @@ debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "polling enabled\n"); } + i++; } @@ -478,4 +486,36 @@ } + +bool IsoHandlerManager::prepare() +{ + bool retval=true; + + debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); + + // check state + if(m_State != E_Created) { + debugError("Incorrect state, expected E_Created, got %d\n",(int)m_State); + return false; + } + + for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); + it != m_IsoHandlers.end(); + ++it ) + { + if(!(*it)->prepare()) { + debugFatal("Could not prepare handlers\n"); + retval=false; + } + } + + if (retval) { + m_State=E_Prepared; + } else { + m_State=E_Error; + } + + return retval; +} + bool IsoHandlerManager::startHandlers() { return startHandlers(-1); @@ -483,5 +523,13 @@ bool IsoHandlerManager::startHandlers(int cycle) { + bool retval=true; + debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); + + // check state + if(m_State != E_Prepared) { + debugError("Incorrect state, expected E_Prepared, got %d\n",(int)m_State); + return false; + } for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); @@ -492,13 +540,27 @@ if(!(*it)->start(cycle)) { debugOutput( DEBUG_LEVEL_VERBOSE, " could not start handler (%p)\n",*it); - return false; + retval=false; } } - return true; + if (retval) { + m_State=E_Running; + } else { + m_State=E_Error; + } + + return retval; } bool IsoHandlerManager::stopHandlers() { debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); + + // check state + if(m_State != E_Running) { + debugError("Incorrect state, expected E_Running, got %d\n",(int)m_State); + return false; + } + + bool retval=true; for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); @@ -509,9 +571,30 @@ if(!(*it)->stop()){ debugOutput( DEBUG_LEVEL_VERBOSE, " could not stop handler (%p)\n",*it); - return false; + retval=false; } } - return true; -} + + if (retval) { + m_State=E_Prepared; + } else { + m_State=E_Error; + } + + return retval; +} + +bool IsoHandlerManager::reset() { + debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); + + // check state + if(m_State == E_Error) { + debugFatal("Resetting from error condition not yet supported...\n"); + return false; + } + + // if not in an error condition, reset means stop the handlers + return stopHandlers(); +} + void IsoHandlerManager::setVerboseLevel(int i) { @@ -527,7 +610,9 @@ void IsoHandlerManager::dumpInfo() { - debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping IsoHandlerManager Stream handler information...\n"); int i=0; + debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping IsoHandlerManager Stream handler information...\n"); + debugOutputShort( DEBUG_LEVEL_NORMAL, " State: %d\n",(int)m_State); + for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); it != m_IsoHandlers.end(); Index: branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp (revision 384) @@ -39,5 +39,6 @@ StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers) - : m_nb_buffers(nb_buffers), m_period(period), m_xruns(0), m_isoManager(0), m_nbperiods(0) { + : m_SyncSource(NULL), m_nb_buffers(nb_buffers), m_period(period), m_xruns(0), + m_isoManager(0), m_nbperiods(0) { } @@ -146,9 +147,19 @@ } +bool StreamProcessorManager::setSyncSource(StreamProcessor *s) { + m_SyncSource=s; + return true; +} + +StreamProcessor *StreamProcessorManager::getSyncSource() { + return m_SyncSource; +} + bool StreamProcessorManager::init() { debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); - // the tread that runs the packet iterators + // the tread that runs the StreamProcessor + // checking the period boundaries m_streamingThread=new FreebobUtil::PosixThread(this, m_thread_realtime, m_thread_priority+5, @@ -169,15 +180,7 @@ m_isoManager->setVerboseLevel(getDebugLevel()); - // the tread that keeps the handler's cycle counters up to date - // NOTE: is lower priority nescessary? it can block -// m_isoManagerThread=new FreebobUtil::PosixThread(m_isoManager, m_thread_realtime, m_thread_priority+6, PTHREAD_CANCEL_DEFERRED); - - // now that we are using a DLL, we don't need to run this at RT priority - // it only serves to cope with drift - // however, in order to make the DLL fast enough, we have to increase - // its bandwidth, making it more sensitive to deviations. These deviations - // are mostly determined by the time difference between reading the cycle - // time register and the local cpu clock. - + // the tread that keeps the handler's cycle timers up to date + // and performs the actual packet transfer + // needs high priority m_isoManagerThread=new FreebobUtil::PosixThread( m_isoManager, @@ -201,5 +204,5 @@ if(sem_init(&m_period_semaphore, 0, 0)) { - debugFatal( "Cannot init packet transfer semaphore\n"); + debugFatal( "Cannot init period transfer semaphore\n"); debugFatal( " Error: %s\n",strerror(errno)); return false; @@ -212,26 +215,57 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n"); - debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); + + // if no sync source is set, select one here + if(m_SyncSource == NULL) { + debugWarning("Sync Source is not set. Defaulting to first StreamProcessor.\n"); + } + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); it != m_ReceiveProcessors.end(); ++it ) { + if(m_SyncSource == NULL) { + debugWarning(" => Sync Source is %p.\n", *it); + m_SyncSource = *it; + } + } + + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + if(m_SyncSource == NULL) { + debugWarning(" => Sync Source is %p.\n", *it); + m_SyncSource = *it; + } + } + + // now do the actual preparation + debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n"); + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + if(!(*it)->setSyncSource(m_SyncSource)) { + debugFatal( " could not set sync source (%p)...\n",(*it)); + return false; + } + if(!(*it)->prepare()) { debugFatal( " could not prepare (%p)...\n",(*it)); return false; - - } - } - - debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); + } + } + + debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n"); for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); it != m_TransmitProcessors.end(); ++it ) { + if(!(*it)->setSyncSource(m_SyncSource)) { + debugFatal( " could not set sync source (%p)...\n",(*it)); + return false; + } if(!(*it)->prepare()) { debugFatal( " could not prepare (%p)...\n",(*it)); return false; - - } - - } + } + } // if there are no stream processors registered, @@ -245,83 +279,15 @@ } +// FIXME: this can be removed bool StreamProcessorManager::Execute() { - - bool period_ready=true; - bool xrun_has_occured=false; - bool this_period_ready; - -// debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "------------- EXECUTE -----------\n"); - - if(!m_isoManager->iterate()) { - debugFatal("Could not iterate the isoManager\n"); - return false; - } - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " RCV PROC: "); - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - - this_period_ready = (*it)->isOnePeriodReady(); - period_ready = period_ready && this_period_ready; -// if (this_period_ready) { -// m_isoManager->disablePolling(*it); -// } -// - xrun_has_occured = xrun_has_occured || (*it)->xrunOccurred(); - debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "(%d/%d/%d) ", period_ready, xrun_has_occured,(*it)->m_framecounter); - } - debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "\n"); - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " XMIT PROC: "); - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - this_period_ready = (*it)->isOnePeriodReady(); - period_ready = period_ready && this_period_ready; -// if (this_period_ready) { -// m_isoManager->disablePolling(*it); -// } - xrun_has_occured = xrun_has_occured || (*it)->xrunOccurred(); - debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "(%d/%d/%d) ", period_ready, xrun_has_occured,(*it)->m_framecounter); - } - debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "\n"); - - if(xrun_has_occured) { - // do xrun signaling/handling - debugWarning("Streaming thread detected xrun\n"); - m_xruns++; - m_xrun_happened=true; - sem_post(&m_period_semaphore); + // temp measure, polling + usleep(1000); + + // FIXME: move this to an IsoHandlerManager sub-thread + // and make this private again in IHM + m_isoManager->updateCycleTimers(); - return false; // stop thread - } - - if(period_ready) { - // signal the waiting thread(s?) that a period is ready - sem_post(&m_period_semaphore); - debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "Period done...\n"); - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - (*it)->decrementFrameCounter(); -// m_isoManager->enablePolling(*it); - - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - (*it)->decrementFrameCounter(); -// m_isoManager->enablePolling(*it); - } - - m_nbperiods++; - } - - return true; - + return true; } @@ -335,5 +301,5 @@ it != m_ReceiveProcessors.end(); ++it ) { - if (!(*it)->preparedForStart()) { + if (!(*it)->prepareForStart()) { debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it); return false; @@ -343,6 +309,4 @@ return false; } - - } @@ -351,5 +315,5 @@ it != m_TransmitProcessors.end(); ++it ) { - if (!(*it)->preparedForStart()) { + if (!(*it)->prepareForStart()) { debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it); return false; @@ -359,5 +323,4 @@ return false; } - } @@ -368,5 +331,11 @@ } - debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandler...\n"); + debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); + if (!disableStreamProcessors()) { + debugFatal("Could not disable StreamProcessors...\n"); + return false; + } + + debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n"); if (!m_isoManager->startHandlers(0)) { debugFatal("Could not start handlers...\n"); @@ -374,11 +343,15 @@ } - debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming thread...\n"); - + debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n"); + + // note: libraw1394 doesn't like it if you poll() and/or iterate() before + // starting the streams. // start the runner thread + // FIXME: maybe this should go into the isomanager itself. + m_isoManagerThread->Start(); + + // start the runner thread + // FIXME: not used anymore (for updatecycletimers ATM, but that's not good) m_streamingThread->Start(); - - // start the runner thread - m_isoManagerThread->Start(); debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to start running...\n"); @@ -434,5 +407,4 @@ // now we reset the frame counters - // FIXME: check how we are going to do sync for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); it != m_ReceiveProcessors.end(); @@ -467,17 +439,17 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); - // and we enable the streamprocessors - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - (*it)->enable(); - m_isoManager->enablePolling(*it); - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - (*it)->enable(); - m_isoManager->enablePolling(*it); + + debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); + if (!m_SyncSource->prepareForEnable()) { + debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); + return false; + } + + m_SyncSource->enable(); + + debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); + if (!enableStreamProcessors()) { + debugFatal("Could not enable StreamProcessors...\n"); + return false; } @@ -509,5 +481,5 @@ it != m_ReceiveProcessors.end(); ++it ) { - if(!(*it)->preparedForStop()) allReady = false; + if(!(*it)->prepareForStop()) allReady = false; } @@ -515,5 +487,5 @@ it != m_TransmitProcessors.end(); ++it ) { - if(!(*it)->preparedForStop()) allReady = false; + if(!(*it)->prepareForStop()) allReady = false; } usleep(1000); @@ -560,21 +532,55 @@ } -bool StreamProcessorManager::waitForPeriod() { - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); - - // Wait for packetizer thread to signal a period completion - sem_wait(&m_period_semaphore); - - if(m_xrun_happened) { - debugWarning("Detected underrun\n"); - dumpInfo(); - return false; - } - +/** + * Enables the registered StreamProcessors + * @return true if successful, false otherwise + */ +bool StreamProcessorManager::enableStreamProcessors() { + // and we enable the streamprocessors + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + (*it)->prepareForEnable(); + (*it)->enable(); + } + + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + (*it)->prepareForEnable(); + (*it)->enable(); + } return true; - -} - +} + +/** + * Disables the registered StreamProcessors + * @return true if successful, false otherwise + */ +bool StreamProcessorManager::disableStreamProcessors() { + // and we disable the streamprocessors + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + (*it)->prepareForDisable(); + (*it)->disable(); + } + + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + (*it)->prepareForDisable(); + (*it)->disable(); + } + return true; +} + +/** + * Called upon Xrun events. This brings all StreamProcessors back + * into their starting state, and then carries on streaming. This should + * have the same effect as restarting the whole thing. + * + * @return true if successful, false otherwise + */ bool StreamProcessorManager::handleXrun() { @@ -583,13 +589,14 @@ /* * Reset means: - * 1) Stopping the packetizer thread + * 1) Disabling the SP's, so that they don't process any packets + * note: the isomanager does keep on delivering/requesting them * 2) Bringing all buffers & streamprocessors into a know state * - Clear all capture buffers * - Put nb_periods*period_size of null frames into the playback buffers - * 3) Restarting the packetizer thread + * 3) Re-enable the SP's */ - debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping processormanager...\n"); - if(!stop()) { - debugFatal("Could not stop.\n"); + debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); + if (!disableStreamProcessors()) { + debugFatal("Could not disable StreamProcessors...\n"); return false; } @@ -597,5 +604,5 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting Processors...\n"); - // now we reset the frame counters + // now we reset the streamprocessors for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); it != m_ReceiveProcessors.end(); @@ -611,5 +618,4 @@ (*it)->dumpInfo(); } - } @@ -629,73 +635,287 @@ } - debugOutput( DEBUG_LEVEL_VERBOSE, "Starting processormanager...\n"); - - if(!start()) { - debugFatal("Could not start.\n"); - return false; - } - + debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); + + debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); + if (!m_SyncSource->prepareForEnable()) { + debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); + return false; + } + + m_SyncSource->enable(); + + debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); + if (!enableStreamProcessors()) { + debugFatal("Could not enable StreamProcessors...\n"); + return false; + } debugOutput( DEBUG_LEVEL_VERBOSE, "Xrun handled...\n"); - return true; } +/** + * @brief Waits until the next period of samples is ready + * + * This function does not return until a full period of samples is (or should be) + * ready to be transferred. + * + * @return true if the period is ready, false if an xrun occurred + */ +bool StreamProcessorManager::waitForPeriod() { + int time_till_next_period; + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); + + assert(m_SyncSource); + + time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs(); + + while(time_till_next_period > 0) { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); + + // wait for the period + usleep(time_till_next_period); + + // check if it is still true + time_till_next_period=m_SyncSource->getTimeUntilNextPeriodUsecs(); + } + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period); + + // this is to notify the client of the delay + // that we introduced + m_delayed_usecs=time_till_next_period; + + // we save the 'ideal' time of the transfer at this point, + // because we can have interleaved read - process - write + // cycles making that we modify a receiving stream's buffer + // before we get to writing. + // NOTE: before waitForPeriod() is called again, both the transmit + // and the receive processors should have done their transfer. + m_time_of_transfer=m_SyncSource->getTimeAtPeriod(); + debugOutput( DEBUG_LEVEL_VERBOSE, "transfer at %llu ticks...\n", + m_time_of_transfer); + + // check if xruns occurred on the Iso side. + // also check if xruns will occur should we transfer() now + bool xrun_occurred=false; + + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + // a xrun has occurred on the Iso side + xrun_occurred |= (*it)->xrunOccurred(); + + // if this is true, a xrun will occur + xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); + +#ifdef DEBUG + if ((*it)->xrunOccurred()) { + debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it); + } + if (!((*it)->canClientTransferFrames(m_period))) { + debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); + } +#endif + + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + // a xrun has occurred on the Iso side + xrun_occurred |= (*it)->xrunOccurred(); + + // if this is true, a xrun will occur + xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); + +#ifdef DEBUG + if ((*it)->xrunOccurred()) { + debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it); + } + if (!((*it)->canClientTransferFrames(m_period))) { + debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it); + } +#endif + } + + // now we can signal the client that we are (should be) ready + return !xrun_occurred; +} + +/** + * @brief Transfer one period of frames for both receive and transmit StreamProcessors + * + * Transfers one period of frames from the client side to the Iso side and vice versa. + * + * @return true if successful, false otherwise (indicates xrun). + */ bool StreamProcessorManager::transfer() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n"); - - // a static cast could make sure that there is no performance - // penalty for the virtual functions (to be checked) - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(!(*it)->transfer()) { - debugFatal("could not transfer() stream processor (%p)",*it); - return false; - } - } - - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(!(*it)->transfer()) { - debugFatal("could not transfer() stream processor (%p)",*it); - return false; - } - } - - return true; -} + + debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n"); + + if (!transfer(StreamProcessor::E_Receive)) return false; + if (!transfer(StreamProcessor::E_Transmit)) return false; + + return true; +} + +/** + * @brief Transfer one period of frames for either the receive or transmit StreamProcessors + * + * Transfers one period of frames from the client side to the Iso side or vice versa. + * + * @param t The processor type to tranfer for (receive or transmit) + * @return true if successful, false otherwise (indicates xrun). + */ bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) { - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); - - // a static cast could make sure that there is no performance - // penalty for the virtual functions (to be checked) - if (t==StreamProcessor::E_Receive) { - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(!(*it)->transfer()) { - debugFatal("could not transfer() stream processor (%p)",*it); - return false; - } - } - } else { - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(!(*it)->transfer()) { - debugFatal("could not transfer() stream processor (%p)",*it); - return false; - } - } - } - - return true; + int64_t time_of_transfer; + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); + + // first we should find out on what time this transfer is + // supposed to be happening. this time will become the time + // stamp for the transmitted buffer. + // NOTE: maybe we should include the transfer delay here, that + // would make it equal for all types of SP's + time_of_transfer=m_time_of_transfer; + + // a static cast could make sure that there is no performance + // penalty for the virtual functions (to be checked) + if (t==StreamProcessor::E_Receive) { + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + uint64_t buffer_tail_ts; + uint64_t fc; + int64_t ts; + + (*it)->getBufferTailTimestamp(&buffer_tail_ts,&fc); + ts = buffer_tail_ts; + ts += (int64_t)((-(int64_t)fc) * m_SyncSource->getTicksPerFrame()); + // NOTE: ts can be negative due to wraparound, it is the responsability of the + // SP to deal with that. + + float tmp=m_SyncSource->getTicksPerFrame(); + + debugOutput(DEBUG_LEVEL_VERBOSE, "=> TS=%11lld, BLT=%11llu, FC=%5d, TPF=%f\n", + ts, buffer_tail_ts, fc, tmp + ); + debugOutput(DEBUG_LEVEL_VERBOSE, " TPF=%f\n", tmp); + + #ifdef DEBUG + { + uint64_t ts_tail=0; + uint64_t fc_tail=0; + + uint64_t ts_head=0; + uint64_t fc_head=0; + + int cnt=0; + + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + + while((fc_head != fc_tail) && (cnt++ < 10)) { + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + } + + debugOutput(DEBUG_LEVEL_VERBOSE,"R * HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", + ts_tail, fc_tail, ts_head, fc_head, cnt); + } + #endif + + if(!(*it)->getFrames(m_period, ts)) { + debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u) from stream processor (%p)", + m_period,*it); + return false; // buffer underrun + } + + #ifdef DEBUG + { + uint64_t ts_tail=0; + uint64_t fc_tail=0; + + uint64_t ts_head=0; + uint64_t fc_head=0; + + int cnt=0; + + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + + while((fc_head != fc_tail) && (cnt++ < 10)) { + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + } + + debugOutput(DEBUG_LEVEL_VERBOSE,"R > HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", + ts_tail, fc_tail, ts_head, fc_head, cnt); + } + #endif + + } + } else { + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + + #ifdef DEBUG + { + uint64_t ts_tail=0; + uint64_t fc_tail=0; + + uint64_t ts_head=0; + uint64_t fc_head=0; + + int cnt=0; + + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + + while((fc_head != fc_tail) && (cnt++ < 10)) { + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + } + + debugOutput(DEBUG_LEVEL_VERBOSE,"T * HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", + ts_tail, fc_tail, ts_head, fc_head, cnt); + } + #endif + + if(!(*it)->putFrames(m_period,time_of_transfer)) { + debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)", + m_period, time_of_transfer, *it); + return false; // buffer overrun + } + + #ifdef DEBUG + { + uint64_t ts_tail=0; + uint64_t fc_tail=0; + + uint64_t ts_head=0; + uint64_t fc_head=0; + + int cnt=0; + + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + + while((fc_head != fc_tail) && (cnt++ < 10)) { + (*it)->getBufferHeadTimestamp(&ts_head,&fc_head); + (*it)->getBufferTailTimestamp(&ts_tail,&fc_tail); + } + + debugOutput(DEBUG_LEVEL_VERBOSE,"T > HEAD: TS=%llu, FC=%llu | TAIL: TS=%llu, FC=%llu, %d\n", + ts_tail, fc_tail, ts_head, fc_head, cnt); + } + #endif + } + } + + return true; } Index: branches/streaming-rework/src/libstreaming/IsoHandler.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/IsoHandler.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/IsoHandler.cpp (revision 384) @@ -29,7 +29,6 @@ #include "IsoHandler.h" #include "IsoStream.h" -#include "cyclecounter.h" - -#include "libutil/Time.h" +#include "cycletimer.h" + #include "libutil/TimeSource.h" #include "libutil/SystemTimeSource.h" @@ -39,14 +38,14 @@ #include #include +#include #include using namespace std; - -#define CC_SLEEP_TIME_AFTER_UPDATE 100 +#define CC_SLEEP_TIME_AFTER_UPDATE 1000 #define CC_SLEEP_TIME_AFTER_FAILURE 10 -#define CC_DLL_COEFF ((0.01)*((float)(CC_SLEEP_TIME_AFTER_UPDATE/1000.0))) - -#define CC_MAX_RATE_ERROR (2/100.0) +#define CC_DLL_COEFF ((0.001)*((float)(CC_SLEEP_TIME_AFTER_UPDATE/1000.0))) + +#define CC_MAX_RATE_ERROR (2.0/100.0) #define CC_INIT_MAX_TRIES 10 @@ -96,9 +95,9 @@ : TimeSource(), m_handle(0), m_handle_util(0), m_port(port), m_buf_packets(400), m_max_packet_size(1024), m_irq_interval(-1), - m_cyclecounter_ticks(0), m_lastmeas_usecs(0), m_ticks_per_usec(24.576), + m_cycletimer_ticks(0), m_lastmeas_usecs(0), m_ticks_per_usec(24.576), m_ticks_per_usec_dll_err2(0), - m_packetcount(0), m_dropped(0), m_Client(0) -{ - InitTime(); + m_packetcount(0), m_dropped(0), m_Client(0), + m_State(E_Created), m_TimeSource_LastSecs(0),m_TimeSource_NbCycleWraps(0) +{ m_TimeSource=new FreebobUtil::SystemTimeSource(); } @@ -108,20 +107,31 @@ m_buf_packets(buf_packets), m_max_packet_size( max_packet_size), m_irq_interval(irq), - m_cyclecounter_ticks(0), m_lastmeas_usecs(0), m_ticks_per_usec(24.576), + m_cycletimer_ticks(0), m_lastmeas_usecs(0), m_ticks_per_usec(24.576), m_ticks_per_usec_dll_err2(0), - m_packetcount(0), m_dropped(0), m_Client(0) -{ - InitTime(); + m_packetcount(0), m_dropped(0), m_Client(0), + m_State(E_Created), m_TimeSource_LastSecs(0),m_TimeSource_NbCycleWraps(0) +{ m_TimeSource=new FreebobUtil::SystemTimeSource(); } IsoHandler::~IsoHandler() { + +// Don't call until libraw1394's raw1394_new_handle() function has been +// fixed to correctly initialise the iso_packet_infos field. Bug is +// confirmed present in libraw1394 1.2.1. In any case, +// raw1394_destroy_handle() will do any iso system shutdown required. +// raw1394_iso_shutdown(m_handle); + if(m_handle) { + if (m_State == E_Running) { stop(); + } + raw1394_destroy_handle(m_handle); } + if(m_handle_util) raw1394_destroy_handle(m_handle_util); - delete m_TimeSource; + if (m_TimeSource) delete m_TimeSource; } @@ -131,11 +141,18 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "IsoHandler (%p) enter...\n",this); + // check the state + if(m_State != E_Created) { + debugError("Incorrect state, expected E_Created, got %d\n",(int)m_State); + return false; + } + + // the main handle for the ISO traffic m_handle = raw1394_new_handle_on_port( m_port ); if ( !m_handle ) { if ( !errno ) { - cerr << "libraw1394 not compatible" << endl; + debugError("libraw1394 not compatible\n"); } else { - perror( "IsoHandler::Initialize: Could not get 1394 handle" ); - cerr << "Is ieee1394 and raw1394 driver loaded?" << endl; + debugError("Could not get 1394 handle: %s", strerror(errno) ); + debugError("Are ieee1394 and raw1394 drivers loaded?"); } return false; @@ -147,34 +164,112 @@ if ( !m_handle_util ) { if ( !errno ) { - cerr << "libraw1394 not compatible" << endl; + debugError("libraw1394 not compatible\n"); } else { - perror( "IsoHandler::Initialize: Could not get 1394 handle" ); - cerr << "Is ieee1394 and raw1394 driver loaded?" << endl; + debugError("Could not get 1394 handle: %s", strerror(errno) ); + debugError("Are ieee1394 and raw1394 drivers loaded?"); } - return false; - } - + + raw1394_destroy_handle(m_handle); + return false; + } raw1394_set_userdata(m_handle_util, static_cast(this)); + // bus reset handling if(raw1394_busreset_notify (m_handle, RAW1394_NOTIFY_ON)) { debugWarning("Could not enable busreset notification.\n"); debugWarning(" Error message: %s\n",strerror(errno)); - } - + debugWarning("Continuing without bus reset support.\n"); + } else { + // apparently this cannot fail raw1394_set_bus_reset_handler(m_handle, busreset_handler); + } // initialize the local timesource m_TimeSource_NbCycleWraps=0; + unsigned int new_timer; + +#ifdef LIBRAW1394_USE_CTRREAD_API + struct raw1394_cycle_timer ctr; + int err; + err=raw1394_read_cycle_timer(m_handle_util, &ctr); + if(err) { + debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); + } + new_timer=ctr.cycle_timer; +#else + // normally we should be able to use the same handle + // because it is not iterated on by any other stuff + // but I'm not sure quadlet_t buf=0; - unsigned int new_counter; - raw1394_read(m_handle_util, raw1394_get_local_id(m_handle_util), CSR_REGISTER_BASE | CSR_CYCLE_TIME, 4, &buf); - - new_counter= ntohl(buf) & 0xFFFFFFFF; - m_TimeSource_LastSecs=CYCLE_COUNTER_GET_SECS(new_counter); - - // update the cycle counter value for initial value - initCycleCounter(); + new_timer= ntohl(buf) & 0xFFFFFFFF; +#endif + + m_TimeSource_LastSecs=CYCLE_TIMER_GET_SECS(new_timer); + + // update the cycle timer value for initial value + initCycleTimer(); + + // update the internal state + m_State=E_Initialized; + + return true; +} + +bool IsoHandler::prepare() +{ + debugOutput( DEBUG_LEVEL_VERBOSE, "IsoHandler (%p) enter...\n",this); + + // check the state + if(m_State != E_Initialized) { + debugError("Incorrect state, expected E_Initialized, got %d\n",(int)m_State); + return false; + } + + // Don't call until libraw1394's raw1394_new_handle() function has been + // fixed to correctly initialise the iso_packet_infos field. Bug is + // confirmed present in libraw1394 1.2.1. + +// raw1394_iso_shutdown(m_handle); + + m_State = E_Prepared; + + return true; +} + +bool IsoHandler::start(int cycle) +{ + debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); + + // check the state + if(m_State != E_Prepared) { + debugError("Incorrect state, expected E_Prepared, got %d\n",(int)m_State); + return false; + } + + m_State=E_Running; + + return true; +} + +bool IsoHandler::stop() +{ + debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); + + // check state + if(m_State != E_Running) { + debugError("Incorrect state, expected E_Running, got %d\n",(int)m_State); + return false; + } + + // this is put here to try and avoid the + // Runaway context problem + // don't know if it will help though. + raw1394_iso_xmit_sync(m_handle); + + raw1394_iso_stop(m_handle); + + m_State=E_Prepared; return true; @@ -185,15 +280,8 @@ { m_TimeSource=t; - - // update the cycle counter value for initial value - initCycleCounter(); - - return true; -} - -bool IsoHandler::stop() -{ - debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); - raw1394_iso_stop(m_handle); + + // update the cycle timer value for initial value + initCycleTimer(); + return true; } @@ -210,5 +298,5 @@ // as busreset can elect a new cycle master, // we need to re-initialize our timing code - initCycleCounter(); + initCycleTimer(); return 0; @@ -216,13 +304,73 @@ /** - * Returns the current value of the cycle counter (in ticks) - * - * @return the current value of the cycle counter (in ticks) + * Returns the current value of the cycle timer (in ticks) + * + * @return the current value of the cycle timer (in ticks) */ -unsigned int IsoHandler::getCycleCounter() { - // calculate the cycle counter based upon the current time - // and the estimated tick rate - freebob_microsecs_t now=m_TimeSource->getCurrentTimeAsUsecs(); +unsigned int IsoHandler::getCycleTimerTicks() { + +#ifdef LIBRAW1394_USE_CTRREAD_API + // the new api should be realtime safe. + // it might cause a reschedule when turning preemption, + // back on but that won't hurt us if we have sufficient + // priority + struct raw1394_cycle_timer ctr; + int err; + err=raw1394_read_cycle_timer(m_handle_util, &ctr); + if(err) { + debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); + } + return CYCLE_TIMER_TO_TICKS(ctr.cycle_timer); + +#else + // use the estimated version + freebob_microsecs_t now; + now=m_TimeSource->getCurrentTimeAsUsecs(); + return mapToCycleTimer(now); +#endif + +} + +/** + * Returns the current value of the cycle timer (as is) + * + * @return the current value of the cycle timer (as is) + */ + +unsigned int IsoHandler::getCycleTimer() { + +#ifdef LIBRAW1394_USE_CTRREAD_API + // the new api should be realtime safe. + // it might cause a reschedule when turning preemption, + // back on but that won't hurt us if we have sufficient + // priority + struct raw1394_cycle_timer ctr; + int err; + err=raw1394_read_cycle_timer(m_handle_util, &ctr); + if(err) { + debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); + } + return ctr.cycle_timer; + +#else + // use the estimated version + freebob_microsecs_t now; + now=m_TimeSource->getCurrentTimeAsUsecs(); + return TICKS_TO_CYCLE_TIMER(mapToCycleTimer(now)); +#endif + +} +/** + * Maps a value of the active TimeSource to a Cycle Timer value. + * + * This is usefull if you know a time value and want the corresponding + * Cycle Timer value. Note that the value shouldn't be too far off + * the current time, because then the mapping can be bad. + * + * @return the value of the cycle timer (in ticks) + */ + +unsigned int IsoHandler::mapToCycleTimer(freebob_microsecs_t now) { // linear interpolation @@ -231,13 +379,17 @@ float offset=m_ticks_per_usec * ((float)delta_usecs); - unsigned int pred_ticks=m_cyclecounter_ticks+(unsigned int)offset; - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Get CC: d_usecs=%d, offset=%f, cc_ticks=%lu, pred_ticks=%lu\n", - delta_usecs, offset, m_cyclecounter_ticks,pred_ticks - ); + int64_t pred_ticks=(int64_t)m_cycletimer_ticks+(int64_t)offset; + + if (pred_ticks < 0) { + debugWarning("Predicted ticks < 0\n"); + } + debugOutput(DEBUG_LEVEL_VERBOSE,"now=%llu, m_lastmeas_usec=%llu, delta_usec=%d\n", + now, m_lastmeas_usecs, delta_usecs); + debugOutput(DEBUG_LEVEL_VERBOSE,"t/usec=%f, offset=%f, m_cc_t=%llu, pred_ticks=%lld\n", + m_ticks_per_usec, offset, m_cycletimer_ticks, pred_ticks); // if we need to wrap, do it - if (pred_ticks > TICKS_PER_SECOND * 128) { - pred_ticks -= TICKS_PER_SECOND * 128; + if (pred_ticks > TICKS_PER_SECOND * 128L) { + pred_ticks -= TICKS_PER_SECOND * 128L; } @@ -245,16 +397,45 @@ } -bool IsoHandler::updateCycleCounter() { - quadlet_t buf=0; - +/** + * Maps a Cycle Timer value (in ticks) of the active TimeSource's unit. + * + * This is usefull if you know a Cycle Timer value and want the corresponding + * timesource value. Note that the value shouldn't be too far off + * the current cycle timer, because then the mapping can be bad. + * + * @return the mapped value + */ + +freebob_microsecs_t IsoHandler::mapToTimeSource(unsigned int cc) { + + // linear interpolation + int delta_cc=cc-m_cycletimer_ticks; + + float offset= ((float)delta_cc) / m_ticks_per_usec; + + int64_t pred_time=(int64_t)m_lastmeas_usecs+(int64_t)offset; + + if (pred_time < 0) { + debugWarning("Predicted time < 0\n"); + debugOutput(DEBUG_LEVEL_VERBOSE,"cc=%u, m_cycletimer_ticks=%llu, delta_cc=%d\n", + cc, m_cycletimer_ticks, delta_cc); + debugOutput(DEBUG_LEVEL_VERBOSE,"t/usec=%f, offset=%f, m_lastmeas_usecs=%llu, pred_time=%lld\n", + m_ticks_per_usec, offset, m_lastmeas_usecs, pred_time); + } + + + return pred_time; +} + +bool IsoHandler::updateCycleTimer() { freebob_microsecs_t prev_usecs=m_lastmeas_usecs; - unsigned int prev_ticks=m_cyclecounter_ticks; + uint64_t prev_ticks=m_cycletimer_ticks; freebob_microsecs_t new_usecs; - unsigned int new_ticks; - unsigned int new_counter; - - /* To estimate the cycle counter, we implement a - DLL based routine, that maps the cycle counter + uint64_t new_ticks; + unsigned int new_timer; + + /* To estimate the cycle timer, we implement a + DLL based routine, that maps the cycle timer on the system clock. @@ -270,5 +451,5 @@ Basically what we do is estimate the next point (T1,CC1_est) based upon the previous point (T0, CC0) and the estimated rate (R). - Then we compare our estimation with the measured cycle counter + Then we compare our estimation with the measured cycle timer at T1 (=CC1_meas). We then calculate the estimation error on R: err=(CC1_meas-CC0)/(T1-T2) - (CC1_est-CC0)/(T1-T2) @@ -288,34 +469,45 @@ */ - +#ifdef LIBRAW1394_USE_CTRREAD_API + struct raw1394_cycle_timer ctr; + int err; + err=raw1394_read_cycle_timer(m_handle_util, &ctr); + if(err) { + debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); + } + new_usecs=(freebob_microsecs_t)ctr.local_time; + new_timer=ctr.cycle_timer; +#else // normally we should be able to use the same handle // because it is not iterated on by any other stuff // but I'm not sure + quadlet_t buf=0; raw1394_read(m_handle_util, raw1394_get_local_id(m_handle_util), CSR_REGISTER_BASE | CSR_CYCLE_TIME, 4, &buf); new_usecs=m_TimeSource->getCurrentTimeAsUsecs(); - - new_counter= ntohl(buf) & 0xFFFFFFFF; - new_ticks=CYCLE_COUNTER_TO_TICKS(new_counter); + new_timer= ntohl(buf) & 0xFFFFFFFF; +#endif + + new_ticks=CYCLE_TIMER_TO_TICKS(new_timer); // the difference in system time - int delta_usecs=new_usecs-prev_usecs; + int64_t delta_usecs=new_usecs-prev_usecs; // this cannot be 0, because m_TimeSource->getCurrentTimeAsUsecs should // never return the same value (maybe in future terrahz processors?) assert(delta_usecs); - // the measured cycle counter difference - unsigned int delta_ticks_meas; - if (new_ticks > prev_ticks) { + // the measured cycle timer difference + int64_t delta_ticks_meas; + if (new_ticks >= prev_ticks) { delta_ticks_meas=new_ticks - prev_ticks; } else { // wraparound - delta_ticks_meas=CYCLE_COUNTER_UNWRAP_TICKS(new_ticks) - prev_ticks; - } - - // the estimated cycle counter difference - unsigned int delta_ticks_est=(unsigned int)(m_ticks_per_usec * ((float)delta_usecs)); + delta_ticks_meas=CYCLE_TIMER_UNWRAP_TICKS(new_ticks) - prev_ticks; + } + + // the estimated cycle timer difference + int64_t delta_ticks_est=(int64_t)(m_ticks_per_usec * ((float)delta_usecs)); // the measured & estimated rate - float rate_meas=((float)delta_ticks_meas/(float)delta_usecs); + float rate_meas=((double)delta_ticks_meas/(double)delta_usecs); float rate_est=((float)m_ticks_per_usec); @@ -338,5 +530,5 @@ #ifdef DEBUG - int diff=(int)delta_ticks_est; + int64_t diff=(int64_t)delta_ticks_est; // calculate the difference in predicted ticks and @@ -345,10 +537,10 @@ - if (diff > 24000 || diff < -24000) { // approx +/-1 msec error - debugOutput(DEBUG_LEVEL_VERBOSE,"Bad pred (%p): diff=%d, dt_est=%u, dt_meas=%u, d=%dus, err=%fus\n", this, + if (diff > 24000L || diff < -24000L) { // approx +/-1 msec error + debugOutput(DEBUG_LEVEL_VERBOSE,"Bad pred (%p): diff=%lld, dt_est=%lld, dt_meas=%lld, d=%lldus, err=%fus\n", this, diff, delta_ticks_est, delta_ticks_meas, delta_usecs, (((float)diff)/24.576) ); } else { - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Good pred: diff=%d, dt_est=%u, dt_meas=%u, d=%dus, err=%fus\n", + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Good pred: diff=%lld, dt_est=%lld, dt_meas=%lld, d=%lldus, err=%fus\n", diff, delta_ticks_est, delta_ticks_meas, delta_usecs, (((float)diff)/24.576) ); @@ -379,16 +571,16 @@ // update the internal values - // note: the next cyclecounter point is + // note: the next cycletimer point is // the estimated one, not the measured one! - m_cyclecounter_ticks += delta_ticks_est; + m_cycletimer_ticks += delta_ticks_est; // if we need to wrap, do it - if (m_cyclecounter_ticks > TICKS_PER_SECOND * 128) { - m_cyclecounter_ticks -= TICKS_PER_SECOND * 128; + if (m_cycletimer_ticks > TICKS_PER_SECOND * 128L) { + m_cycletimer_ticks -= TICKS_PER_SECOND * 128L; } m_lastmeas_usecs = new_usecs; - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"U TS: %10u -> %10u, d=%7uus, dt_est=%7u, dt_meas=%7u, erate=%6.4f, mrate=%6f\n", - prev_ticks, m_cyclecounter_ticks, delta_usecs, + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"U TS: %10llu -> %10llu, d=%7lldus, dt_est=%7lld, dt_meas=%7lld, erate=%6.4f, mrate=%6f\n", + prev_ticks, m_cycletimer_ticks, delta_usecs, delta_ticks_est, delta_ticks_meas, m_ticks_per_usec, rate_meas ); @@ -405,14 +597,12 @@ } -void IsoHandler::initCycleCounter() { - quadlet_t buf=0; - +void IsoHandler::initCycleTimer() { freebob_microsecs_t prev_usecs; unsigned int prev_ticks; - unsigned int prev_counter; + unsigned int prev_timer; freebob_microsecs_t new_usecs; unsigned int new_ticks; - unsigned int new_counter; + unsigned int new_timer; float rate=0.0; @@ -428,16 +618,36 @@ || (rate < 24.576*(1.0-CC_MAX_RATE_ERROR)))) { +#ifdef LIBRAW1394_USE_CTRREAD_API + struct raw1394_cycle_timer ctr; + int err; + err=raw1394_read_cycle_timer(m_handle_util, &ctr); + if(err) { + debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); + } + prev_usecs=(freebob_microsecs_t)ctr.local_time; + prev_timer=ctr.cycle_timer; +#else // normally we should be able to use the same handle // because it is not iterated on by any other stuff // but I'm not sure + quadlet_t buf=0; raw1394_read(m_handle_util, raw1394_get_local_id(m_handle_util), CSR_REGISTER_BASE | CSR_CYCLE_TIME, 4, &buf); prev_usecs=m_TimeSource->getCurrentTimeAsUsecs(); - - prev_counter= ntohl(buf) & 0xFFFFFFFF; - prev_ticks=CYCLE_COUNTER_TO_TICKS(prev_counter); + prev_timer= ntohl(buf) & 0xFFFFFFFF; +#endif + prev_ticks=CYCLE_TIMER_TO_TICKS(prev_timer); usleep(CC_SLEEP_TIME_AFTER_UPDATE); + +#ifdef LIBRAW1394_USE_CTRREAD_API + err=raw1394_read_cycle_timer(m_handle_util, &ctr); + if(err) { + debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); + } + new_usecs=(freebob_microsecs_t)ctr.local_time; + new_timer=ctr.cycle_timer; +#else // normally we should be able to use the same handle // because it is not iterated on by any other stuff @@ -446,7 +656,8 @@ CSR_REGISTER_BASE | CSR_CYCLE_TIME, 4, &buf); new_usecs=m_TimeSource->getCurrentTimeAsUsecs(); - - new_counter= ntohl(buf) & 0xFFFFFFFF; - new_ticks=CYCLE_COUNTER_TO_TICKS(new_counter); + new_timer= ntohl(buf) & 0xFFFFFFFF; +#endif + + new_ticks=CYCLE_TIMER_TO_TICKS(new_timer); unsigned int delta_ticks; @@ -455,5 +666,5 @@ delta_ticks=new_ticks - prev_ticks; } else { // wraparound - delta_ticks=CYCLE_COUNTER_UNWRAP_TICKS(new_ticks) - prev_ticks; + delta_ticks=CYCLE_TIMER_UNWRAP_TICKS(new_ticks) - prev_ticks; } @@ -467,5 +678,5 @@ // update the internal values - m_cyclecounter_ticks=new_ticks; + m_cycletimer_ticks=new_ticks; m_lastmeas_usecs=new_usecs; @@ -478,5 +689,5 @@ // this is not fatal, the DLL will eventually correct this if(try_cnt == CC_INIT_MAX_TRIES) { - debugWarning("Failed to properly initialize cycle counter...\n"); + debugWarning("Failed to properly initialize cycle timer...\n"); } @@ -495,14 +706,22 @@ debugOutputShort( DEBUG_LEVEL_NORMAL, " Handler type : %s\n", (this->getType()==EHT_Receive ? "Receive" : "Transmit")); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel : %2d, %2d\n", + debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel : %2d, %2d\n", m_port, channel); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Packet count : %10d (%5d dropped)\n", + debugOutputShort( DEBUG_LEVEL_NORMAL, " Packet count : %10d (%5d dropped)\n", this->getPacketCount(), this->getDroppedCount()); + #ifdef DEBUG - unsigned int cc=this->getCycleCounter(); - debugOutputShort( DEBUG_LEVEL_NORMAL, " Cycle counter : %10lu (%03us, %04ucycles, %04uticks)\n", + unsigned int cc=this->getCycleTimerTicks(); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Cycle timer : %10lu (%03us, %04ucycles, %04uticks)\n", cc,TICKS_TO_SECS(cc),TICKS_TO_CYCLES(cc),TICKS_TO_OFFSET(cc)); + +/* freebob_microsecs_t now=m_TimeSource->getCurrentTimeAsUsecs(); + cc=mapToCycleTimer(now); + freebob_microsecs_t now_mapped=mapToTimeSource(cc); + + debugOutputShort( DEBUG_LEVEL_NORMAL, " Mapping test : now: %14llu, cc: %10lu, mapped now: %14llu\n", + now,cc,now_mapped);*/ #endif - debugOutputShort( DEBUG_LEVEL_NORMAL, " Ticks/usec : %8.6f (dll2: %8.6e)\n\n", + debugOutputShort( DEBUG_LEVEL_NORMAL, " Ticks/usec : %8.6f (dll2: %8.6e)\n\n", this->getTicksPerUsec(), m_ticks_per_usec_dll_err2); @@ -551,29 +770,33 @@ /* The timesource interface */ freebob_microsecs_t IsoHandler::getCurrentTime() { - quadlet_t buf=0; - unsigned int new_counter; - - raw1394_read(m_handle_util, raw1394_get_local_id(m_handle_util), - CSR_REGISTER_BASE | CSR_CYCLE_TIME, 4, &buf); - - new_counter= ntohl(buf) & 0xFFFFFFFF; + unsigned int new_timer; + + new_timer= getCycleTimerTicks(); // this assumes that it never happens that there are more than 2 // minutes between calls - if (CYCLE_COUNTER_GET_SECS(new_counter) < m_TimeSource_LastSecs) { + if (CYCLE_TIMER_GET_SECS(new_timer) < m_TimeSource_LastSecs) { m_TimeSource_NbCycleWraps++; } - freebob_microsecs_t ticks=m_TimeSource_NbCycleWraps * 128 * TICKS_PER_SECOND - + CYCLE_COUNTER_TO_TICKS(new_counter); - - m_TimeSource_LastSecs=CYCLE_COUNTER_GET_SECS(new_counter); + freebob_microsecs_t ticks=m_TimeSource_NbCycleWraps * 128L * TICKS_PER_SECOND + + CYCLE_TIMER_TO_TICKS(new_timer); + + m_TimeSource_LastSecs=CYCLE_TIMER_GET_SECS(new_timer); debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Wraps=%4u, LastSecs=%3u, nowSecs=%3u, ticks=%10u\n", m_TimeSource_NbCycleWraps, m_TimeSource_LastSecs, - CYCLE_COUNTER_GET_SECS(new_counter), ticks + CYCLE_TIMER_GET_SECS(new_timer), ticks ); return ticks; +} + +freebob_microsecs_t IsoHandler::unWrapTime(freebob_microsecs_t t) { + return CYCLE_TIMER_UNWRAP_TICKS(t); +} + +freebob_microsecs_t IsoHandler::wrapTime(freebob_microsecs_t t) { + return CYCLE_TIMER_WRAP_TICKS(t); } @@ -608,11 +831,5 @@ IsoRecvHandler::~IsoRecvHandler() { -// Don't call until libraw1394's raw1394_new_handle() function has been -// fixed to correctly initialise the iso_packet_infos field. Bug is -// confirmed present in libraw1394 1.2.1. In any case, -// raw1394_destroy_handle() will do any iso system shutdown required. -// raw1394_iso_shutdown(m_handle); - raw1394_destroy_handle(m_handle); - m_handle = NULL; + } @@ -628,5 +845,6 @@ } -enum raw1394_iso_disposition IsoRecvHandler::putPacket(unsigned char *data, unsigned int length, +enum raw1394_iso_disposition IsoRecvHandler::putPacket( + unsigned char *data, unsigned int length, unsigned char channel, unsigned char tag, unsigned char sy, unsigned int cycle, unsigned int dropped) { @@ -647,8 +865,9 @@ bool IsoRecvHandler::prepare() { -// Don't call until libraw1394's raw1394_new_handle() function has been -// fixed to correctly initialise the iso_packet_infos field. Bug is -// confirmed present in libraw1394 1.2.1. -// raw1394_iso_shutdown(m_handle); + + // prepare the generic IsoHandler + if(!IsoHandler::prepare()) { + return false; + } debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing iso receive handler (%p)\n",this); @@ -658,5 +877,6 @@ debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval : %d \n",m_irq_interval); - if(raw1394_iso_recv_init(m_handle, iso_receive_handler, + if(raw1394_iso_recv_init(m_handle, + iso_receive_handler, m_buf_packets, m_max_packet_size, @@ -676,4 +896,9 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "start on cycle %d\n", cycle); + // start the generic IsoHandler + if(!IsoHandler::start(cycle)) { + return false; + } + if(raw1394_iso_recv_start(m_handle, cycle, -1, 0)) { debugFatal("Could not start receive handler (%s)\n",strerror(errno)); @@ -723,11 +948,5 @@ IsoXmitHandler::~IsoXmitHandler() { -// Don't call until libraw1394's raw1394_new_handle() function has been -// fixed to correctly initialise the iso_packet_infos field. Bug is -// confirmed present in libraw1394 1.2.1. In any case, -// raw1394_destroy_handle() will do any iso system shutdown required. -// raw1394_iso_shutdown(m_handle); - raw1394_destroy_handle(m_handle); - m_handle = NULL; + // handle cleanup is done in the IsoHanlder destructor } @@ -742,22 +961,4 @@ return true; - -} - -enum raw1394_iso_disposition IsoXmitHandler::getPacket(unsigned char *data, unsigned int *length, - unsigned char *tag, unsigned char *sy, - int cycle, unsigned int dropped) { - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, - "sending packet: length=%d, cycle=%d\n", - *length, cycle ); - m_packetcount++; - m_dropped+=dropped; - - if(m_Client) { - return m_Client->getPacket(data, length, tag, sy, cycle, dropped, m_max_packet_size); - } - - return RAW1394_ISO_OK; } @@ -766,5 +967,8 @@ debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing iso transmit handler (%p, client=%p)\n",this,m_Client); -// raw1394_iso_shutdown(m_handle); + if(!(IsoHandler::prepare())) { + return false; + } + debugOutput( DEBUG_LEVEL_VERBOSE, " Buffers : %d \n",m_buf_packets); debugOutput( DEBUG_LEVEL_VERBOSE, " Max Packet size : %d \n",m_max_packet_size); @@ -791,4 +995,9 @@ { debugOutput( DEBUG_LEVEL_VERBOSE, "start on cycle %d\n", cycle); + + if(!(IsoHandler::start(cycle))) { + return false; + } + if(raw1394_iso_xmit_start(m_handle, cycle, m_prebuffers)) { debugFatal("Could not start xmit handler (%s)\n",strerror(errno)); @@ -796,4 +1005,22 @@ } return true; +} + +enum raw1394_iso_disposition IsoXmitHandler::getPacket( + unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped) { + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, + "sending packet: length=%d, cycle=%d\n", + *length, cycle ); + m_packetcount++; + m_dropped+=dropped; + + if(m_Client) { + return m_Client->getPacket(data, length, tag, sy, cycle, dropped, m_max_packet_size); + } + + return RAW1394_ISO_OK; } Index: branches/streaming-rework/src/libstreaming/StreamProcessor.h =================================================================== --- branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 383) +++ branches/streaming-rework/src/libstreaming/StreamProcessor.h (revision 384) @@ -34,4 +34,6 @@ #include "PortManager.h" #include "streamstatistics.h" + +#include namespace FreebobStreaming { @@ -73,23 +75,6 @@ bool xrunOccurred() { return (m_xruns>0);}; - - /** - * This is used for implementing the synchronisation. - * As long as this function doesn't return true, the current buffer - * contents are not transfered to the packet decoders. - * - * This means that there can be more events in the buffer than - * one period worth of them, should the synchronisation mechanism - * require this - * @return - */ - virtual bool isOnePeriodReady()=0; - - unsigned int getNbPeriodsReady() { if(m_period) return m_framecounter/m_period; else return 0;}; - virtual void decrementFrameCounter(); - virtual void incrementFrameCounter(int nbframes); // move to private? - void resetFrameCounter(); void resetXrunCounter(); @@ -99,5 +84,6 @@ bool isEnabled() {return !m_disabled;}; - virtual bool transfer(); ///< transfer the buffer contents from/to client + virtual bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from client + virtual bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) @@ -111,6 +97,9 @@ virtual void setVerboseLevel(int l); - virtual bool preparedForStop() {return true;}; - virtual bool preparedForStart() {return true;}; + virtual bool prepareForStop() {return true;}; + virtual bool prepareForStart() {return true;}; + + virtual bool prepareForEnable() {return true;}; + virtual bool prepareForDisable() {return true;}; protected: @@ -124,10 +113,9 @@ unsigned int m_xruns; - signed int m_framecounter; unsigned int m_framerate; StreamProcessorManager *m_manager; - + bool m_running; bool m_disabled; @@ -141,4 +129,89 @@ DECLARE_DEBUG_MODULE; + // frame counter & sync stuff + public: + /** + * @brief Can this StreamProcessor handle a nframes of frames? + * + * this function indicates if the streamprocessor can handle nframes + * of frames. It is used to detect underruns-to-be. + * + * @param nframes number of frames + * @return true if the StreamProcessor can handle this amount of frames + * false if it can't + */ + virtual bool canClientTransferFrames(unsigned int nframes) {return true;}; + + int getFrameCounter() {return m_framecounter;}; + + void decrementFrameCounter(int nbframes, uint64_t new_timestamp); + void incrementFrameCounter(int nbframes, uint64_t new_timestamp); + void setFrameCounter(int new_framecounter, uint64_t new_timestamp); + void resetFrameCounter(); + + void setBufferTailTimestamp(uint64_t new_timestamp); + void setBufferHeadTimestamp(uint64_t new_timestamp); + void setBufferTimestamps(uint64_t new_head, uint64_t new_tail); + /** + * \brief return the time until the next period boundary (in microseconds) + * + * Return the time until the next period boundary. If this StreamProcessor + * is the current synchronization source, this function is called to + * determine when a buffer transfer can be made. When this value is + * smaller than 0, a period boundary is assumed to be crossed, hence a + * transfer can be made. + * + * \return the time in usecs + */ + virtual int64_t getTimeUntilNextPeriodUsecs() = 0; + /** + * \brief return the time of the next period boundary (in microseconds) + * + * Returns the time of the next period boundary, in microseconds. The + * goal of this function is to determine the exact point of the period + * boundary. This is assumed to be the point at which the buffer transfer should + * take place, meaning that it can be used as a reference timestamp for transmitting + * StreamProcessors + * + * \return the time in usecs + */ + virtual uint64_t getTimeAtPeriodUsecs() = 0; + + /** + * \brief return the time of the next period boundary (in internal units) + * + * The same as getTimeUntilNextPeriodUsecs() but in internal units. + * + * @return the time in internal units + */ + virtual uint64_t getTimeAtPeriod() = 0; + + void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); + void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); + + bool setSyncSource(StreamProcessor *s); + float getTicksPerFrame() {return m_ticks_per_frame;}; + + protected: + // the framecounter gives the number of frames in the buffer + signed int m_framecounter; + + // the buffer tail timestamp gives the timestamp of the last frame + // that was put into the buffer + uint64_t m_buffer_tail_timestamp; + + // the buffer head timestamp gives the timestamp of the first frame + // that was put into the buffer + uint64_t m_buffer_head_timestamp; + + + StreamProcessor *m_SyncSource; + + float m_ticks_per_frame; + + private: + // this mutex protects the access to the framecounter + // and the buffer head timestamp. + pthread_mutex_t m_framecounter_lock; }; Index: branches/streaming-rework/src/libstreaming/MotuStreamProcessor.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/MotuStreamProcessor.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/MotuStreamProcessor.cpp (revision 384) @@ -28,4 +28,5 @@ */ +#ifdef ENABLE_MOTU #include "MotuStreamProcessor.h" @@ -101,5 +102,5 @@ m_running = true; - // Initialise the cycle counter if this is the first time + // Initialise the cycle timer if this is the first time // iso data has been requested. if (!m_disabled && m_cycle_count<0) { @@ -176,5 +177,5 @@ m_next_cycle -= 8000; - // Deal cleanly with potential wrap-around cycle counter conditions + // Deal cleanly with potential wrap-around cycle timer conditions unwrapped_cycle = cycle; if (m_cycle_count-cycle > 7900) @@ -843,5 +844,5 @@ } -bool MotuTransmitStreamProcessor::preparedForStop() { +bool MotuTransmitStreamProcessor::prepareForStop() { // If the stream is disabled or isn't running there's no need to @@ -894,5 +895,5 @@ } -bool MotuTransmitStreamProcessor::preparedForStart() { +bool MotuTransmitStreamProcessor::prepareForStart() { // Reset some critical variables required so the stream starts cleanly. This // method is called once on every stream restart, including those during @@ -1523,5 +1524,5 @@ } -bool MotuReceiveStreamProcessor::preparedForStop() { +bool MotuReceiveStreamProcessor::prepareForStop() { // A MOTU receive stream can stop at any time. However, signify @@ -1535,5 +1536,5 @@ } -bool MotuReceiveStreamProcessor::preparedForStart() { +bool MotuReceiveStreamProcessor::prepareForStart() { // Reset some critical variables required so the stream starts cleanly. This // method is called once on every stream restart, including those during @@ -1556,2 +1557,3 @@ } // end of namespace FreebobStreaming +#endif Index: branches/streaming-rework/src/libstreaming/cycletimer.h =================================================================== --- branches/streaming-rework/src/libstreaming/cycletimer.h (revision 383) +++ branches/streaming-rework/src/libstreaming/cycletimer.h (revision 384) @@ -27,25 +27,25 @@ */ -/* Definitions and utility macro's to handle the ISO cyclecounter */ +/* Definitions and utility macro's to handle the ISO cycle timer */ -#ifndef __CYCLECOUNTER_H__ -#define __CYCLECOUNTER_H__ +#ifndef __CYCLETIMER_H__ +#define __CYCLETIMER_H__ #define CSR_CYCLE_TIME 0x200 #define CSR_REGISTER_BASE 0xfffff0000000ULL -#define CYCLES_PER_SECOND 8000 -#define TICKS_PER_CYCLE 3072 -#define TICKS_PER_SECOND (CYCLES_PER_SECOND * TICKS_PER_CYCLE) +#define CYCLES_PER_SECOND 8000U +#define TICKS_PER_CYCLE 3072U +#define TICKS_PER_SECOND 24576000UL #define TICKS_PER_USEC (TICKS_PER_SECOND/1000000.0) #define USECS_PER_TICK (1.0/TICKS_PER_USEC) -#define CYCLE_COUNTER_GET_SECS(x) ((((x) & 0xFE000000) >> 25)) -#define CYCLE_COUNTER_GET_CYCLES(x) ((((x) & 0x01FFF000) >> 12)) -#define CYCLE_COUNTER_GET_OFFSET(x) ((((x) & 0x00000FFF))) -#define CYCLE_COUNTER_TO_TICKS(x) ((CYCLE_COUNTER_GET_SECS(x) * TICKS_PER_SECOND) +\ - (CYCLE_COUNTER_GET_CYCLES(x) * TICKS_PER_CYCLE ) +\ - (CYCLE_COUNTER_GET_OFFSET(x) )) +#define CYCLE_TIMER_GET_SECS(x) ((((x) & 0xFE000000U) >> 25)) +#define CYCLE_TIMER_GET_CYCLES(x) ((((x) & 0x01FFF000U) >> 12)) +#define CYCLE_TIMER_GET_OFFSET(x) ((((x) & 0x00000FFFU))) +#define CYCLE_TIMER_TO_TICKS(x) ((CYCLE_TIMER_GET_SECS(x) * TICKS_PER_SECOND) +\ + (CYCLE_TIMER_GET_CYCLES(x) * TICKS_PER_CYCLE ) +\ + (CYCLE_TIMER_GET_OFFSET(x) )) // non-efficient versions, to be avoided in critical code @@ -54,9 +54,17 @@ #define TICKS_TO_OFFSET(x) (((x)%TICKS_PER_CYCLE)) -#define CYCLE_COUNTER_UNWRAP_TICKS(x) ((x) \ - + (127 * TICKS_PER_SECOND) \ +#define TICKS_TO_CYCLE_TIMER(x) ( ((TICKS_TO_SECS(x) & 0x7F) << 25) \ + | ((TICKS_TO_CYCLES(x) & 0x1FFF) << 12) \ + | ((TICKS_TO_OFFSET(x) & 0xFFF))) + +#define TICKS_TO_SYT(x) (((TICKS_TO_CYCLES(x) & 0xF) << 12) \ + | ((TICKS_TO_OFFSET(x) & 0xFFF))) + +#define CYCLE_TIMER_UNWRAP_TICKS(x) (((uint64_t)(x)) \ + + (127ULL * TICKS_PER_SECOND) \ + (CYCLES_PER_SECOND * TICKS_PER_CYCLE) \ + (TICKS_PER_CYCLE) \ ) +#define CYCLE_TIMER_WRAP_TICKS(x) ((x % TICKS_PER_SECOND)) -#endif // __CYCLECOUNTER_H__ +#endif // __CYCLETIMER_H__ Index: branches/streaming-rework/src/libstreaming/IsoStream.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/IsoStream.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/IsoStream.cpp (revision 384) @@ -30,5 +30,4 @@ #include "PacketBuffer.h" #include - namespace FreebobStreaming @@ -83,5 +82,5 @@ m_port, m_channel); -}; +} bool IsoStream::setChannel(int c) { Index: branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h =================================================================== --- branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h (revision 383) +++ branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.h (revision 384) @@ -38,4 +38,5 @@ #include #include "ringbuffer.h" +#include #define AMDTP_MAX_PACKET_SIZE 2048 @@ -88,8 +89,12 @@ bool reset(); bool prepare(); - bool transfer(); - virtual void setVerboseLevel(int l); - - bool isOnePeriodReady(); + + bool prepareForStop(); + bool prepareForStart(); + + bool prepareForEnable(); + + bool canClientTransferFrames(unsigned int nbframes); + bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from the client // We have 1 period of samples = m_period @@ -103,14 +108,11 @@ unsigned int getMaxPacketSize() {return 4 * (2 + m_syt_interval * m_dimension);}; - - // FIXME: do this the proper way! - AmdtpReceiveStreamProcessor *syncmaster; - - // this updates the timestamp, and the - // 'bufferfill' - // should be called from the same thread - // that does the iteration - void decrementFrameCounter(); - void incrementFrameCounter(int nbframes); + + int64_t getTimeUntilNextPeriodUsecs(); + + uint64_t getTimeAtPeriodUsecs(); + uint64_t getTimeAtPeriod(); + + void setVerboseLevel(int l); protected: @@ -177,10 +179,11 @@ bool reset(); bool prepare(); - bool transfer(); - - virtual void setVerboseLevel(int l); - - bool isOnePeriodReady(); - + + bool prepareForStop(); + bool prepareForStart(); + + bool canClientTransferFrames(unsigned int nbframes); + bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client + // We have 1 period of samples = m_period // this period takes m_period/m_framerate seconds of time @@ -194,9 +197,13 @@ unsigned int getMaxPacketSize() {return 4 * (2 + m_syt_interval * m_dimension);}; - float getTicksPerFrame() {return m_ticks_per_frame;}; - unsigned int getPeriodTimeStamp(); - void dumpInfo(); + int64_t getTimeUntilNextPeriodUsecs(); + + uint64_t getTimeAtPeriodUsecs(); + uint64_t getTimeAtPeriod(); + + void setVerboseLevel(int l); + protected: @@ -211,11 +218,7 @@ unsigned int m_syt_interval; - unsigned int m_last_timestamp; - unsigned int m_last_timestamp2; - unsigned int m_last_timestamp_at_period_ticks; - - float m_ticks_per_frame; - - bool m_one_period_passed; + uint64_t m_last_timestamp; /// last timestamp (in ticks) + uint64_t m_last_timestamp2; /// last timestamp (in ticks) + uint64_t m_last_timestamp_at_period_ticks; DECLARE_DEBUG_MODULE; Index: branches/streaming-rework/src/libstreaming/IsoHandlerManager.h =================================================================== --- branches/streaming-rework/src/libstreaming/IsoHandlerManager.h (revision 383) +++ branches/streaming-rework/src/libstreaming/IsoHandlerManager.h (revision 384) @@ -89,5 +89,5 @@ bool stopHandlers(); ///< stop the managed ISO handlers - bool reset() {return true;}; ///< reset the ISO manager and all streams + bool reset(); ///< reset the ISO manager and all streams bool prepare(); ///< prepare the ISO manager and all streams @@ -96,17 +96,27 @@ void enablePolling(IsoStream *); ///< enables polling on a stream - public: - - // RunnableInterface interface + public: bool Execute(); // note that this is called in we while(running) loop bool Init(); - // iterate all handlers - bool iterate(); + // the state machine private: - // updates the cycle counter caches of all handlers - void updateCycleCounters(); + enum EHandlerStates { + E_Created, + E_Prepared, + E_Running, + E_Error + }; + enum EHandlerStates m_State; + + private: + /// iterate all child handlers + bool iterate(); + public: // FIXME: just so that SPM can do this (temp solution) + /// updates the cycle timer caches of all child handlers + void updateCycleTimers(); + private: // note: there is a disctinction between streams and handlers // because one handler can serve multiple streams (in case of @@ -134,5 +144,5 @@ bool rebuildFdMap(); - + // debug stuff DECLARE_DEBUG_MODULE; Index: branches/streaming-rework/src/libstreaming/StreamProcessorManager.h =================================================================== --- branches/streaming-rework/src/libstreaming/StreamProcessorManager.h (revision 383) +++ branches/streaming-rework/src/libstreaming/StreamProcessorManager.h (revision 384) @@ -54,6 +54,4 @@ public FreebobUtil::RunnableInterface { - friend class StreamRunner; - public: @@ -64,10 +62,14 @@ bool prepare(); ///< to be called after the processors are registered - virtual void setVerboseLevel(int l); - void dumpInfo(); + bool start(); + bool stop(); + // this is the setup API bool registerProcessor(StreamProcessor *processor); ///< start managing a streamprocessor bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor + + bool enableStreamProcessors(); /// enable registered StreamProcessors + bool disableStreamProcessors(); /// disable registered StreamProcessors void setPeriodSize(unsigned int period); @@ -83,6 +85,4 @@ // the client-side functions - bool xrunOccurred(); - int getXrunCount() {return m_xruns;}; bool waitForPeriod(); ///< wait for the next period @@ -90,13 +90,34 @@ bool transfer(); ///< transfer the buffer contents from/to client bool transfer(enum StreamProcessor::EProcessorType); ///< transfer the buffer contents from/to client (single processor type) - + + int getDelayedUsecs() {return m_delayed_usecs;}; + bool xrunOccurred(); + int getXrunCount() {return m_xruns;}; + +private: + int m_delayed_usecs; + // this stores the time at which the next transfer should occur + // usually this is in the past, but it is needed as a timestamp + // for the transmit SP's + uint64_t m_time_of_transfer; + +public: bool handleXrun(); ///< reset the streams & buffers after xrun - - bool start(); - bool stop(); bool setThreadParameters(bool rt, int priority); - // the ISO-side functions + virtual void setVerboseLevel(int l); + void dumpInfo(); + + + // the sync source stuff +private: + StreamProcessor *m_SyncSource; + +public: + bool setSyncSource(StreamProcessor *s); + StreamProcessor * getSyncSource(); + + protected: int signalWaiters(); // call this to signal a period boundary @@ -116,4 +137,6 @@ StreamProcessorVector m_ReceiveProcessors; StreamProcessorVector m_TransmitProcessors; + + unsigned int m_nb_buffers; Index: branches/streaming-rework/src/libstreaming/StreamProcessor.cpp =================================================================== --- branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 383) +++ branches/streaming-rework/src/libstreaming/StreamProcessor.cpp (revision 384) @@ -46,5 +46,7 @@ , m_framecounter(0) , m_framerate(framerate) - , m_manager(0) + , m_manager(NULL) + , m_SyncSource(NULL) + , m_ticks_per_frame(0) , m_running(false) , m_disabled(true) @@ -69,9 +71,8 @@ debugOutputShort( DEBUG_LEVEL_NORMAL, " Enabled : %d\n", !m_disabled); - m_PeriodStat.dumpInfo(); - m_PacketStat.dumpInfo(); - m_WakeupStat.dumpInfo(); - - +// m_PeriodStat.dumpInfo(); +// m_PacketStat.dumpInfo(); +// m_WakeupStat.dumpInfo(); + } @@ -80,4 +81,6 @@ debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); + pthread_mutex_init(&m_framecounter_lock, NULL); + return IsoStream::init(); } @@ -138,9 +141,37 @@ } -bool StreamProcessor::transfer() { - - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); -// TODO: implement - +/** + * @brief Notify the StreamProcessor that frames were written + * + * This notifies the StreamProcessor of the fact that frames were written to the internal + * buffer. This is for framecounter & timestamp bookkeeping. + * + * @param nbframes the number of frames that are written to the internal buffers + * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample + * present in the buffer. + * @return true if successful + */ +bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts); + incrementFrameCounter(nbframes, ts); + return true; +} + +/** + * @brief Notify the StreamProcessor that frames were read + * + * This notifies the StreamProcessor of the fact that frames were read from the internal + * buffer. This is for framecounter & timestamp bookkeeping. + * + * @param nbframes the number of frames that are read from the internal buffers + * @param ts the new timestamp of the 'head' of the buffer, i.e. the first sample + * present in the buffer. + * @return true if successful + */ +bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { + + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes); + decrementFrameCounter(nbframes, ts); return true; } @@ -157,17 +188,126 @@ } +bool StreamProcessor::setSyncSource(StreamProcessor *s) { + m_SyncSource=s; + return true; +} + /** * Decrements the frame counter, in a atomic way. This + * also sets the buffer tail timestamp * is thread safe. */ -void StreamProcessor::decrementFrameCounter() { - SUBSTRACT_ATOMIC((SInt32 *)&m_framecounter,m_period); -} -/** - * Increments the frame counter, in a atomic way. This - * is thread safe. - */ -void StreamProcessor::incrementFrameCounter(int nbframes) { - ADD_ATOMIC((SInt32 *)&m_framecounter, nbframes); +void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) { + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter -= nbframes; + m_buffer_head_timestamp = new_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Increments the frame counter, in a atomic way. + * also sets the buffer head timestamp + * This is thread safe. + */ +void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) { + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter += nbframes; + m_buffer_tail_timestamp = new_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); + +} + +/** + * Sets the frame counter, in a atomic way. + * also sets the buffer head timestamp + * This is thread safe. + */ +void StreamProcessor::setFrameCounter(int new_framecounter, uint64_t new_timestamp) { + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter = new_framecounter; + m_buffer_tail_timestamp = new_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Sets the buffer tail timestamp (in usecs) + * This is thread safe. + */ +void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) { + pthread_mutex_lock(&m_framecounter_lock); + m_buffer_tail_timestamp = new_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Sets the buffer head timestamp (in usecs) + * This is thread safe. + */ +void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) { + pthread_mutex_lock(&m_framecounter_lock); + m_buffer_head_timestamp = new_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * Sets both the buffer head and tail timestamps (in usecs) + * (avoids multiple mutex lock/unlock's) + * This is thread safe. + */ +void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) { + pthread_mutex_lock(&m_framecounter_lock); + m_buffer_head_timestamp = new_head; + m_buffer_tail_timestamp = new_tail; + pthread_mutex_unlock(&m_framecounter_lock); +} +/** + * \brief return the timestamp of the first frame in the buffer + * + * This function returns the timestamp of the very first sample in + * the StreamProcessor's buffer. This is useful for slave StreamProcessors + * to find out what the base for their timestamp generation should + * be. It also returns the framecounter value for which this timestamp + * is valid. + * + * The system is built in such a way that we assume that the processing + * of the buffers doesn't take any time. Assume we have a buffer transfer at + * time T1, meaning that the last sample of this buffer occurs at T1. As + * processing does not take time, we don't have to add anything to T1. When + * transferring the processed buffer to the xmit processor, the timestamp + * of the last sample is still T1. + * + * When starting the streams, we don't have any information on this last + * timestamp. We prefill the buffer at the xmit side, and we should find + * out what the timestamp for the last sample in the buffer is. If we sync + * on a receive SP, we know that the last prefilled sample corresponds with + * the first sample received - 1 sample duration. This is the same as if the last + * transfer from iso to client would have emptied the receive buffer. + * + * + * @param ts address to store the timestamp in + * @param fc address to store the associated framecounter in + */ +void StreamProcessor::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) { + pthread_mutex_lock(&m_framecounter_lock); + *fc = m_framecounter; + *ts = m_buffer_head_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); +} + +/** + * \brief return the timestamp of the last frame in the buffer + * + * This function returns the timestamp of the last frame in + * the StreamProcessor's buffer. It also returns the framecounter + * value for which this timestamp is valid. + * + * @param ts address to store the timestamp in + * @param fc address to store the associated framecounter in + */ +void StreamProcessor::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) { + pthread_mutex_lock(&m_framecounter_lock); + *fc = m_framecounter; + *ts = m_buffer_tail_timestamp; + pthread_mutex_unlock(&m_framecounter_lock); } @@ -177,5 +317,7 @@ */ void StreamProcessor::resetFrameCounter() { - ZERO_ATOMIC((SInt32 *)&m_framecounter); + pthread_mutex_lock(&m_framecounter_lock); + m_framecounter = 0; + pthread_mutex_unlock(&m_framecounter_lock); } Index: branches/streaming-rework/src/libstreaming/IsoHandler.h =================================================================== --- branches/streaming-rework/src/libstreaming/IsoHandler.h (revision 383) +++ branches/streaming-rework/src/libstreaming/IsoHandler.h (revision 384) @@ -68,4 +68,7 @@ virtual bool init(); + virtual bool prepare(); + virtual bool start(int cycle); + virtual bool stop(); int iterate() { if(m_handle) return raw1394_loop_iterate(m_handle); else return -1; }; @@ -86,7 +89,4 @@ virtual enum EHandlerType getType() = 0; - virtual bool start(int cycle) = 0; - virtual bool stop(); - int getFileDescriptor() { return raw1394_get_fd(m_handle);}; @@ -102,14 +102,14 @@ int getPort() {return m_port;}; - virtual bool prepare() = 0; - - // get the most recent cycle counter value - // RT safe - unsigned int getCycleCounter(); - - // update the cycle counter cache - // not RT safe - // the isohandlermanager is responsible for calling this! - bool updateCycleCounter(); + /// get the most recent cycle timer value (in ticks) + unsigned int getCycleTimerTicks(); + /// get the most recent cycle timer value (as is) + unsigned int getCycleTimer(); + /// Maps a value of the active TimeSource to a Cycle Timer value. + unsigned int mapToCycleTimer(freebob_microsecs_t now); + /// Maps a Cycle Timer value to the active TimeSource's unit. + freebob_microsecs_t mapToTimeSource(unsigned int cc); + /// update the cycle timer cache + bool updateCycleTimer(); float getTicksPerUsec() {return m_ticks_per_usec;}; @@ -125,6 +125,6 @@ int m_irq_interval; - unsigned int m_cyclecounter_ticks; - freebob_microsecs_t m_lastmeas_usecs; + uint64_t m_cycletimer_ticks; + uint64_t m_lastmeas_usecs; float m_ticks_per_usec; float m_ticks_per_usec_dll_err2; @@ -145,5 +145,17 @@ static int busreset_handler(raw1394handle_t handle, unsigned int generation); - void initCycleCounter(); + void initCycleTimer(); + + // the state machine + private: + enum EHandlerStates { + E_Created, + E_Initialized, + E_Prepared, + E_Running, + E_Error + }; + + enum EHandlerStates m_State; // implement the TimeSource interface @@ -151,4 +163,7 @@ freebob_microsecs_t getCurrentTime(); freebob_microsecs_t getCurrentTimeAsUsecs(); + inline freebob_microsecs_t unWrapTime(freebob_microsecs_t t); + inline freebob_microsecs_t wrapTime(freebob_microsecs_t t); + private: // to cope with wraparound @@ -173,7 +188,4 @@ enum EHandlerType getType() { return EHT_Receive;}; - -// int registerStream(IsoStream *); -// int unregisterStream(IsoStream *); bool start(int cycle); @@ -216,7 +228,4 @@ enum EHandlerType getType() { return EHT_Transmit;}; - -// int registerStream(IsoStream *); -// int unregisterStream(IsoStream *); unsigned int getPreBuffers() {return m_prebuffers;};