Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 722) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 727) @@ -438,25 +438,23 @@ unsigned int nevents, unsigned int offset ) { - bool no_problem=true; + bool no_problem = true; for ( PortVectorIterator it = m_PeriodPorts.begin(); - it != m_PeriodPorts.end(); - ++it ) - { - - if ( ( *it )->isDisabled() ) {continue;}; + it != m_PeriodPorts.end(); + ++it ) + { + if ( (*it)->isDisabled() ) { continue; }; //FIXME: make this into a static_cast when not DEBUG? - - AmdtpPortInfo *pinfo=dynamic_cast ( *it ); + AmdtpPortInfo *pinfo = dynamic_cast ( *it ); assert ( pinfo ); // this should not fail!! - switch ( pinfo->getFormat() ) + switch( pinfo->getFormat() ) { case AmdtpPortInfo::E_MBLA: - if ( encodePortToMBLAEvents ( static_cast ( *it ), ( quadlet_t * ) data, offset, nevents ) ) + if( encodePortToMBLAEvents(static_cast(*it), (quadlet_t *)data, offset, nevents) ) { - debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); - no_problem=false; + debugWarning ( "Could not encode port %s to MBLA events", (*it)->getName().c_str() ); + no_problem = false; } break; @@ -470,23 +468,24 @@ } -bool AmdtpTransmitStreamProcessor::transmitSilenceBlock ( char *data, - unsigned int nevents, unsigned int offset ) -{ - bool problem = false; - for ( PortVectorIterator it = m_PeriodPorts.begin(); - it != m_PeriodPorts.end(); - ++it ) +bool +AmdtpTransmitStreamProcessor::transmitSilenceBlock( + char *data, unsigned int nevents, unsigned int offset) +{ + bool no_problem = true; + for(PortVectorIterator it = m_PeriodPorts.begin(); + it != m_PeriodPorts.end(); + ++it ) { //FIXME: make this into a static_cast when not DEBUG? - AmdtpPortInfo *pinfo=dynamic_cast ( *it ); - assert ( pinfo ); // this should not fail!! - - switch ( pinfo->getFormat() ) + AmdtpPortInfo *pinfo=dynamic_cast(*it); + assert(pinfo); // this should not fail!! + + switch( pinfo->getFormat() ) { case AmdtpPortInfo::E_MBLA: - if ( encodeSilencePortToMBLAEvents ( static_cast ( *it ), ( quadlet_t * ) data, offset, nevents ) ) + if ( encodeSilencePortToMBLAEvents(static_cast(*it), (quadlet_t *)data, offset, nevents) ) { - debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); - problem = true; + debugWarning("Could not encode port %s to MBLA events", (*it)->getName().c_str()); + no_problem = false; } break; @@ -497,5 +496,5 @@ } } - return problem; + return no_problem; } Index: /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 723) +++ /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 727) @@ -29,4 +29,5 @@ #include #include +#include #define RUNNING_TIMEOUT_MSEC 4000 @@ -350,6 +351,6 @@ int64_t time_till_next_period; while(nb_sync_runs--) { // or while not sync-ed? - // check if we were waked up too soon - time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); + // check if we were woken up too soon + time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); if(time_till_next_period > 0) { @@ -444,8 +445,79 @@ // now align the received streams - debugOutput( DEBUG_LEVEL_VERBOSE, " Aligning incoming streams...\n"); - - + if(!alignReceivedStreams()) { + debugError("Could not align streams\n"); + return false; + } debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); + return true; +} + +bool +StreamProcessorManager::alignReceivedStreams() +{ + #define NB_PERIODS_FOR_ALIGN_AVERAGE 20 + #define NB_ALIGN_TRIES 20 + debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n"); + unsigned int nb_sync_runs; + unsigned int nb_rcv_sp = m_ReceiveProcessors.size(); + int64_t diff_between_streams[nb_rcv_sp]; + int64_t diff; + + unsigned int i; + + bool aligned = false; + int cnt = NB_ALIGN_TRIES; + while (!aligned && cnt--) { + nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE; + while(nb_sync_runs) { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs); + waitForPeriod(); + + i = 0; + for ( i = 0; i < nb_rcv_sp; i++) { + StreamProcessor *s = m_ReceiveProcessors.at(i); + diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod()); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " offset between SyncSP %p and SP %p is %lld ticks...\n", + m_SyncSource, s, diff); + if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) { + diff_between_streams[i] = diff; + } else { + diff_between_streams[i] += diff; + } + } + if(!transferSilence()) { + debugError("Could not transfer silence\n"); + return false; + } + nb_sync_runs--; + } + // calculate the average offsets + debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n"); + int diff_between_streams_frames[nb_rcv_sp]; + aligned = true; + for ( i = 0; i < nb_rcv_sp; i++) { + StreamProcessor *s = m_ReceiveProcessors.at(i); + + diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE; + diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame()); + debugOutput( DEBUG_LEVEL_VERBOSE, " avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n", + m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]); + + aligned &= (diff_between_streams_frames[i] == 0); + + // position the stream + if(!s->shiftStream(diff_between_streams_frames[i])) { + debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]); + return false; + } + } + if (!aligned) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n"); + } + } + if (cnt == 0) { + debugError("Align failed\n"); + return false; + } return true; } @@ -817,5 +889,4 @@ */ bool StreamProcessorManager::transfer() { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); bool retval=true; @@ -833,5 +904,4 @@ * @return true if successful, false otherwise (indicates xrun). */ - bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n", @@ -879,4 +949,74 @@ } +/** + * @brief Transfer one period of silence for both receive and transmit StreamProcessors + * + * Transfers one period of silence to the Iso side for transmit SP's + * or dump one period of frames for receive SP's + * + * @return true if successful, false otherwise (indicates xrun). + */ +bool StreamProcessorManager::transferSilence() { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); + bool retval=true; + retval &= transferSilence(StreamProcessor::ePT_Receive); + retval &= transferSilence(StreamProcessor::ePT_Transmit); + return retval; +} + +/** + * @brief Transfer one period of silence for either the receive or transmit StreamProcessors + * + * Transfers one period of silence to the Iso side for transmit SP's + * or dump one period of frames for receive SP's + * + * @param t The processor type to tranfer for (receive or transmit) + * @return true if successful, false otherwise (indicates xrun). + */ +bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n", + t, m_time_of_transfer, + (unsigned int)TICKS_TO_SECS(m_time_of_transfer), + (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), + (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); + + bool retval = true; + // a static cast could make sure that there is no performance + // penalty for the virtual functions (to be checked) + if (t==StreamProcessor::ePT_Receive) { + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + if(!(*it)->dropFrames(m_period, m_time_of_transfer)) { + debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n", + m_period, m_time_of_transfer,*it); + retval &= false; // buffer underrun + } + } + } else { + // FIXME: in the SPM it would be nice to have system time instead of + // 1394 time + float rate = m_SyncSource->getTicksPerFrame(); + int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); + + // the data we are putting into the buffer is intended to be transmitted + // one ringbuffer size after it has been received + int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); + + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + // FIXME: in the SPM it would be nice to have system time instead of + // 1394 time + if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) { + debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n", + m_period, transmit_timestamp, *it); + retval &= false; // buffer underrun + } + } + } + return retval; +} + void StreamProcessorManager::dumpInfo() { debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n"); Index: /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (revision 720) +++ /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (revision 727) @@ -71,5 +71,5 @@ void setPeriodSize(unsigned int period); void setPeriodSize(unsigned int period, unsigned int nb_buffers); - int getPeriodSize() {return m_period;}; + unsigned int getPeriodSize() {return m_period;}; void setNbBuffers(unsigned int nb_buffers); @@ -81,9 +81,13 @@ // the client-side functions + bool waitForPeriod(); + bool transfer(); + bool transfer(enum StreamProcessor::eProcessorType); +private: + bool transferSilence(); + bool transferSilence(enum StreamProcessor::eProcessorType); - bool waitForPeriod(); ///< wait for the next period - bool transfer(); ///< transfer the buffer contents from/to client - bool transfer(enum StreamProcessor::eProcessorType); ///< transfer the buffer contents from/to client (single processor type) - + bool alignReceivedStreams(); +public: int getDelayedUsecs() {return m_delayed_usecs;}; bool xrunOccurred(); Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 723) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 727) @@ -154,4 +154,37 @@ bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer + /** + * @brief drop nframes from the internal buffer as if they were transferred to the client side + * + * Gets nframes of frames from the buffer as done by getFrames(), but does not transfer them + * to the client side. Instead they are discarded. + * + * @param nframes number of frames + * @return true if the operation was successful + */ + bool dropFrames(unsigned int nframes, int64_t ts); + + /** + * @brief put silence frames into the internal buffer + * + * Puts nframes of frames into the buffer as done by putFrames(), but does not transfer them + * from the client side. Instead, silent frames are used. + * + * @param nframes number of frames + * @return true if the operation was successful + */ + bool putSilenceFrames(unsigned int nbframes, int64_t ts); + + /** + * @brief Shifts the stream with the specified number of frames + * + * Used to align several streams to each other. It comes down to + * making sure the head timestamp corresponds to the timestamp of + * one master stream + * + * @param nframes the number of frames to shift + * @return true if successful + */ + bool shiftStream(int nframes); protected: // the helper receive/transmit functions enum eChildReturnValue { @@ -238,5 +271,10 @@ protected: Util::TimestampedBuffer *m_data_buffer; - + // the scratch buffer is temporary buffer space that can be + // used by any function. It's pre-allocated when the SP is created. + // the purpose is to avoid allocation of memory (or heap/stack) in + // an RT context + byte_t* m_scratch_buffer; + size_t m_scratch_buffer_size_bytes; protected: StreamProcessorManager *m_manager; @@ -255,15 +293,4 @@ */ bool canClientTransferFrames(unsigned int nframes); - - /** - * @brief drop nframes from the internal buffer - * - * this function drops nframes from the internal buffers, without any - * specification on what frames are dropped. Timestamps are not updated. - * - * @param nframes number of frames - * @return true if the operation was successful - */ - bool dropFrames(unsigned int nframes); /** Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 723) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 727) @@ -41,4 +41,6 @@ , m_next_state( ePS_Invalid ) , m_cycle_to_switch_state( 0 ) + , m_scratch_buffer( NULL ) + , m_scratch_buffer_size_bytes( 0 ) , m_manager( NULL ) , m_ticks_per_frame( 0 ) @@ -51,9 +53,10 @@ { // create the timestamped buffer and register ourselves as its client - m_data_buffer=new Util::TimestampedBuffer(this); + m_data_buffer = new Util::TimestampedBuffer(this); } StreamProcessor::~StreamProcessor() { if (m_data_buffer) delete m_data_buffer; + if (m_scratch_buffer) delete[] m_scratch_buffer; } @@ -75,11 +78,4 @@ int StreamProcessor::getBufferFill() { return m_data_buffer->getBufferFill(); -} - -bool -StreamProcessor::dropFrames(unsigned int nbframes) -{ - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes); - return m_data_buffer->dropFrames(nbframes); } @@ -377,9 +373,9 @@ if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); if (dropped_cycles > 0) { - debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); + debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped); m_dropped += dropped_cycles; } } - if (cycle > 0) { + if (cycle >= 0) { m_last_cycle = cycle; } @@ -623,8 +619,8 @@ } -bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { +bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) +{ debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", this, nbframes, ts); - // dry run on this side means that we put silence in all enabled ports // since there is do data put into the ringbuffer in the dry-running state @@ -632,5 +628,13 @@ } -bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { +bool +StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts) +{ + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); + return m_data_buffer->dropFrames(nbframes); +} + +bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) +{ debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); assert( getType() == ePT_Transmit ); @@ -640,5 +644,6 @@ bool -StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) { +StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) +{ debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts); // transfer the data @@ -649,8 +654,49 @@ bool -StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) { +StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) +{ debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts); // do nothing return true; +} + +bool +StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts) +{ + debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts); + + size_t bytes_per_frame = getEventSize() * getEventsPerFrame(); + unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame; + + if (nbframes > scratch_buffer_size_frames) { + debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n", + nbframes, scratch_buffer_size_frames); + } + + assert(m_scratch_buffer); + if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) { + debugError("Could not prepare silent block\n"); + return false; + } + if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) { + debugError("Could not write silent block\n"); + return false; + } + return true; +} + +bool +StreamProcessor::shiftStream(int nbframes) +{ + if(nbframes == 0) return true; + if(nbframes > 0) { + return m_data_buffer->dropFrames(nbframes); + } else { + bool result = true; + while(nbframes--) { + result &= m_data_buffer->writeDummyFrame(); + } + return result; + } } @@ -670,7 +716,17 @@ bool StreamProcessor::prepare() { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "prepare...\n"); + debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this); if(!m_manager) { debugFatal("Not attached to a manager!\n"); + return false; + } + + // make the scratch buffer one period of frames long + m_scratch_buffer_size_bytes = m_manager->getPeriodSize() * getEventsPerFrame() * getEventSize(); + debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n"); + if(m_scratch_buffer) delete[] m_scratch_buffer; + m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes]; + if(m_scratch_buffer == NULL) { + debugFatal("Could not allocate scratch buffer\n"); return false; } Index: /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp =================================================================== --- /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (revision 720) +++ /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (revision 727) @@ -448,8 +448,7 @@ * @return true if successful */ -bool TimestampedBuffer::dropFrames(unsigned int nframes) { - - unsigned int read_size=nframes*m_event_size*m_events_per_frame; - +bool +TimestampedBuffer::dropFrames(unsigned int nframes) { + unsigned int read_size = nframes * m_event_size * m_events_per_frame; ffado_ringbuffer_read_advance(m_event_buffer, read_size); decrementFrameCounter(nframes);