Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (revision 720) @@ -38,49 +38,23 @@ #define TRANSMIT_TRANSFER_DELAY DEFAULT_TRANSFER_DELAY -namespace Streaming { +namespace Streaming +{ /* transmit */ -AmdtpTransmitStreamProcessor::AmdtpTransmitStreamProcessor(int port, int dimension) - : StreamProcessor(ePT_Transmit, port) - , m_dimension(dimension) - , m_last_timestamp(0) - , m_dbc(0) - , m_ringbuffer_size_frames(0) +AmdtpTransmitStreamProcessor::AmdtpTransmitStreamProcessor ( int port, int dimension ) + : StreamProcessor ( ePT_Transmit, port ) + , m_dimension ( dimension ) + , m_dbc ( 0 ) {} -enum raw1394_iso_disposition -AmdtpTransmitStreamProcessor::getPacket(unsigned char *data, unsigned int *length, - unsigned char *tag, unsigned char *sy, - int cycle, unsigned int dropped, unsigned int max_length) { - struct iec61883_packet *packet = (struct iec61883_packet *) data; - - if (cycle<0) { - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", - cycle, isRunning()); - *tag = 0; - *sy = 0; - *length=0; - return RAW1394_ISO_OK; - } - - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", - cycle, isRunning()); - - if (addCycles(m_last_cycle, 1) != cycle) { - debugWarning("(%p) Dropped %d packets on cycle %d\n", diffCycles(cycle,m_last_cycle)-1, cycle); - } - - m_last_cycle=cycle; - -#ifdef DEBUG - if(dropped>0) { - debugWarning("Dropped %d packets on cycle %d\n",dropped, cycle); - } -#endif - - // calculate & preset common values - +bool +AmdtpTransmitStreamProcessor::generatePacketHeader ( + unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length ) +{ + struct iec61883_packet *packet = ( struct iec61883_packet * ) data; /* Our node ID can change after a bus reset, so it is best to fetch - * our node ID for each packet. */ + * our node ID for each packet. */ packet->sid = m_handler->getLocalNodeId() & 0x3f; @@ -96,34 +70,4 @@ *tag = IEC61883_TAG_WITH_CIP; *sy = 0; - - // determine if we want to send a packet or not - // note that we can't use getCycleTimer directly here, - // because packets are queued in advance. This means that - // we the packet we are constructing will be sent out - // on 'cycle', not 'now'. - unsigned int ctr=m_handler->getCycleTimer(); - int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr); - - // the difference between the cycle this - // packet is intended for and 'now' - int cycle_diff = diffCycles(cycle, now_cycles); - -#ifdef DEBUG - if(isRunning() && (cycle_diff < 0)) { - debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", - cycle, now_cycles); - } - - // keep track of the lag - m_PacketStat.mark(cycle_diff); -#endif - - // as long as the cycle parameter is not in sync with - // the current time, the stream is considered not - // to be 'running' - // NOTE: this works only at startup - if (!isRunning() && cycle_diff >= 0 && cycle >= 0) { - debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); - } signed int fc; @@ -152,61 +96,60 @@ const int max_cycles_to_transmit_early = 5; - if( !isRunning() || !m_data_buffer->isEnabled() ) { - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, - "Not running (%d) or buffer not enabled (enabled=%d)\n", - isRunning(), m_data_buffer->isEnabled()); - - // not running or not enabled - goto send_empty_packet; - } - try_block_of_frames: - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Try for cycle %d\n", cycle); + debugOutput ( DEBUG_LEVEL_ULTRA_VERBOSE, "Try for cycle %d\n", cycle ); // check whether the packet buffer has packets for us to send. // the base timestamp is the one of the next sample in the buffer ffado_timestamp_t ts_head_tmp; - m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); // thread safe + m_data_buffer->getBufferHeadTimestamp ( &ts_head_tmp, &fc ); // thread safe // the timestamp gives us the time at which we want the sample block // to be output by the device - presentation_time=(uint64_t)ts_head_tmp; + presentation_time = ( uint64_t ) ts_head_tmp; + m_last_timestamp = presentation_time; // now we calculate the time when we have to transmit the sample block - transmit_at_time = substractTicks(presentation_time, TRANSMIT_TRANSFER_DELAY); + transmit_at_time = substractTicks ( presentation_time, TRANSMIT_TRANSFER_DELAY ); // calculate the cycle this block should be presented in // (this is just a virtual calculation since at that time it should // already be in the device's buffer) - presentation_cycle = (unsigned int)(TICKS_TO_CYCLES( presentation_time )); + presentation_cycle = ( unsigned int ) ( TICKS_TO_CYCLES ( presentation_time ) ); // calculate the cycle this block should be transmitted in - transmit_at_cycle = (unsigned int)(TICKS_TO_CYCLES( transmit_at_time )); + transmit_at_cycle = ( unsigned int ) ( TICKS_TO_CYCLES ( transmit_at_time ) ); // we can check whether this cycle is within the 'window' we have // to send this packet. // first calculate the number of cycles left before presentation time - cycles_until_presentation = diffCycles( presentation_cycle, cycle ); + cycles_until_presentation = diffCycles ( presentation_cycle, cycle ); // we can check whether this cycle is within the 'window' we have // to send this packet. // first calculate the number of cycles left before presentation time - cycles_until_transmit = diffCycles( transmit_at_cycle, cycle ); + cycles_until_transmit = diffCycles ( transmit_at_cycle, cycle ); + + debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, + "Gen HDR: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", + cycle, + transmit_at_cycle, cycles_until_transmit, + transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), + presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); // two different options: - // 1) there are not enough frames for one packet + // 1) there are not enough frames for one packet // => determine wether this is a problem, since we might still // have some time to send it // 2) there are enough packets // => determine whether we have to send them in this packet - if (fc < (signed int)m_syt_interval) { - m_PacketStat.signal(0); + if ( fc < ( signed int ) m_syt_interval ) + { // not enough frames in the buffer, - debugOutput(DEBUG_LEVEL_VERBOSE, + debugOutput ( DEBUG_LEVEL_VERBOSE, "Insufficient frames: N=%02d, CY=%04u, TC=%04u, CUT=%04d\n", - fc, cycle, transmit_at_cycle, cycles_until_transmit); + fc, cycle, transmit_at_cycle, cycles_until_transmit ); // we can still postpone the queueing of the packets // if we are far enough ahead of the presentation time - if( cycles_until_presentation <= min_cycles_before_presentation ) { - m_PacketStat.signal(1); + if ( cycles_until_presentation <= min_cycles_before_presentation ) + { // we are too late // meaning that we in some sort of xrun state @@ -214,20 +157,22 @@ m_xruns++; // we send an empty packet on this cycle - goto send_empty_packet; // UGLY but effective - } else { - m_PacketStat.signal(2); + return false; + } + else + { // there is still time left to send the packet // we want the system to give this packet another go -// goto try_packet_again; // UGLY but effective + // goto try_packet_again; // UGLY but effective // unfortunatly the try_again doesn't work very well, // so we'll have to either usleep(one cycle) and goto try_block_of_frames - + // or just fill this with an empty packet // if we have to do this too often, the presentation time will // get too close and we're in trouble - goto send_empty_packet; // UGLY but effective - } - } else { - m_PacketStat.signal(3); + return false; + } + } + else + { // there are enough frames, so check the time they are intended for // all frames have a certain 'time window' in which they can be sent @@ -236,5 +181,5 @@ // in theory we can send the packet up till one cycle before the presentation time, // however this is not very smart. - + // There are 3 options: // 1) the frame block is too early @@ -245,16 +190,19 @@ // => discard (and raise xrun?) // get next block of frames and repeat - - if (cycles_until_transmit <= max_cycles_to_transmit_early) { - m_PacketStat.signal(4); + + if ( cycles_until_transmit <= max_cycles_to_transmit_early ) + { // it's time send the packet - goto send_packet; // UGLY but effective - } else if (cycles_until_transmit < 0) { + m_dbc += fillDataPacketHeader ( packet, length, m_last_timestamp ); + return true; + } + else if ( cycles_until_transmit < 0 ) + { // we are too late - debugOutput(DEBUG_LEVEL_VERBOSE, + debugOutput ( DEBUG_LEVEL_VERBOSE, "Too late: CY=%04u, TC=%04u, CUT=%04d, TSP=%011llu (%04u)\n", cycle, transmit_at_cycle, cycles_until_transmit, - presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); + presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); // however, if we can send this sufficiently before the presentation @@ -262,12 +210,14 @@ // NOTE: dangerous since the device has no way of reporting that it didn't get // this packet on time. - if ( cycles_until_presentation <= min_cycles_before_presentation ) { - m_PacketStat.signal(5); + if ( cycles_until_presentation <= min_cycles_before_presentation ) + { // we are not that late and can still try to transmit the packet - goto send_packet; // UGLY but effective - } else { // definitely too late - m_PacketStat.signal(6); + m_dbc += fillDataPacketHeader ( packet, length, m_last_timestamp ); + return true; + } + else // definitely too late + { // remove the samples - m_data_buffer->dropFrames(m_syt_interval); + m_data_buffer->dropFrames ( m_syt_interval ); // signal some xrun situation ??HERE?? m_xruns++; @@ -275,99 +225,114 @@ goto try_block_of_frames; // UGLY but effective } - } else { - m_PacketStat.signal(7); - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, + } + else + { + debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, "Too early: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", cycle, transmit_at_cycle, cycles_until_transmit, - transmit_at_time, (unsigned int)TICKS_TO_CYCLES(transmit_at_time), - presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); - #ifdef DEBUG - if (cycles_until_transmit > max_cycles_to_transmit_early + 1) { - debugOutput(DEBUG_LEVEL_VERBOSE, + transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), + presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); +#ifdef DEBUG + if ( cycles_until_transmit > max_cycles_to_transmit_early + 1 ) + { + debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, "Way too early: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", cycle, transmit_at_cycle, cycles_until_transmit, - transmit_at_time, (unsigned int)TICKS_TO_CYCLES(transmit_at_time), - presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); - } - #endif + transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), + presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); + } +#endif // we are too early, send only an empty packet - goto send_empty_packet; // UGLY but effective - } - } - - debugFatal("Should never reach this code!\n"); - return RAW1394_ISO_ERROR; - -send_empty_packet: - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT NONE: CY=%04u, TSP=%011llu (%04u)\n", - cycle, - presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); - - m_dbc += fillNoDataPacketHeader(packet, length); - return RAW1394_ISO_DEFER; - -send_packet: - if (m_data_buffer->readFrames(m_syt_interval, (char *)(data + 8))) { - m_dbc += fillDataPacketHeader(packet, length, presentation_time); - + return false; + } + } + return true; +} + +bool +AmdtpTransmitStreamProcessor::generatePacketData ( + unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length ) +{ + struct iec61883_packet *packet = ( struct iec61883_packet * ) data; + if ( m_data_buffer->readFrames ( m_syt_interval, ( char * ) ( data + 8 ) ) ) + { // process all ports that should be handled on a per-packet base // this is MIDI for AMDTP (due to the need of DBC) - if (!encodePacketPorts((quadlet_t *)(data+8), m_syt_interval, packet->dbc)) { - debugWarning("Problem encoding Packet Ports\n"); - } - - debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT DATA: CY=%04u, TST=%011llu (%04u), TSP=%011llu (%04u)\n", - cycle, - transmit_at_time, (unsigned int)TICKS_TO_CYCLES(transmit_at_time), - presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); - - return RAW1394_ISO_OK; - } - -// the ISO AGAIN does not work very well... -// try_packet_again: -// -// debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT RETRY: CY=%04u, TSP=%011llu (%04u)\n", -// cycle, -// presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); -// return RAW1394_ISO_AGAIN; - - // else: - debugFatal("This is impossible, since we checked the buffer size before!\n"); - return RAW1394_ISO_ERROR; -} - -unsigned int -AmdtpTransmitStreamProcessor::getEventsPerFrame() -{ - return m_dimension; -} - -unsigned int -AmdtpTransmitStreamProcessor::getUpdatePeriod() -{ + if ( !encodePacketPorts ( ( quadlet_t * ) ( data+8 ), m_syt_interval, packet->dbc ) ) + { + debugWarning ( "Problem encoding Packet Ports\n" ); + } + debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, "XMIT DATA: TSP=%011llu (%04u)\n", + cycle, m_last_timestamp, ( unsigned int ) TICKS_TO_CYCLES ( m_last_timestamp ) ); + return true; + } + else + { + return false; + } +} + +bool +AmdtpTransmitStreamProcessor::generateSilentPacketHeader ( + unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length ) +{ + struct iec61883_packet *packet = ( struct iec61883_packet * ) data; + debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, "XMIT NONE: CY=%04u, TSP=%011llu (%04u)\n", + cycle, m_last_timestamp, ( unsigned int ) TICKS_TO_CYCLES ( m_last_timestamp ) ); + + /* Our node ID can change after a bus reset, so it is best to fetch + * our node ID for each packet. */ + packet->sid = m_handler->getLocalNodeId() & 0x3f; + + packet->dbs = m_dimension; + packet->fn = 0; + packet->qpc = 0; + packet->sph = 0; + packet->reserved = 0; + packet->dbc = m_dbc; + packet->eoh1 = 2; + packet->fmt = IEC61883_FMT_AMDTP; + + *tag = IEC61883_TAG_WITH_CIP; + *sy = 0; + + m_dbc += fillNoDataPacketHeader ( packet, length ); + return true; +} + +bool +AmdtpTransmitStreamProcessor::generateSilentPacketData ( + unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length ) +{ + return true; // no need to do anything +} + +unsigned int AmdtpTransmitStreamProcessor::fillDataPacketHeader ( + struct iec61883_packet *packet, unsigned int* length, + uint32_t ts ) +{ + + packet->fdf = m_fdf; + + // convert the timestamp to SYT format + uint16_t timestamp_SYT = TICKS_TO_SYT ( ts ); + packet->syt = ntohs ( timestamp_SYT ); + + *length = m_syt_interval*sizeof ( quadlet_t ) *m_dimension + 8; + return m_syt_interval; } - -unsigned int AmdtpTransmitStreamProcessor::fillDataPacketHeader( - struct iec61883_packet *packet, unsigned int* length, - uint32_t ts) { - - packet->fdf = m_fdf; - - // convert the timestamp to SYT format - uint16_t timestamp_SYT = TICKS_TO_SYT(ts); - packet->syt = ntohs(timestamp_SYT); - - *length = m_syt_interval*sizeof(quadlet_t)*m_dimension + 8; - - return m_syt_interval; -} - -unsigned int AmdtpTransmitStreamProcessor::fillNoDataPacketHeader( - struct iec61883_packet *packet, unsigned int* length) { +unsigned int AmdtpTransmitStreamProcessor::fillNoDataPacketHeader ( + struct iec61883_packet *packet, unsigned int* length ) +{ // no-data packets have syt=0xFFFF @@ -378,99 +343,59 @@ // FIXME: either make this a setting or choose bool send_payload=true; - if(send_payload) { + if ( send_payload ) + { // this means no-data packets with payload (DICE doesn't like that) - *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); + *length = 2*sizeof ( quadlet_t ) + m_syt_interval * m_dimension * sizeof ( quadlet_t ); return m_syt_interval; - } else { + } + else + { // dbc is not incremented // this means no-data packets without payload - *length = 2*sizeof(quadlet_t); + *length = 2*sizeof ( quadlet_t ); return 0; } } -bool AmdtpTransmitStreamProcessor::prefill() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Prefill transmit buffers...\n"); - - if(!transferSilence(m_ringbuffer_size_frames)) { - debugFatal("Could not prefill transmit stream\n"); - return false; - } - - return true; -} - -bool AmdtpTransmitStreamProcessor::reset() { - - debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); - - // reset the statistics - m_PeriodStat.reset(); - m_PacketStat.reset(); - m_WakeupStat.reset(); - - m_data_buffer->setTickOffset(0); - -// // reset all non-device specific stuff -// // i.e. the iso stream and the associated ports -// if(!StreamProcessor::reset()) { -// debugFatal("Could not do base class reset\n"); -// return false; -// } - - // we should prefill the event buffer - if (!prefill()) { - debugFatal("Could not prefill buffers\n"); - return false; - } - - return true; -} - -bool AmdtpTransmitStreamProcessor::prepareChild() { - m_PeriodStat.setName("XMT PERIOD"); - m_PacketStat.setName("XMT PACKET"); - m_WakeupStat.setName("XMT WAKEUP"); - - debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); - - // prepare all non-device specific stuff - // i.e. the iso stream and the associated ports - if(!StreamProcessor::prepare()) { - debugFatal("Could not prepare base class\n"); - return false; - } - - switch (m_manager->getNominalRate()) { - case 32000: - m_syt_interval = 8; - m_fdf = IEC61883_FDF_SFC_32KHZ; - break; - case 44100: - m_syt_interval = 8; - m_fdf = IEC61883_FDF_SFC_44K1HZ; - break; - default: - case 48000: - m_syt_interval = 8; - m_fdf = IEC61883_FDF_SFC_48KHZ; - break; - case 88200: - m_syt_interval = 16; - m_fdf = IEC61883_FDF_SFC_88K2HZ; - break; - case 96000: - m_syt_interval = 16; - m_fdf = IEC61883_FDF_SFC_96KHZ; - break; - case 176400: - m_syt_interval = 32; - m_fdf = IEC61883_FDF_SFC_176K4HZ; - break; - case 192000: - m_syt_interval = 32; - m_fdf = IEC61883_FDF_SFC_192KHZ; - break; +unsigned int +AmdtpTransmitStreamProcessor::getPacketsPerPeriod() +{ + return ( m_manager->getPeriodSize() ) /m_syt_interval; +} + +bool AmdtpTransmitStreamProcessor::prepareChild() +{ + debugOutput ( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this ); + switch ( m_manager->getNominalRate() ) + { + case 32000: + m_syt_interval = 8; + m_fdf = IEC61883_FDF_SFC_32KHZ; + break; + case 44100: + m_syt_interval = 8; + m_fdf = IEC61883_FDF_SFC_44K1HZ; + break; + default: + case 48000: + m_syt_interval = 8; + m_fdf = IEC61883_FDF_SFC_48KHZ; + break; + case 88200: + m_syt_interval = 16; + m_fdf = IEC61883_FDF_SFC_88K2HZ; + break; + case 96000: + m_syt_interval = 16; + m_fdf = IEC61883_FDF_SFC_96KHZ; + break; + case 176400: + m_syt_interval = 32; + m_fdf = IEC61883_FDF_SFC_176K4HZ; + break; + case 192000: + m_syt_interval = 32; + m_fdf = IEC61883_FDF_SFC_192KHZ; + break; } @@ -481,296 +406,120 @@ m_manager->getNominalRate(), m_dimension, - m_syt_interval); - - // prepare the framerate estimate - float ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); - m_ticks_per_frame=ticks_per_frame; - - // initialize internal buffer - m_ringbuffer_size_frames=m_manager->getNbBuffers() * m_manager->getPeriodSize(); - - assert(m_data_buffer); - m_data_buffer->setBufferSize(m_ringbuffer_size_frames * 2); - m_data_buffer->setEventSize(sizeof(quadlet_t)); - m_data_buffer->setEventsPerFrame(m_dimension); - - m_data_buffer->setUpdatePeriod(m_manager->getPeriodSize()); - m_data_buffer->setNominalRate(ticks_per_frame); - - m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); - - m_data_buffer->prepare(); - - // set the parameters of ports we can: - // we want the audio ports to be period buffered, - // and the midi ports to be packet buffered + m_syt_interval ); + for ( PortVectorIterator it = m_Ports.begin(); - it != m_Ports.end(); - ++it ) - { - debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); - if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { - debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); - return false; - } - - - switch ((*it)->getPortType()) { - case Port::E_Audio: - if(!(*it)->setSignalType(Port::E_PeriodSignalled)) { - debugFatal("Could not set signal type to PeriodSignalling"); - return false; - } - debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); - // buffertype and datatype are dependant on the API - if(!(*it)->setBufferType(Port::E_PointerBuffer)) { - debugFatal("Could not set buffer type"); - return false; - } - if(!(*it)->useExternalBuffer(true)) { - debugFatal("Could not set external buffer usage"); - return false; - } - - if(!(*it)->setDataType(Port::E_Float)) { - debugFatal("Could not set data type"); - return false; - } - - - break; - case Port::E_Midi: - if(!(*it)->setSignalType(Port::E_PacketSignalled)) { - debugFatal("Could not set signal type to PeriodSignalling"); - return false; - } - - // we use a timing unit of 10ns - // this makes sure that for the max syt interval - // we don't have rounding, and keeps the numbers low - // we have 1 slot every 8 events - // we have syt_interval events per packet - // => syt_interval/8 slots per packet - // packet rate is 8000pkt/sec => interval=125us - // so the slot interval is (1/8000)/(syt_interval/8) - // or: 1/(1000 * syt_interval) sec - // which is 1e9/(1000*syt_interval) nsec - // or 100000/syt_interval 'units' - // the event interval is fixed to 320us = 32000 'units' - if(!(*it)->useRateControl(true,(100000/m_syt_interval),32000, false)) { - debugFatal("Could not set signal type to PeriodSignalling"); - return false; - } - - // buffertype and datatype are dependant on the API - debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); - // buffertype and datatype are dependant on the API - if(!(*it)->setBufferType(Port::E_RingBuffer)) { - debugFatal("Could not set buffer type"); - return false; - } - if(!(*it)->setDataType(Port::E_MidiEvent)) { - debugFatal("Could not set data type"); - return false; + it != m_Ports.end(); + ++it ) + { + if ( ( *it )->getPortType() == Port::E_Midi ) + { + // we use a timing unit of 10ns + // this makes sure that for the max syt interval + // we don't have rounding, and keeps the numbers low + // we have 1 slot every 8 events + // we have syt_interval events per packet + // => syt_interval/8 slots per packet + // packet rate is 8000pkt/sec => interval=125us + // so the slot interval is (1/8000)/(syt_interval/8) + // or: 1/(1000 * syt_interval) sec + // which is 1e9/(1000*syt_interval) nsec + // or 100000/syt_interval 'units' + // the event interval is fixed to 320us = 32000 'units' + if ( ! ( *it )->useRateControl ( true, ( 100000/m_syt_interval ),32000, false ) ) + { + debugFatal ( "Could not set signal type to PeriodSignalling" ); + return false; + } + break; + } + } + + debugOutput ( DEBUG_LEVEL_VERBOSE, "Prepared for:\n" ); + debugOutput ( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, FDF: %d, DBS: %d, SYT: %d\n", + m_manager->getNominalRate(), m_fdf, m_dimension, m_syt_interval ); + debugOutput ( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n", + m_manager->getPeriodSize(), m_manager->getNbBuffers() ); + debugOutput ( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n", + m_port,m_channel ); + return true; +} + +/* +* compose the event streams for the packets from the port buffers +*/ +bool AmdtpTransmitStreamProcessor::processWriteBlock ( char *data, + unsigned int nevents, unsigned int offset ) +{ + bool no_problem=true; + + for ( PortVectorIterator it = m_PeriodPorts.begin(); + it != m_PeriodPorts.end(); + ++it ) + { + + if ( ( *it )->isDisabled() ) {continue;}; + + //FIXME: make this into a static_cast when not DEBUG? + + AmdtpPortInfo *pinfo=dynamic_cast ( *it ); + assert ( pinfo ); // this should not fail!! + + switch ( pinfo->getFormat() ) + { + case AmdtpPortInfo::E_MBLA: + if ( encodePortToMBLAEvents ( static_cast ( *it ), ( quadlet_t * ) data, offset, nevents ) ) + { + debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); + no_problem=false; } break; - default: - debugWarning("Unsupported port type specified\n"); + case AmdtpPortInfo::E_SPDIF: // still unimplemented break; - } - } - - // the API specific settings of the ports should already be set, - // as this is called from the processorManager->prepare() - // so we can init the ports - if(!initPorts()) { - debugFatal("Could not initialize ports!\n"); - return false; - } - - if(!preparePorts()) { - debugFatal("Could not initialize ports!\n"); - return false; - } - - debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); - debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, FDF: %d, DBS: %d, SYT: %d\n", - m_manager->getNominalRate(),m_fdf,m_dimension,m_syt_interval); - debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n", - m_manager->getPeriodSize(), m_manager->getNbBuffers()); - debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n", - m_port,m_channel); - - return true; - -} - -bool AmdtpTransmitStreamProcessor::prepareForStart() { - return true; -} - -bool AmdtpTransmitStreamProcessor::prepareForStop() { - return true; -} - -bool AmdtpTransmitStreamProcessor::prepareForEnable(uint64_t time_to_enable_at) { - -// if (!StreamProcessor::prepareForEnable(time_to_enable_at)) { -// debugError("StreamProcessor::prepareForEnable failed\n"); -// return false; -// } - - return true; -} - -unsigned int -AmdtpTransmitStreamProcessor::getPacketsPerPeriod() -{ - return (m_manager->getPeriodSize())/m_syt_interval; -} - -bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int nframes) { - bool retval; - signed int fc; - ffado_timestamp_t ts_tail_tmp; - uint64_t ts_tail; - - // prepare a buffer of silence - char *dummybuffer=(char *)calloc(sizeof(quadlet_t),nframes*m_dimension); - transmitSilenceBlock(dummybuffer, nframes, 0); - - - m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); - if (fc != 0) { - debugWarning("Prefilling a buffer that already contains %d frames\n", fc); - } - - ts_tail = (uint64_t)ts_tail_tmp; - // modify the timestamp such that it makes sense - ts_tail = addTicks(ts_tail, (uint64_t)(nframes * getTicksPerFrame())); - // add the silence data to the ringbuffer - if(m_data_buffer->writeFrames(nframes, dummybuffer, ts_tail)) { - retval=true; - } else { - debugWarning("Could not write to event buffer\n"); - retval=false; - } - - free(dummybuffer); - - return retval; -} - -bool AmdtpTransmitStreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { - m_PeriodStat.mark(m_data_buffer->getBufferFill()); - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n", nbframes, ts); - - // transfer the data - m_data_buffer->blockProcessWriteFrames(nbframes, ts); - - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); - - return true; // FIXME: what about failure? -} - -bool AmdtpTransmitStreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) { - m_PeriodStat.mark(m_data_buffer->getBufferFill()); - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "AmdtpTransmitStreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts); - - bool retval; - char dummybuffer[sizeof(quadlet_t)*nbframes*m_dimension]; - - transmitSilenceBlock(dummybuffer, nbframes, 0); - // add the silence data to the ringbuffer - if(m_data_buffer->writeFrames(nbframes, dummybuffer, ts)) { - retval=true; - } else { - debugWarning("Could not write %u events to event buffer\n", nbframes); - retval=false; - } - - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); - return retval; -} - -/* - * write received events to the stream ringbuffers. - */ - -bool AmdtpTransmitStreamProcessor::processWriteBlock(char *data, - unsigned int nevents, unsigned int offset) -{ - bool no_problem=true; - + default: // ignore + break; + } + } + return no_problem; +} + +bool AmdtpTransmitStreamProcessor::transmitSilenceBlock ( char *data, + unsigned int nevents, unsigned int offset ) +{ + bool problem = false; for ( PortVectorIterator it = m_PeriodPorts.begin(); - it != m_PeriodPorts.end(); - ++it ) - { - - if((*it)->isDisabled()) {continue;}; - + it != m_PeriodPorts.end(); + ++it ) + { //FIXME: make this into a static_cast when not DEBUG? - - AmdtpPortInfo *pinfo=dynamic_cast(*it); - assert(pinfo); // this should not fail!! - - switch(pinfo->getFormat()) { - case AmdtpPortInfo::E_MBLA: - if(encodePortToMBLAEvents(static_cast(*it), (quadlet_t *)data, offset, nevents)) { - debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str()); - no_problem=false; - } - break; - case AmdtpPortInfo::E_SPDIF: // still unimplemented - break; - default: // ignore - break; - } - } - return no_problem; - -} - -int AmdtpTransmitStreamProcessor::transmitSilenceBlock(char *data, - unsigned int nevents, unsigned int offset) -{ - int problem=0; - - for ( PortVectorIterator it = m_PeriodPorts.begin(); - it != m_PeriodPorts.end(); - ++it ) - { - - //FIXME: make this into a static_cast when not DEBUG? - - AmdtpPortInfo *pinfo=dynamic_cast(*it); - assert(pinfo); // this should not fail!! - - switch(pinfo->getFormat()) { - case AmdtpPortInfo::E_MBLA: - if(encodeSilencePortToMBLAEvents(static_cast(*it), (quadlet_t *)data, offset, nevents)) { - debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str()); - problem=1; - } - break; - case AmdtpPortInfo::E_SPDIF: // still unimplemented - break; - default: // ignore - break; + AmdtpPortInfo *pinfo=dynamic_cast ( *it ); + assert ( pinfo ); // this should not fail!! + + switch ( pinfo->getFormat() ) + { + case AmdtpPortInfo::E_MBLA: + if ( encodeSilencePortToMBLAEvents ( static_cast ( *it ), ( quadlet_t * ) data, offset, nevents ) ) + { + debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); + problem = true; + } + break; + case AmdtpPortInfo::E_SPDIF: // still unimplemented + break; + default: // ignore + break; } } return problem; - } /** - * @brief decode a packet for the packet-based ports - * - * @param data Packet data - * @param nevents number of events in data (including events of other ports & port types) - * @param dbc DataBlockCount value for this packet - * @return true if all successfull - */ -bool AmdtpTransmitStreamProcessor::encodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc) +* @brief decode a packet for the packet-based ports +* +* @param data Packet data +* @param nevents number of events in data (including events of other ports & port types) +* @param dbc DataBlockCount value for this packet +* @return true if all successfull +*/ +bool AmdtpTransmitStreamProcessor::encodePacketPorts ( quadlet_t *data, unsigned int nevents, unsigned int dbc ) { bool ok=true; @@ -781,17 +530,17 @@ for ( PortVectorIterator it = m_PacketPorts.begin(); - it != m_PacketPorts.end(); - ++it ) + it != m_PacketPorts.end(); + ++it ) { #ifdef DEBUG - AmdtpPortInfo *pinfo=dynamic_cast(*it); - assert(pinfo); // this should not fail!! + AmdtpPortInfo *pinfo=dynamic_cast ( *it ); + assert ( pinfo ); // this should not fail!! // the only packet type of events for AMDTP is MIDI in mbla - assert(pinfo->getFormat()==AmdtpPortInfo::E_Midi); + assert ( pinfo->getFormat() ==AmdtpPortInfo::E_Midi ); #endif - AmdtpMidiPort *mp=static_cast(*it); + AmdtpMidiPort *mp=static_cast ( *it ); // we encode this directly (no function call) due to the high frequency @@ -806,40 +555,43 @@ // first prefill the buffer with NO_DATA's on all time muxed channels - for(j = (dbc & 0x07)+mp->getLocation(); j < nevents; j += 8) { - + for ( j = ( dbc & 0x07 ) +mp->getLocation(); j < nevents; j += 8 ) + { + quadlet_t tmpval; - - target_event=(quadlet_t *)(data + ((j * m_dimension) + mp->getPosition())); - - if(mp->canRead()) { // we can send a byte - mp->readEvent(&byte); + + target_event= ( quadlet_t * ) ( data + ( ( j * m_dimension ) + mp->getPosition() ) ); + + if ( mp->canRead() ) // we can send a byte + { + mp->readEvent ( &byte ); byte &= 0xFF; - tmpval=htonl( - IEC61883_AM824_SET_LABEL((byte)<<16, - IEC61883_AM824_LABEL_MIDI_1X)); - - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "MIDI port %s, pos=%d, loc=%d, dbc=%d, nevents=%d, dim=%d\n", - mp->getName().c_str(), mp->getPosition(), mp->getLocation(), dbc, nevents, m_dimension); - debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "base=%p, target=%p, value=%08X\n", - data, target_event, tmpval); - - } else { + tmpval=htonl ( + IEC61883_AM824_SET_LABEL ( ( byte ) <<16, + IEC61883_AM824_LABEL_MIDI_1X ) ); + + debugOutput ( DEBUG_LEVEL_ULTRA_VERBOSE, "MIDI port %s, pos=%d, loc=%d, dbc=%d, nevents=%d, dim=%d\n", + mp->getName().c_str(), mp->getPosition(), mp->getLocation(), dbc, nevents, m_dimension ); + debugOutput ( DEBUG_LEVEL_ULTRA_VERBOSE, "base=%p, target=%p, value=%08X\n", + data, target_event, tmpval ); + + } + else + { // can't send a byte, either because there is no byte, // or because this would exceed the maximum rate - tmpval=htonl( - IEC61883_AM824_SET_LABEL(0,IEC61883_AM824_LABEL_MIDI_NO_DATA)); - } - + tmpval=htonl ( + IEC61883_AM824_SET_LABEL ( 0,IEC61883_AM824_LABEL_MIDI_NO_DATA ) ); + } + *target_event=tmpval; } } - return ok; } -int AmdtpTransmitStreamProcessor::encodePortToMBLAEvents(AmdtpAudioPort *p, quadlet_t *data, - unsigned int offset, unsigned int nevents) +int AmdtpTransmitStreamProcessor::encodePortToMBLAEvents ( AmdtpAudioPort *p, quadlet_t *data, + unsigned int offset, unsigned int nevents ) { unsigned int j=0; @@ -847,50 +599,53 @@ quadlet_t *target_event; - target_event=(quadlet_t *)(data + p->getPosition()); - - switch(p->getDataType()) { + target_event= ( quadlet_t * ) ( data + p->getPosition() ); + + switch ( p->getDataType() ) + { default: case Port::E_Int24: - { - quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress()); - - assert(nevents + offset <= p->getBufferSize()); - - buffer+=offset; - - for(j = 0; j < nevents; j += 1) { // decode max nsamples - *target_event = htonl((*(buffer) & 0x00FFFFFF) | 0x40000000); - buffer++; - target_event += m_dimension; - } - } - break; + { + quadlet_t *buffer= ( quadlet_t * ) ( p->getBufferAddress() ); + + assert ( nevents + offset <= p->getBufferSize() ); + + buffer+=offset; + + for ( j = 0; j < nevents; j += 1 ) // decode max nsamples + { + *target_event = htonl ( ( * ( buffer ) & 0x00FFFFFF ) | 0x40000000 ); + buffer++; + target_event += m_dimension; + } + } + break; case Port::E_Float: - { - const float multiplier = (float)(0x7FFFFF00); - float *buffer=(float *)(p->getBufferAddress()); - - assert(nevents + offset <= p->getBufferSize()); - - buffer+=offset; - - for(j = 0; j < nevents; j += 1) { // decode max nsamples - - // don't care for overflow - float v = *buffer * multiplier; // v: -231 .. 231 - unsigned int tmp = ((int)v); - *target_event = htonl((tmp >> 8) | 0x40000000); - - buffer++; - target_event += m_dimension; - } - } - break; + { + const float multiplier = ( float ) ( 0x7FFFFF00 ); + float *buffer= ( float * ) ( p->getBufferAddress() ); + + assert ( nevents + offset <= p->getBufferSize() ); + + buffer+=offset; + + for ( j = 0; j < nevents; j += 1 ) // decode max nsamples + { + + // don't care for overflow + float v = *buffer * multiplier; // v: -231 .. 231 + unsigned int tmp = ( ( int ) v ); + *target_event = htonl ( ( tmp >> 8 ) | 0x40000000 ); + + buffer++; + target_event += m_dimension; + } + } + break; } return 0; } -int AmdtpTransmitStreamProcessor::encodeSilencePortToMBLAEvents(AmdtpAudioPort *p, quadlet_t *data, - unsigned int offset, unsigned int nevents) +int AmdtpTransmitStreamProcessor::encodeSilencePortToMBLAEvents ( AmdtpAudioPort *p, quadlet_t *data, + unsigned int offset, unsigned int nevents ) { unsigned int j=0; @@ -898,17 +653,19 @@ quadlet_t *target_event; - target_event=(quadlet_t *)(data + p->getPosition()); - - switch(p->getDataType()) { + target_event= ( quadlet_t * ) ( data + p->getPosition() ); + + switch ( p->getDataType() ) + { default: case Port::E_Int24: case Port::E_Float: - { - for(j = 0; j < nevents; j += 1) { // decode max nsamples - *target_event = htonl(0x40000000); - target_event += m_dimension; - } - } - break; + { + for ( j = 0; j < nevents; j += 1 ) // decode max nsamples + { + *target_event = htonl ( 0x40000000 ); + target_event += m_dimension; + } + } + break; } Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp (revision 720) @@ -55,9 +55,4 @@ bool AmdtpReceiveStreamProcessor::prepareChild() { - - m_PeriodStat.setName("RCV PERIOD"); - m_PacketStat.setName("RCV PACKET"); - m_WakeupStat.setName("RCV WAKEUP"); - debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); @@ -153,6 +148,10 @@ // this packet x*syt_interval*ticks_per_frame // later than expected (the real receive time) - debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", - m_last_timestamp, m_handler->getWakeupInterval(), m_syt_interval, getTicksPerFrame()); + #ifdef DEBUG + if(isRunning()) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", + m_last_timestamp, m_handler->getWakeupInterval(), m_syt_interval, getTicksPerFrame()); + } + #endif if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h (revision 720) @@ -80,71 +80,53 @@ virtual ~AmdtpTransmitStreamProcessor() {}; - enum raw1394_iso_disposition - getPacket(unsigned char *data, unsigned int *length, - unsigned char *tag, unsigned char *sy, - int cycle, unsigned int dropped, unsigned int max_length); + bool generatePacketHeader(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length); + bool generatePacketData(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length); + bool generateSilentPacketHeader(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length); + bool generateSilentPacketData(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length); + virtual bool prepareChild(); - virtual unsigned int getEventsPerFrame(); - virtual unsigned int getEventSize() {return 4;}; - virtual unsigned int getUpdatePeriod(); - - bool reset(); - bool prepareChild(); - - bool prepareForStop(); - bool prepareForStart(); - - bool prepareForEnable(uint64_t time_to_enable_at); - - bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from the client - bool putFramesDry(unsigned int nbframes, int64_t ts); - - // We have 1 period of samples = m_period - // this period takes m_period/m_framerate seconds of time - // during this time, 8000 packets are sent -// unsigned int getPacketsPerPeriod() {return (m_period*8000)/m_framerate;}; - - // however, if we only count the number of used packets - // it is m_period / m_syt_interval +public: + virtual unsigned int getEventSize() + {return 4;}; + virtual unsigned int getMaxPacketSize() + {return 4 * (2 + m_syt_interval * m_dimension);}; + virtual unsigned int getEventsPerFrame() + { return m_dimension; }; + virtual unsigned int getNominalFramesPerPacket() + {return m_syt_interval;}; unsigned int getPacketsPerPeriod(); - - unsigned int getMaxPacketSize() {return 4 * (2 + m_syt_interval * m_dimension);}; protected: bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset); + bool transmitSilenceBlock(char *data, unsigned int nevents, unsigned int offset); - struct iec61883_cip m_cip_status; - - int m_dimension; - unsigned int m_syt_interval; - - int m_fdf; - - bool prefill(); - +private: unsigned int fillNoDataPacketHeader(struct iec61883_packet *packet, unsigned int* length); unsigned int fillDataPacketHeader(struct iec61883_packet *packet, unsigned int* length, uint32_t ts); - - - bool transferSilence(unsigned int size); int transmitBlock(char *data, unsigned int nevents, unsigned int offset); - bool encodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); + bool encodePacketPorts(quadlet_t *data, unsigned int nevents, + unsigned int dbc); + int encodePortToMBLAEvents(AmdtpAudioPort *, quadlet_t *data, unsigned int offset, unsigned int nevents); - - int transmitSilenceBlock(char *data, unsigned int nevents, - unsigned int offset); int encodeSilencePortToMBLAEvents(AmdtpAudioPort *, quadlet_t *data, unsigned int offset, unsigned int nevents); - void updatePreparedState(); - unsigned long m_last_timestamp; - + struct iec61883_cip m_cip_status; + int m_dimension; + unsigned int m_syt_interval; + int m_fdf; unsigned int m_dbc; - - unsigned int m_ringbuffer_size_frames; }; Index: /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h (revision 720) @@ -94,21 +94,13 @@ virtual unsigned int getEventsPerFrame() { return m_dimension; }; - virtual unsigned int getUpdatePeriod() + virtual unsigned int getNominalFramesPerPacket() {return m_syt_interval;}; - - // We have 1 period of samples = m_period - // this period takes m_period/m_framerate seconds of time - // during this time, 8000 packets are sent -// unsigned int getPacketsPerPeriod() {return (m_period*8000)/m_framerate;}; - - // however, if we only count the number of used packets - // it is m_period / m_syt_interval virtual unsigned int getPacketsPerPeriod(); protected: - bool processReadBlock(char *data, unsigned int nevents, unsigned int offset); bool provideSilenceBlock(unsigned int nevents, unsigned int offset); +private: bool decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); Index: /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (revision 720) @@ -283,5 +283,5 @@ // lower on average. max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS; - debugOutput( DEBUG_LEVEL_VERBOSE, " %d ticks (%03us %04uc %04ut)...\n", + debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n", max_of_min_delay, (unsigned int)TICKS_TO_SECS(max_of_min_delay), @@ -295,12 +295,13 @@ //sleep(2); // FIXME: be smarter here - // wait for some sort of sync + // make sure that we are dry-running long enough for the + // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)? debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n"); - // in order to obtain that, we wait for the first periods to be received. int nb_sync_runs=20; int64_t time_till_next_period; while(nb_sync_runs--) { // or while not sync-ed? + // check if we were waked up too soon time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); - debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); if(time_till_next_period > 0) { // wait for the period @@ -309,66 +310,86 @@ } - // figure out where we are now - uint64_t time_of_transfer = m_SyncSource->getTimeAtPeriod(); - debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n", - time_of_transfer, - (unsigned int)TICKS_TO_SECS(time_of_transfer), - (unsigned int)TICKS_TO_CYCLES(time_of_transfer), - (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); - - // start wet-running in 200 cycles - // this is the timeframe in which the remaining code should be ready - time_of_transfer = addTicks(time_of_transfer, 200*TICKS_PER_CYCLE); - - debugOutput( DEBUG_LEVEL_VERBOSE, " => start at TS=%011llu (%03us %04uc %04ut)...\n", - time_of_transfer, - (unsigned int)TICKS_TO_SECS(time_of_transfer), - (unsigned int)TICKS_TO_CYCLES(time_of_transfer), - (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); - // we now should have decent sync info - // the buffers of the receive streams should be (approx) empty - // the buffers of the xmit streams should be full - - // at this point the buffer head timestamp of the transmit buffers can be - // set properly since we know the sync source's timestamp of the last - // buffer transfer. we also know the rate. - - debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n"); + debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n"); // FIXME: in the SPM it would be nice to have system time instead of // 1394 time -// float rate=m_SyncSource->getTicksPerFrame(); -// int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate); -// // the data at the front of the buffer is intended to be transmitted -// // nb_periods*period_size after the last received period -// int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); - -// for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); -// it != m_TransmitProcessors.end(); -// ++it ) { -// // FIXME: encapsulate -// (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); -// //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!! -// } - - dumpInfo(); + + // we now should have decent sync info on the sync source + // determine a point in time where the system should start + // figure out where we are now + uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod(); + debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n", + time_of_first_sample, + (unsigned int)TICKS_TO_SECS(time_of_first_sample), + (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), + (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); + + #define CYCLES_FOR_STARTUP 200 + // start wet-running in CYCLES_FOR_STARTUP cycles + // this is the time window we have to setup all SP's such that they + // can start wet-running correctly. + time_of_first_sample = addTicks(time_of_first_sample, + CYCLES_FOR_STARTUP * TICKS_PER_CYCLE); + + debugOutput( DEBUG_LEVEL_VERBOSE, " => first sample at TS=%011llu (%03us %04uc %04ut)...\n", + time_of_first_sample, + (unsigned int)TICKS_TO_SECS(time_of_first_sample), + (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), + (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); + + // we should start wet-running the transmit SP's some cycles in advance + // such that we know it is wet-running when it should output its first sample + #define PRESTART_CYCLES_FOR_XMIT 20 + uint64_t time_to_start_xmit = substractTicks(time_of_first_sample, + PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE); + + #define PRESTART_CYCLES_FOR_RECV 0 + uint64_t time_to_start_recv = substractTicks(time_of_first_sample, + PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE); + debugOutput( DEBUG_LEVEL_VERBOSE, " => xmit starts at TS=%011llu (%03us %04uc %04ut)...\n", + time_to_start_xmit, + (unsigned int)TICKS_TO_SECS(time_to_start_xmit), + (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit), + (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit)); + debugOutput( DEBUG_LEVEL_VERBOSE, " => recv starts at TS=%011llu (%03us %04uc %04ut)...\n", + time_to_start_recv, + (unsigned int)TICKS_TO_SECS(time_to_start_recv), + (unsigned int)TICKS_TO_CYCLES(time_to_start_recv), + (unsigned int)TICKS_TO_OFFSET(time_to_start_recv)); + + // at this point the buffer head timestamp of the transmit buffers can be set + // this is the presentation time of the first sample in the buffer + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + (*it)->setBufferHeadTimestamp(time_of_first_sample); + } // STEP X: switch SP's over to the running state - uint64_t time_to_start = addTicks(time_of_transfer, - m_SyncSource->getTicksPerFrame() * getPeriodSize()); - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - if(!(*it)->startRunning(time_to_start)) { - debugError("Could not put SP %p into the running state\n", *it); - return false; - } - } - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - if(!(*it)->startRunning(time_to_start)) { - debugError("Could not put SP %p into the running state\n", *it); - return false; - } + for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); + it != m_ReceiveProcessors.end(); + ++it ) { + if(!(*it)->scheduleStartRunning(time_to_start_recv)) { + debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv); + return false; + } + } + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); + it != m_TransmitProcessors.end(); + ++it ) { + if(!(*it)->scheduleStartRunning(time_to_start_xmit)) { + debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit); + return false; + } + } + // wait for the syncsource to start running. + // that will block the waitForPeriod call until everyone has started (theoretically) + int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started + while (!m_SyncSource->isRunning() && cnt) { + usleep(125); + cnt--; + } + if(cnt==0) { + debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n"); + return false; } debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); @@ -577,5 +598,5 @@ // this is to notify the client of the delay // that we introduced - m_delayed_usecs = time_till_next_period; + m_delayed_usecs = -time_till_next_period; // we save the 'ideal' time of the transfer at this point, @@ -683,28 +704,18 @@ bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period for type (%d)...\n", t); + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n", + t, m_time_of_transfer, + (unsigned int)TICKS_TO_SECS(m_time_of_transfer), + (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), + (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); + bool retval = true; // a static cast could make sure that there is no performance // penalty for the virtual functions (to be checked) if (t==StreamProcessor::ePT_Receive) { - // determine the time at which we want reception to start - float rate=m_SyncSource->getTicksPerFrame(); - int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate); - - int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks); - - if(receive_timestamp<0) { - debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", - receive_timestamp, m_time_of_transfer, one_frame_in_ticks); - } - if(receive_timestamp>(128L*TICKS_PER_SECOND)) { - debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", - receive_timestamp, m_time_of_transfer, one_frame_in_ticks); - } - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); it != m_ReceiveProcessors.end(); ++it ) { - if(!(*it)->getFrames(m_period, receive_timestamp)) { + if(!(*it)->getFrames(m_period, m_time_of_transfer)) { debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n", m_period, m_time_of_transfer,*it); @@ -713,4 +724,13 @@ } } else { + // FIXME: in the SPM it would be nice to have system time instead of + // 1394 time + float rate = m_SyncSource->getTicksPerFrame(); + int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); + + // the data we are putting into the buffer is intended to be transmitted + // one ringbuffer size after it has been received + int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); + for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); it != m_TransmitProcessors.end(); @@ -718,11 +738,4 @@ // FIXME: in the SPM it would be nice to have system time instead of // 1394 time - float rate=m_SyncSource->getTicksPerFrame(); - int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate); - - // the data we are putting into the buffer is intended to be transmitted - // one ringbuffer size after it has been received - int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); - if(!(*it)->putFrames(m_period, transmit_timestamp)) { debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n", @@ -730,87 +743,4 @@ retval &= false; // buffer underrun } - - } - } - return retval; -} - -/** - * @brief Dry run one period for both receive and transmit StreamProcessors - * - * Process one period of frames for all streamprocessors, without touching the - * client buffers. This only removes an incoming period from the ISO receive buffer and - * puts one period of silence into the transmit buffers. - * - * @return true if successful, false otherwise (indicates xrun). - */ -bool StreamProcessorManager::dryRun() { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n"); - bool retval=true; - retval &= dryRun(StreamProcessor::ePT_Receive); - retval &= dryRun(StreamProcessor::ePT_Transmit); - return retval; -} - -/** - * @brief Dry run one period for either the receive or transmit StreamProcessors - * - * see dryRun() - * - * @param t The processor type to dryRun for (receive or transmit) - * @return true if successful, false otherwise (indicates xrun). - */ - -bool StreamProcessorManager::dryRun(enum StreamProcessor::eProcessorType t) { - debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n"); - bool retval = true; - // a static cast could make sure that there is no performance - // penalty for the virtual functions (to be checked) - if (t==StreamProcessor::ePT_Receive) { - // determine the time at which we want reception to start - float rate=m_SyncSource->getTicksPerFrame(); - int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate); - - int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks); - - if(receive_timestamp<0) { - debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", - receive_timestamp, m_time_of_transfer, one_frame_in_ticks); - } - if(receive_timestamp>(128L*TICKS_PER_SECOND)) { - debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", - receive_timestamp, m_time_of_transfer, one_frame_in_ticks); - } - - for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); - it != m_ReceiveProcessors.end(); - ++it ) { - - if(!(*it)->getFramesDry(m_period, receive_timestamp)) { - debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n", - m_period, m_time_of_transfer,*it); - retval &= false; // buffer underrun - } - - } - } else { - for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); - it != m_TransmitProcessors.end(); - ++it ) { - // FIXME: in the SPM it would be nice to have system time instead of - // 1394 time - float rate=m_SyncSource->getTicksPerFrame(); - int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate); - - // the data we are putting into the buffer is intended to be transmitted - // one ringbuffer size after it has been received - int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); - - if(!(*it)->putFramesDry(m_period, transmit_timestamp)) { - debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n", - m_period, transmit_timestamp, *it); - retval &= false; // buffer underrun - } - } } Index: /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (revision 720) @@ -83,10 +83,6 @@ bool waitForPeriod(); ///< wait for the next period - bool transfer(); ///< transfer the buffer contents from/to client bool transfer(enum StreamProcessor::eProcessorType); ///< transfer the buffer contents from/to client (single processor type) - - bool dryRun(); - bool dryRun(enum StreamProcessor::eProcessorType); int getDelayedUsecs() {return m_delayed_usecs;}; @@ -95,7 +91,7 @@ unsigned int getNominalRate() {return m_nominal_framerate;}; + uint64_t getTimeOfLastTransfer() { return m_time_of_transfer;}; private: - int m_delayed_usecs; // this stores the time at which the next transfer should occur Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (revision 720) @@ -81,5 +81,5 @@ ePS_WaitingForStreamDisable, }; - + ///> set the SP state to a specific value void setState(enum eProcessorState); @@ -103,8 +103,7 @@ bool scheduleStateTransition(enum eProcessorState state, uint64_t time_instant); - bool scheduleAndWaitForStateTransition(enum eProcessorState state, - uint64_t time_instant, - enum eProcessorState wait_state); -public: + bool waitForState(enum eProcessorState state, unsigned int timeout); + +public: //--- state stuff bool isRunning() {return m_state == ePS_Running;}; @@ -112,5 +111,5 @@ {return m_state == ePS_DryRunning;}; -//--- state stuff (TODO: cleanup) + // these schedule and wait for the state transition bool startDryRunning(int64_t time_to_start_at); bool startRunning(int64_t time_to_start_at); @@ -118,45 +117,29 @@ bool stopRunning(int64_t time_to_stop_at); + // these only schedule the transition + bool scheduleStartDryRunning(int64_t time_to_start_at); + bool scheduleStartRunning(int64_t time_to_start_at); + bool scheduleStopDryRunning(int64_t time_to_stop_at); + bool scheduleStopRunning(int64_t time_to_stop_at); + // the main difference between init and prepare is that when prepare is called, // the SP is registered to a manager (FIXME: can't it be called by the manager?) bool init(); bool prepare(); + ///> stop the SP from running or dryrunning bool stop(); -// constructor/destructor -public: + +public: // constructor/destructor StreamProcessor(enum eProcessorType type, int port); virtual ~StreamProcessor(); -// the receive/transmit functions -public: +public: // the public receive/transmit functions // the transmit interface accepts frames and provides packets // implement these for a transmit SP // leave default for a receive SP - virtual enum raw1394_iso_disposition - getPacket(unsigned char *data, unsigned int *length, - unsigned char *tag, unsigned char *sy, - int cycle, unsigned int dropped, unsigned int max_length) - {debugWarning("call not allowed\n"); return RAW1394_ISO_STOP;}; - virtual bool putFrames(unsigned int nbframes, int64_t ts) - {debugWarning("call not allowed\n"); return false;}; - virtual bool putFramesDry(unsigned int nbframes, int64_t ts) - {debugWarning("call not allowed\n"); return false;}; - virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) - {debugWarning("call not allowed\n"); return false;}; // the receive interface accepts packets and provides frames - - // the following two methods are to be implemented by subclasses - virtual bool processPacketHeader(unsigned char *data, unsigned int length, - unsigned char channel, unsigned char tag, unsigned char sy, - unsigned int cycle, unsigned int dropped) - {debugWarning("call not allowed\n"); return false;}; - virtual bool processPacketData(unsigned char *data, unsigned int length, - unsigned char channel, unsigned char tag, unsigned char sy, - unsigned int cycle, unsigned int dropped) - {debugWarning("call not allowed\n"); return false;}; - - // this one is implemented by us + // these are implemented by the parent SP enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, @@ -164,10 +147,54 @@ unsigned int cycle, unsigned int dropped); + enum raw1394_iso_disposition + getPacket(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length); + bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client -protected: + bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer + +protected: // the helper receive/transmit functions // to be implemented by the children + // the following methods are to be implemented by receive SP subclasses + virtual bool processPacketHeader(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, + unsigned char sy, unsigned int cycle, + unsigned int dropped) + {debugWarning("call not allowed\n"); return false;}; + virtual bool processPacketData(unsigned char *data, unsigned int length, + unsigned char channel, unsigned char tag, + unsigned char sy, unsigned int cycle, + unsigned int dropped) + {debugWarning("call not allowed\n"); return false;}; virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) {debugWarning("call not allowed\n"); return false;}; virtual bool provideSilenceBlock(unsigned int nevents, unsigned int offset) + {debugWarning("call not allowed\n"); return false;}; + + // the following methods are to be implemented by transmit SP subclasses + virtual bool generatePacketHeader(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, + unsigned int max_length) + {debugWarning("call not allowed\n"); return false;}; + virtual bool generatePacketData(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, + unsigned int max_length) + {debugWarning("call not allowed\n"); return false;}; + virtual bool generateSilentPacketHeader(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, + unsigned int max_length) + {debugWarning("call not allowed\n"); return false;}; + virtual bool generateSilentPacketData(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, + unsigned int max_length) + {debugWarning("call not allowed\n"); return false;}; + virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) + {debugWarning("call not allowed\n"); return false;}; + virtual bool transmitSilenceBlock(char *data, unsigned int nevents, unsigned int offset) {debugWarning("call not allowed\n"); return false;}; @@ -175,4 +202,8 @@ bool getFramesDry(unsigned int nbframes, int64_t ts); bool getFramesWet(unsigned int nbframes, int64_t ts); + bool putFramesDry(unsigned int nbframes, int64_t ts); + bool putFramesWet(unsigned int nbframes, int64_t ts); + + bool transferSilence(unsigned int size); // move to private? @@ -188,5 +219,15 @@ //--- data buffering and accounting -public: // FIXME: should be private +public: + void getBufferHeadTimestamp ( ffado_timestamp_t *ts, signed int *fc ) + {m_data_buffer->getBufferHeadTimestamp(ts, fc);}; + void getBufferTailTimestamp ( ffado_timestamp_t *ts, signed int *fc ) + {m_data_buffer->getBufferTailTimestamp(ts, fc);}; + + void setBufferTailTimestamp ( ffado_timestamp_t new_timestamp ) + {m_data_buffer->setBufferTailTimestamp(new_timestamp);}; + void setBufferHeadTimestamp ( ffado_timestamp_t new_timestamp ) + {m_data_buffer->setBufferHeadTimestamp(new_timestamp);}; +protected: Util::TimestampedBuffer *m_data_buffer; @@ -255,5 +296,5 @@ uint64_t getTimeAtPeriod(); - uint64_t getTimeNow(); + uint64_t getTimeNow(); // FIXME: should disappear @@ -318,8 +359,8 @@ /** - * @brief get the nominal number of frames between buffer updates - * @return the nominal number of frames between buffer updates - */ - virtual unsigned int getUpdatePeriod() = 0; + * @brief get the nominal number of frames in a packet + * @return the nominal number of frames in a packet + */ + virtual unsigned int getNominalFramesPerPacket() = 0; protected: @@ -340,5 +381,4 @@ StreamStatistics m_WakeupStat; DECLARE_DEBUG_MODULE; - }; Index: /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp =================================================================== --- /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 719) +++ /branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (revision 720) @@ -327,5 +327,171 @@ } +enum raw1394_iso_disposition +StreamProcessor::getPacket(unsigned char *data, unsigned int *length, + unsigned char *tag, unsigned char *sy, + int cycle, unsigned int dropped, unsigned int max_length) { + if (cycle<0) { + *tag = 0; + *sy = 0; + *length = 0; + return RAW1394_ISO_OK; + } + + int dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; + if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); + else m_dropped += dropped_cycles; + if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); + m_last_cycle = cycle; + + // bypass based upon state + if (m_state == ePS_Invalid) { + debugError("Should not have state %s\n", ePSToString(m_state) ); + return RAW1394_ISO_ERROR; + } + if (m_state == ePS_Created) { + *tag = 0; + *sy = 0; + *length = 0; + return RAW1394_ISO_DEFER; + } + + // normal processing + // note that we can't use getCycleTimer directly here, + // because packets are queued in advance. This means that + // we the packet we are constructing will be sent out + // on 'cycle', not 'now'. + unsigned int ctr = m_handler->getCycleTimer(); + int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr); + + // the difference between the cycle this + // packet is intended for and 'now' + int cycle_diff = diffCycles(cycle, now_cycles); + + #ifdef DEBUG + if(cycle_diff < 0) { + debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", + cycle, now_cycles); + } + #endif + + // store the previous timestamp + m_last_timestamp2 = m_last_timestamp; + + // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles) + // it happens on the first 'good' cycle for the wait condition + // or on the first received cycle that is received afterwards (might be a problem) + + // check whether we are waiting for a stream to be disabled + if(m_state == ePS_WaitingForStreamDisable) { + // we then check whether we have to switch on this cycle + if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n"); + m_next_state = ePS_DryRunning; + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } else { + // not time to disable yet + } + } + // check whether we are waiting for a stream to be enabled + else if(m_state == ePS_WaitingForStreamEnable) { + // we then check whether we have to switch on this cycle + if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n"); + m_next_state = ePS_Running; + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } else { + // not time to enable yet + } + // we are dryRunning hence data should be processed in any case + } + // check whether we are waiting for a stream to startup + else if(m_state == ePS_WaitingForStream) { + // as long as the cycle parameter is not in sync with + // the current time, the stream is considered not + // to be 'running' + // we then check whether we have to switch on this cycle + if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n"); + // hence go to the dryRunning state + m_next_state = ePS_DryRunning; + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } else { + // not time (yet) to switch state + } + } + else if(m_state == ePS_Running) { + // check the packet header + if (generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length)) { + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n", + cycle, m_last_timestamp); + // update some accounting + m_last_good_cycle = cycle; + m_last_dropped = dropped_cycles; + + // check whether a state change has been requested + // note that only the wait state changes are synchronized with the cycles + if(m_state != m_next_state) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", + ePSToString(m_state), ePSToString(m_next_state)); + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } + + bool ok = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); + // if an xrun occured, switch to the dryRunning state and + // allow for the xrun to be picked up + if (!ok) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to xrun\n"); + m_next_state = ePS_DryRunning; + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + goto send_empty_packet; + } + return RAW1394_ISO_OK; + } + } + // we are not running, so send an empty packet + // we should generate a valid packet any time +send_empty_packet: + // note that only the wait state changes are synchronized with the cycles + if(m_state != m_next_state) { + debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", + ePSToString(m_state), ePSToString(m_next_state)); + // execute the requested change + if (!updateState()) { // we are allowed to change the state directly + debugError("Could not update state!\n"); + return RAW1394_ISO_ERROR; + } + } + + debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle); + generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); + generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); + return RAW1394_ISO_DEFER; +} + + // Frame Transfer API +/** + * Transfer a block of frames from the event buffer to the port buffers + * @param nbframes number of frames to transfer + * @param ts the timestamp that the LAST frame in the block should have + * @return + */ bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); @@ -338,5 +504,5 @@ // FIXME: this should be done somewhere else #ifdef DEBUG - uint64_t ts_head; + uint64_t ts_expected; signed int fc; int32_t lag_ticks; @@ -345,27 +511,31 @@ // in order to sync up multiple received streams, we should // use the ts parameter. It specifies the time of the block's - // first sample. + // last sample. + + // determine the time at which we want reception to start + float srate = m_manager->getSyncSource().getTicksPerFrame(); + assert(srate != 0.0); + int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate); ffado_timestamp_t ts_head_tmp; m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); - ts_head=(uint64_t)ts_head_tmp; - lag_ticks=diffTicks(ts, ts_head); - float rate=m_data_buffer->getRate(); + ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks); - assert(rate!=0.0); + lag_ticks = diffTicks(ts, ts_expected); - lag_frames=(((float)lag_ticks)/rate); + + lag_frames = (((float)lag_ticks) / srate); debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", - this, lag_ticks, lag_frames,rate, ts, ts_head, fc); - - if (lag_frames>=1.0) { + this, lag_ticks, lag_frames, srate, ts, ts_expected, fc); + + if (lag_frames >= 1.0) { // the stream lags debugWarning( "stream (%p): lags with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", - this, lag_ticks, lag_frames,rate, ts, ts_head, fc); - } else if (lag_frames<=-1.0) { + this, lag_ticks, lag_frames, srate, ts, ts_expected, fc); + } else if (lag_frames <= -1.0) { // the stream leads debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", - this, lag_ticks, lag_frames,rate, ts, ts_head, fc); + this, lag_ticks, lag_frames, srate, ts, ts_expected, fc); } #endif @@ -386,4 +556,26 @@ } +bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { + debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); + assert( getType() == ePT_Transmit ); + if(isDryRunning()) return putFramesDry(nbframes, ts); + else return putFramesWet(nbframes, ts); +} + +bool +StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) { + debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts); + // transfer the data + m_data_buffer->blockProcessWriteFrames(nbframes, ts); + debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); + return true; // FIXME: what about failure? +} + +bool +StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) { + debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts); + // do nothing + return true; +} /*********************************************** @@ -421,6 +613,4 @@ bool StreamProcessor::stop() { - uint64_t time_to_stop_at = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); - int cnt; debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stop...\n"); switch (m_state) { @@ -437,5 +627,6 @@ } -bool StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant) +bool +StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant) { // first set the time, since in the packet loop we first check m_state == m_next_state before @@ -446,26 +637,21 @@ } -bool StreamProcessor::scheduleAndWaitForStateTransition(enum eProcessorState state, - uint64_t time_instant, - enum eProcessorState wait_state) -{ - int cnt=200; // 2 seconds, i.e. 2 cycles - if(!scheduleStateTransition(state, time_instant)) { - debugError("Could not schedule state transistion to %s\n", ePSToString(state)); +bool +StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms) +{ + debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state)); + int cnt = timeout_ms; + while (m_state != state && cnt) { + usleep(1000); + cnt--; + } + if(cnt==0) { + debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n"); return false; } - while (m_state != wait_state && cnt) { - usleep(10000); - cnt++; - } - if(cnt==0) { - debugError("Timeout entering Stopped state\n"); - return false; - } - debugOutput(DEBUG_LEVEL_VERBOSE, " entered state %s\n", ePSToString(wait_state)); return true; } -bool StreamProcessor::startDryRunning(int64_t t) { +bool StreamProcessor::scheduleStartDryRunning(int64_t t) { uint64_t tx; if (t < 0) { @@ -474,11 +660,11 @@ tx = t; } - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startDryRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); debugOutput(DEBUG_LEVEL_VERBOSE," Start at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); if (m_state == ePS_Stopped) { - return scheduleAndWaitForStateTransition(ePS_WaitingForStream, tx, ePS_DryRunning); + return scheduleStateTransition(ePS_WaitingForStream, tx); } else if (m_state == ePS_Running) { - return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); + return scheduleStateTransition(ePS_WaitingForStreamDisable, tx); } else { debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state)); @@ -487,5 +673,5 @@ } -bool StreamProcessor::startRunning(int64_t t) { +bool StreamProcessor::scheduleStartRunning(int64_t t) { uint64_t tx; if (t < 0) { @@ -494,11 +680,11 @@ tx = t; } - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); debugOutput(DEBUG_LEVEL_VERBOSE," Start at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); - return scheduleAndWaitForStateTransition(ePS_WaitingForStreamEnable, tx, ePS_Running); -} - -bool StreamProcessor::stopDryRunning(int64_t t) { + return scheduleStateTransition(ePS_WaitingForStreamEnable, tx); +} + +bool StreamProcessor::scheduleStopDryRunning(int64_t t) { uint64_t tx; if (t < 0) { @@ -507,11 +693,11 @@ tx = t; } - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopDryRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); debugOutput(DEBUG_LEVEL_VERBOSE," Stop at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); - return scheduleAndWaitForStateTransition(ePS_Stopped, tx, ePS_Stopped); -} - -bool StreamProcessor::stopRunning(int64_t t) { + return scheduleStateTransition(ePS_Stopped, tx); +} + +bool StreamProcessor::scheduleStopRunning(int64_t t) { uint64_t tx; if (t < 0) { @@ -520,9 +706,58 @@ tx = t; } - debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopRunning for (%p)\n",this); + debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); debugOutput(DEBUG_LEVEL_VERBOSE," Now : %011lu\n", m_handler->getCycleTimerTicks()); debugOutput(DEBUG_LEVEL_VERBOSE," Stop at : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); - return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); -} + return scheduleStateTransition(ePS_WaitingForStreamDisable, tx); +} + +bool StreamProcessor::startDryRunning(int64_t t) { + if(!scheduleStartDryRunning(t)) { + debugError("Could not schedule transition\n"); + return false; + } + if(!waitForState(ePS_DryRunning, 2000)) { + debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning)); + return false; + } + return true; +} + +bool StreamProcessor::startRunning(int64_t t) { + if(!scheduleStartRunning(t)) { + debugError("Could not schedule transition\n"); + return false; + } + if(!waitForState(ePS_Running, 2000)) { + debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running)); + return false; + } + return true; +} + +bool StreamProcessor::stopDryRunning(int64_t t) { + if(!scheduleStopDryRunning(t)) { + debugError("Could not schedule transition\n"); + return false; + } + if(!waitForState(ePS_Stopped, 2000)) { + debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped)); + return false; + } + return true; +} + +bool StreamProcessor::stopRunning(int64_t t) { + if(!scheduleStopRunning(t)) { + debugError("Could not schedule transition\n"); + return false; + } + if(!waitForState(ePS_DryRunning, 2000)) { + debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning)); + return false; + } + return true; +} + // internal state API @@ -544,5 +779,5 @@ { float ticks_per_frame; - unsigned int ringbuffer_size_frames; + unsigned int ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); @@ -554,18 +789,20 @@ // object just created result = m_data_buffer->init(); - + // prepare the framerate estimate ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); m_ticks_per_frame = ticks_per_frame; debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame); - + // initialize internal buffer - ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); - result &= m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); + result &= m_data_buffer->setBufferSize(ringbuffer_size_frames); result &= m_data_buffer->setEventSize( getEventSize() ); result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() ); - result &= m_data_buffer->setUpdatePeriod( getUpdatePeriod() ); - + if(getType() == ePT_Receive) { + result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() ); + } else { + result &= m_data_buffer->setUpdatePeriod( m_manager->getPeriodSize() ); + } result &= m_data_buffer->setNominalRate(ticks_per_frame); result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); @@ -612,5 +849,4 @@ } // buffertype and datatype are dependant on the API - // buffertype and datatype are dependant on the API debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); // buffertype and datatype are dependant on the API @@ -643,6 +879,5 @@ } - result &= m_data_buffer->reset(); // FIXME: don't like the reset() name - + result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name // make the buffer transparent m_data_buffer->setTransparent(true); @@ -652,4 +887,10 @@ m_state = ePS_Stopped; + #ifdef DEBUG + if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { + debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); + dumpInfo(); + } + #endif return result; } @@ -679,4 +920,10 @@ } m_state = ePS_WaitingForStream; + #ifdef DEBUG + if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { + debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); + dumpInfo(); + } + #endif return true; } @@ -701,4 +948,6 @@ debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle); if (getType() == ePT_Receive) { + // this to ensure that there is no discontinuity when starting to + // update the DLL based upon the received packets m_data_buffer->setBufferTailTimestamp(m_last_timestamp); } else { @@ -708,5 +957,5 @@ break; case ePS_WaitingForStreamDisable: - result &= m_data_buffer->reset(); // FIXME: don't like the reset() name + result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name m_data_buffer->setTransparent(true); break; @@ -716,4 +965,10 @@ } m_state = ePS_DryRunning; + #ifdef DEBUG + if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { + debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); + dumpInfo(); + } + #endif return result; } @@ -732,4 +987,5 @@ { debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); + unsigned int ringbuffer_size_frames; switch(m_state) { case ePS_DryRunning: @@ -737,4 +993,19 @@ // this basically means nothing, the state change will // be picked up by the packet iterator + + if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name + debugError("Could not reset data buffer\n"); + return false; + } + if (getType() == ePT_Transmit) { + ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); + debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames); + // prefill the buffer + if(!transferSilence(ringbuffer_size_frames)) { + debugFatal("Could not prefill transmit stream\n"); + return false; + } + } + break; default: @@ -743,4 +1014,10 @@ } m_state = ePS_WaitingForStreamEnable; + #ifdef DEBUG + if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { + debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); + dumpInfo(); + } + #endif return true; } @@ -765,10 +1042,6 @@ debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n", this, m_last_cycle); - if (getType() == ePT_Receive) { - m_data_buffer->setTransparent(false); - } else { - // FIXME - debugError("Implement\n"); - } + m_xruns = 0; + m_data_buffer->setTransparent(false); break; default: @@ -777,4 +1050,10 @@ } m_state = ePS_Running; + #ifdef DEBUG + if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { + debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); + dumpInfo(); + } + #endif return result; } @@ -802,4 +1081,10 @@ } m_state = ePS_WaitingForStreamDisable; + #ifdef DEBUG + if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { + debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); + dumpInfo(); + } + #endif return true; } @@ -922,4 +1207,33 @@ } +/*********************************************** + * Helper routines * + ***********************************************/ +bool +StreamProcessor::transferSilence(unsigned int nframes) +{ + bool retval; + signed int fc; + ffado_timestamp_t ts_tail_tmp; + + // prepare a buffer of silence + char *dummybuffer = (char *)calloc(sizeof(quadlet_t), nframes * getEventsPerFrame()); + transmitSilenceBlock(dummybuffer, nframes, 0); + + m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); + if (fc != 0) { + debugWarning("Prefilling a buffer that already contains %d frames\n", fc); + } + + // add the silence data to the ringbuffer + if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) { + retval = true; + } else { + debugWarning("Could not write to event buffer\n"); + retval = false; + } + free(dummybuffer); + return retval; +} /** @@ -968,12 +1282,17 @@ IsoStream::dumpInfo(); debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor info:\n"); - if (m_handler) - debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011u\n",m_handler->getCycleTimerTicks()); + if (m_handler) { + uint64_t now = m_handler->getCycleTimerTicks(); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Now : %011llu (%03us %04uc %04ut)\n", + now, + (unsigned int)TICKS_TO_SECS(now), + (unsigned int)TICKS_TO_CYCLES(now), + (unsigned int)TICKS_TO_OFFSET(now)); + } debugOutputShort( DEBUG_LEVEL_NORMAL, " Xruns : %d\n", m_xruns); debugOutputShort( DEBUG_LEVEL_NORMAL, " State : %s\n", ePSToString(m_state)); debugOutputShort( DEBUG_LEVEL_NORMAL, " Next state : %s\n", ePSToString(m_next_state)); debugOutputShort( DEBUG_LEVEL_NORMAL, " transition at : %u\n", m_cycle_to_switch_state); - - + debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer : %p\n", m_data_buffer); debugOutputShort( DEBUG_LEVEL_NORMAL, " Nominal framerate : %u\n", m_manager->getNominalRate()); debugOutputShort( DEBUG_LEVEL_NORMAL, " Device framerate : Sync: %f, Buffer %f\n", @@ -983,8 +1302,4 @@ m_data_buffer->dumpInfo(); - - m_PeriodStat.dumpInfo(); - m_PacketStat.dumpInfo(); -// m_WakeupStat.dumpInfo(); } Index: /branches/ppalmers-streaming/src/debugmodule/debugmodule.cpp =================================================================== --- /branches/ppalmers-streaming/src/debugmodule/debugmodule.cpp (revision 707) +++ /branches/ppalmers-streaming/src/debugmodule/debugmodule.cpp (revision 720) @@ -35,5 +35,5 @@ #ifndef DO_MESSAGE_BUFFER_PRINT - #warning Printing debug info without ringbuffer, not RT-safe! + #warning Printing debug info without ringbuffer, not RT-safe! #endif Index: /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp =================================================================== --- /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (revision 719) +++ /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (revision 720) @@ -266,9 +266,8 @@ * @return true if successful */ -bool TimestampedBuffer::reset() { +bool TimestampedBuffer::clearBuffer() { + debugOutput(DEBUG_LEVEL_VERBOSE, "Clearing buffer\n"); ffado_ringbuffer_reset(m_event_buffer); - resetFrameCounter(); - return true; } @@ -397,4 +396,49 @@ /** + * @brief Preload frames into the buffer + * + * Preload \ref nframes of frames from the buffer pointed to by \ref data to the + * internal ringbuffer. Does not care about transparency. Keeps the buffer head or tail + * timestamp constant. + * + * @note not thread safe + * + * @param nframes number of frames to copy + * @param data pointer to the frame buffer + * @param keep_head_ts if true, keep the head timestamp constant. If false, keep the + * tail timestamp constant. + * @return true if successful + */ +bool TimestampedBuffer::preloadFrames(unsigned int nframes, char *data, bool keep_head_ts) { + unsigned int write_size = nframes * m_event_size * m_events_per_frame; + // add the data payload to the ringbuffer + size_t written = ffado_ringbuffer_write(m_event_buffer, data, write_size); + if (written < write_size) + { + debugWarning("ringbuffer full, request: %u, actual: %u\n", write_size, written); + return false; + } + + // make sure the head timestamp remains identical + signed int fc; + ffado_timestamp_t ts; + + if (keep_head_ts) { + getBufferHeadTimestamp(&ts, &fc); + } else { + getBufferTailTimestamp(&ts, &fc); + } + // update frame counter + m_framecounter += nframes; + if (keep_head_ts) { + setBufferHeadTimestamp(ts); + } else { + setBufferTailTimestamp(ts); + } + + return true; +} + +/** * @brief Drop frames from the head of the buffer * @@ -410,5 +454,4 @@ ffado_ringbuffer_read_advance(m_event_buffer, read_size); decrementFrameCounter(nframes); - return true; } @@ -722,5 +765,5 @@ #endif - ffado_timestamp_t ts=new_timestamp; + ffado_timestamp_t ts = new_timestamp; ENTER_CRITICAL_SECTION; @@ -1045,5 +1088,5 @@ ENTER_CRITICAL_SECTION; - diff=m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; + diff = m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; EXIT_CRITICAL_SECTION; @@ -1054,5 +1097,5 @@ #endif - ffado_timestamp_t ts=new_timestamp; + ffado_timestamp_t ts = new_timestamp; ts += m_tick_offset; @@ -1123,5 +1166,5 @@ this, diff); - double err=diff; + double err = diff; debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "diff2="TIMESTAMP_FORMAT_SPEC" err=%f\n", @@ -1208,4 +1251,5 @@ debugOutputShort( DEBUG_LEVEL_NORMAL, " TimestampedBuffer (%p) info:\n",this); debugOutputShort( DEBUG_LEVEL_NORMAL, " Frame counter : %d\n", m_framecounter); + debugOutputShort( DEBUG_LEVEL_NORMAL, " Events in buffer : %d\n", getBufferFill()); debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer head timestamp : "TIMESTAMP_FORMAT_SPEC"\n",ts_head); debugOutputShort( DEBUG_LEVEL_NORMAL, " Buffer tail timestamp : "TIMESTAMP_FORMAT_SPEC"\n",m_buffer_tail_timestamp); Index: /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h =================================================================== --- /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h (revision 719) +++ /branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h (revision 720) @@ -37,187 +37,188 @@ // #define TIMESTAMP_FORMAT_SPEC "%012lld" -namespace Util { - +namespace Util +{ class TimestampedBufferClient; /** - * \brief Class implementing a frame buffer that is time-aware - * - * This class implements a buffer that is time-aware. Whenever new frames - * are written to the buffer, the timestamp corresponding to the last frame - * in the buffer is updated. This allows to calculate the timestamp of any - * other frame in the buffer. - * - * The buffer is a frame buffer, having the following parameters defining - * it's behaviour: - * - buff_size: buffer size in frames (setBufferSize()) - * - events_per_frame: the number of events per frame (setEventsPerFrame()) - * - event_size: the storage size of the events (in bytes) (setEventSize()) - * - * The total size of the buffer (in bytes) is at least - * buff_size*events_per_frame*event_size. - * - * Timestamp tracking is done by requiring that a timestamp is specified every - * time frames are added to the buffer. In combination with the buffer fill and - * the frame rate (calculated internally), this allows to calculate the timestamp - * of any frame in the buffer. In order to initialize the internal data structures, - * the setNominalRate() and setUpdatePeriod() functions are provided. - * - * \note Currently the class only supports fixed size writes of size update_period. - * This can change in the future, implementation ideas are already in place. - * - * The TimestampedBuffer class is time unit agnostic. It can handle any time unit - * as long as it fits in a 64 bit unsigned integer. The buffer supports wrapped - * timestamps using (...). - * - * There are two methods of reading and writing to the buffer. - * - * The first method uses conventional readFrames() and writeFrames() functions. - * - * The second method makes use of the TimestampedBufferClient interface. When a - * TimestampedBuffer is created, it is required that a TimestampedBufferClient is - * registered. This client implements the processReadBlock and processWriteBlock - * functions. These are block processing 'callbacks' that allow zero-copy processing - * of the buffer contents. In order to initiate block processing, the - * blockProcessWriteFrames and blockProcessReadFrames functions are provided by - * TimestampedBuffer. - * - */ -class TimestampedBuffer { - -public: - - - TimestampedBuffer(TimestampedBufferClient *); - virtual ~TimestampedBuffer(); - - bool writeDummyFrame(); - bool dropFrames(unsigned int nbframes); - - bool writeFrames(unsigned int nbframes, char *data, ffado_timestamp_t ts); - bool readFrames(unsigned int nbframes, char *data); - - bool blockProcessWriteFrames(unsigned int nbframes, ffado_timestamp_t ts); - bool blockProcessReadFrames(unsigned int nbframes); - - bool init(); - bool prepare(); - bool reset(); - - bool isEnabled() {return m_enabled;}; - void enable() {m_enabled=true;}; - void disable() {m_enabled=false;}; - - bool isTransparent() {return m_transparent;}; - void setTransparent(bool v) {m_transparent=v;}; - - bool setEventSize(unsigned int s); - bool setEventsPerFrame(unsigned int s); - bool setBufferSize(unsigned int s); - unsigned int getBufferSize() {return m_buffer_size;}; - - unsigned int getBytesPerFrame() {return m_bytes_per_frame;}; - - bool setWrapValue(ffado_timestamp_t w); - - unsigned int getBufferFill(); - - // timestamp stuff - int getFrameCounter() {return m_framecounter;}; - - void getBufferHeadTimestamp(ffado_timestamp_t *ts, signed int *fc); - void getBufferTailTimestamp(ffado_timestamp_t *ts, signed int *fc); - - void setBufferTailTimestamp(ffado_timestamp_t new_timestamp); - void setBufferHeadTimestamp(ffado_timestamp_t new_timestamp); - - // sync related, also drops or add frames when necessary - bool syncBufferHeadToTimestamp(ffado_timestamp_t ts); - bool syncBufferTailToTimestamp(ffado_timestamp_t ts); - bool syncCorrectLag(int64_t ts); - - ffado_timestamp_t getTimestampFromTail(int nframes); - ffado_timestamp_t getTimestampFromHead(int nframes); - - // buffer offset stuff - /// return the tick offset value - ffado_timestamp_t getTickOffset() {return m_tick_offset;}; - - bool setFrameOffset(int nframes); - bool setTickOffset(ffado_timestamp_t); - - // dll stuff - bool setNominalRate(float r); - float getNominalRate() {return m_nominal_rate;}; - float getRate(); - - bool setUpdatePeriod(unsigned int t); - - // misc stuff - void dumpInfo(); - void setVerboseLevel(int l) {setDebugLevel(l);}; - -private: - void decrementFrameCounter(int nbframes); - void incrementFrameCounter(int nbframes, ffado_timestamp_t new_timestamp); - void resetFrameCounter(); - -protected: - - ffado_ringbuffer_t * m_event_buffer; - char* m_cluster_buffer; - - unsigned int m_event_size; // the size of one event - unsigned int m_events_per_frame; // the number of events in a frame - unsigned int m_buffer_size; // the number of frames in the buffer - unsigned int m_bytes_per_frame; - unsigned int m_bytes_per_buffer; - bool m_enabled; // you can get frames FIXME: rename!! - bool m_transparent; // the buffer should hold the frames put in it. if true, discards all frames - - ffado_timestamp_t m_wrap_at; // value to wrap at - - TimestampedBufferClient *m_Client; - - DECLARE_DEBUG_MODULE; - -private: - // the framecounter gives the number of frames in the buffer - signed int m_framecounter; - - // the offset that define the timing of the buffer - ffado_timestamp_t m_tick_offset; - - // the buffer tail timestamp gives the timestamp of the last frame - // that was put into the buffer - ffado_timestamp_t m_buffer_tail_timestamp; - ffado_timestamp_t m_buffer_next_tail_timestamp; - - // this mutex protects the access to the framecounter - // and the buffer head timestamp. - pthread_mutex_t m_framecounter_lock; - - // tracking DLL variables + * \brief Class implementing a frame buffer that is time-aware + * + * This class implements a buffer that is time-aware. Whenever new frames + * are written to the buffer, the timestamp corresponding to the last frame + * in the buffer is updated. This allows to calculate the timestamp of any + * other frame in the buffer. + * + * The buffer is a frame buffer, having the following parameters defining + * it's behaviour: + * - buff_size: buffer size in frames (setBufferSize()) + * - events_per_frame: the number of events per frame (setEventsPerFrame()) + * - event_size: the storage size of the events (in bytes) (setEventSize()) + * + * The total size of the buffer (in bytes) is at least + * buff_size*events_per_frame*event_size. + * + * Timestamp tracking is done by requiring that a timestamp is specified every + * time frames are added to the buffer. In combination with the buffer fill and + * the frame rate (calculated internally), this allows to calculate the timestamp + * of any frame in the buffer. In order to initialize the internal data structures, + * the setNominalRate() and setUpdatePeriod() functions are provided. + * + * \note Currently the class only supports fixed size writes of size update_period. + * This can change in the future, implementation ideas are already in place. + * + * The TimestampedBuffer class is time unit agnostic. It can handle any time unit + * as long as it fits in a 64 bit unsigned integer. The buffer supports wrapped + * timestamps using (...). + * + * There are two methods of reading and writing to the buffer. + * + * The first method uses conventional readFrames() and writeFrames() functions. + * + * The second method makes use of the TimestampedBufferClient interface. When a + * TimestampedBuffer is created, it is required that a TimestampedBufferClient is + * registered. This client implements the processReadBlock and processWriteBlock + * functions. These are block processing 'callbacks' that allow zero-copy processing + * of the buffer contents. In order to initiate block processing, the + * blockProcessWriteFrames and blockProcessReadFrames functions are provided by + * TimestampedBuffer. + * + */ +class TimestampedBuffer +{ + public: + TimestampedBuffer ( TimestampedBufferClient * ); + virtual ~TimestampedBuffer(); + + bool writeDummyFrame(); + bool dropFrames ( unsigned int nbframes ); + + bool writeFrames ( unsigned int nbframes, char *data, ffado_timestamp_t ts ); + bool readFrames ( unsigned int nbframes, char *data ); + + bool preloadFrames ( unsigned int nbframes, char *data, bool keep_head_ts ); + + bool blockProcessWriteFrames ( unsigned int nbframes, ffado_timestamp_t ts ); + bool blockProcessReadFrames ( unsigned int nbframes ); + + bool init(); + bool prepare(); + bool clearBuffer(); + + bool isEnabled() {return m_enabled;}; + void enable() {m_enabled=true;}; + void disable() {m_enabled=false;}; + + bool isTransparent() {return m_transparent;}; + void setTransparent ( bool v ) {m_transparent=v;}; + + bool setEventSize ( unsigned int s ); + bool setEventsPerFrame ( unsigned int s ); + bool setBufferSize ( unsigned int s ); + unsigned int getBufferSize() {return m_buffer_size;}; + + unsigned int getBytesPerFrame() {return m_bytes_per_frame;}; + + bool setWrapValue ( ffado_timestamp_t w ); + + unsigned int getBufferFill(); + + // timestamp stuff + int getFrameCounter() {return m_framecounter;}; + + void getBufferHeadTimestamp ( ffado_timestamp_t *ts, signed int *fc ); + void getBufferTailTimestamp ( ffado_timestamp_t *ts, signed int *fc ); + + void setBufferTailTimestamp ( ffado_timestamp_t new_timestamp ); + void setBufferHeadTimestamp ( ffado_timestamp_t new_timestamp ); + + // sync related, also drops or add frames when necessary + bool syncBufferHeadToTimestamp ( ffado_timestamp_t ts ); + bool syncBufferTailToTimestamp ( ffado_timestamp_t ts ); + bool syncCorrectLag ( int64_t ts ); + + ffado_timestamp_t getTimestampFromTail ( int nframes ); + ffado_timestamp_t getTimestampFromHead ( int nframes ); + + // buffer offset stuff + /// return the tick offset value + ffado_timestamp_t getTickOffset() {return m_tick_offset;}; + + bool setFrameOffset ( int nframes ); + bool setTickOffset ( ffado_timestamp_t ); + + // dll stuff + bool setNominalRate ( float r ); + float getNominalRate() {return m_nominal_rate;}; + float getRate(); + + bool setUpdatePeriod ( unsigned int t ); + + // misc stuff + void dumpInfo(); + void setVerboseLevel ( int l ) {setDebugLevel ( l );}; + + private: + void decrementFrameCounter ( int nbframes ); + void incrementFrameCounter ( int nbframes, ffado_timestamp_t new_timestamp ); + void resetFrameCounter(); + + protected: + + ffado_ringbuffer_t * m_event_buffer; + char* m_cluster_buffer; + + unsigned int m_event_size; // the size of one event + unsigned int m_events_per_frame; // the number of events in a frame + unsigned int m_buffer_size; // the number of frames in the buffer + unsigned int m_bytes_per_frame; + unsigned int m_bytes_per_buffer; + bool m_enabled; // you can get frames FIXME: rename!! + bool m_transparent; // the buffer should hold the frames put in it. if true, discards all frames + + ffado_timestamp_t m_wrap_at; // value to wrap at + + TimestampedBufferClient *m_Client; + + DECLARE_DEBUG_MODULE; + + private: + // the framecounter gives the number of frames in the buffer + signed int m_framecounter; + + // the offset that define the timing of the buffer + ffado_timestamp_t m_tick_offset; + + // the buffer tail timestamp gives the timestamp of the last frame + // that was put into the buffer + ffado_timestamp_t m_buffer_tail_timestamp; + ffado_timestamp_t m_buffer_next_tail_timestamp; + + // this mutex protects the access to the framecounter + // and the buffer head timestamp. + pthread_mutex_t m_framecounter_lock; + + // tracking DLL variables // JMW: try double for this too // float m_dll_e2; - double m_dll_e2; - float m_dll_b; - float m_dll_c; - - float m_nominal_rate; - unsigned int m_update_period; + double m_dll_e2; + float m_dll_b; + float m_dll_c; + + float m_nominal_rate; + unsigned int m_update_period; }; /** - * \brief Interface to be implemented by TimestampedBuffer clients - */ -class TimestampedBufferClient { + * \brief Interface to be implemented by TimestampedBuffer clients + */ +class TimestampedBufferClient +{ public: TimestampedBufferClient() {}; virtual ~TimestampedBufferClient() {}; - virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset)=0; - virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset)=0; + virtual bool processReadBlock ( char *data, unsigned int nevents, unsigned int offset ) =0; + virtual bool processWriteBlock ( char *data, unsigned int nevents, unsigned int offset ) =0; }; Index: /branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp =================================================================== --- /branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp (revision 719) +++ /branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp (revision 720) @@ -422,6 +422,4 @@ } - int samplerate=outputPlug->getSampleRate(); - debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing receive processor...\n"); // create & add streamprocessors @@ -592,6 +590,6 @@ int AvDevice::getStreamCount() { - //return m_receiveProcessors.size() + m_transmitProcessors.size(); - return 1; + return m_receiveProcessors.size() + m_transmitProcessors.size(); + //return 1; }