Changeset 720

Show
Ignore:
Timestamp:
11/23/07 08:11:41 (13 years ago)
Author:
ppalmers
Message:

first working version of the reworked streaming code

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/ppalmers-streaming/src/debugmodule/debugmodule.cpp

    r707 r720  
    3535 
    3636#ifndef DO_MESSAGE_BUFFER_PRINT 
    37        #warning Printing debug info without ringbuffer, not RT-safe! 
     37    #warning Printing debug info without ringbuffer, not RT-safe! 
    3838#endif 
    3939 
  • branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp

    r719 r720  
    422422    } 
    423423 
    424     int samplerate=outputPlug->getSampleRate(); 
    425  
    426424    debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing receive processor...\n"); 
    427425    // create & add streamprocessors 
     
    592590int 
    593591AvDevice::getStreamCount() { 
    594     //return m_receiveProcessors.size() + m_transmitProcessors.size(); 
    595     return 1; 
     592    return m_receiveProcessors.size() + m_transmitProcessors.size(); 
     593    //return 1; 
    596594} 
    597595 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp

    r719 r720  
    5555 
    5656bool AmdtpReceiveStreamProcessor::prepareChild() { 
    57  
    58     m_PeriodStat.setName("RCV PERIOD"); 
    59     m_PacketStat.setName("RCV PACKET"); 
    60     m_WakeupStat.setName("RCV WAKEUP"); 
    61  
    6257    debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); 
    6358 
     
    153148    // this packet x*syt_interval*ticks_per_frame 
    154149    // later than expected (the real receive time) 
    155     debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", 
    156         m_last_timestamp, m_handler->getWakeupInterval(), m_syt_interval, getTicksPerFrame()); 
     150    #ifdef DEBUG 
     151    if(isRunning()) { 
     152        debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", 
     153            m_last_timestamp, m_handler->getWakeupInterval(), m_syt_interval, getTicksPerFrame()); 
     154    } 
     155    #endif 
    157156 
    158157    if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h

    r719 r720  
    9494    virtual unsigned int getEventsPerFrame()  
    9595                    { return m_dimension; }; 
    96     virtual unsigned int getUpdatePeriod()  
     96    virtual unsigned int getNominalFramesPerPacket()  
    9797                    {return m_syt_interval;}; 
    98  
    99     // We have 1 period of samples = m_period 
    100     // this period takes m_period/m_framerate seconds of time 
    101     // during this time, 8000 packets are sent 
    102 //     unsigned int getPacketsPerPeriod() {return (m_period*8000)/m_framerate;}; 
    103  
    104     // however, if we only count the number of used packets 
    105     // it is m_period / m_syt_interval 
    10698    virtual unsigned int getPacketsPerPeriod(); 
    10799 
    108100protected: 
    109  
    110101    bool processReadBlock(char *data, unsigned int nevents, unsigned int offset); 
    111102    bool provideSilenceBlock(unsigned int nevents, unsigned int offset); 
    112103 
     104private: 
    113105    bool decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); 
    114106 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp

    r719 r720  
    3838#define TRANSMIT_TRANSFER_DELAY DEFAULT_TRANSFER_DELAY 
    3939 
    40 namespace Streaming { 
     40namespace Streaming 
     41
    4142 
    4243/* transmit */ 
    43 AmdtpTransmitStreamProcessor::AmdtpTransmitStreamProcessor(int port, int dimension) 
    44         : StreamProcessor(ePT_Transmit, port) 
    45         , m_dimension(dimension) 
    46         , m_last_timestamp(0) 
    47         , m_dbc(0) 
    48         , m_ringbuffer_size_frames(0) 
     44AmdtpTransmitStreamProcessor::AmdtpTransmitStreamProcessor ( int port, int dimension ) 
     45        : StreamProcessor ( ePT_Transmit, port ) 
     46        , m_dimension ( dimension ) 
     47        , m_dbc ( 0 ) 
    4948{} 
    5049 
    51 enum raw1394_iso_disposition 
    52 AmdtpTransmitStreamProcessor::getPacket(unsigned char *data, unsigned int *length, 
    53                   unsigned char *tag, unsigned char *sy, 
    54                   int cycle, unsigned int dropped, unsigned int max_length) { 
    55     struct iec61883_packet *packet = (struct iec61883_packet *) data; 
    56  
    57     if (cycle<0) { 
    58         debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", 
    59             cycle, isRunning()); 
    60         *tag = 0; 
    61         *sy = 0; 
    62         *length=0; 
    63         return RAW1394_ISO_OK; 
    64     } 
    65  
    66     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", 
    67         cycle, isRunning()); 
    68  
    69     if (addCycles(m_last_cycle, 1) != cycle) { 
    70         debugWarning("(%p) Dropped %d packets on cycle %d\n", diffCycles(cycle,m_last_cycle)-1, cycle); 
    71     } 
    72  
    73     m_last_cycle=cycle; 
    74  
    75 #ifdef DEBUG 
    76     if(dropped>0) { 
    77         debugWarning("Dropped %d packets on cycle %d\n",dropped, cycle); 
    78     } 
    79 #endif 
    80  
    81     // calculate & preset common values 
    82  
     50bool 
     51AmdtpTransmitStreamProcessor::generatePacketHeader ( 
     52    unsigned char *data, unsigned int *length, 
     53    unsigned char *tag, unsigned char *sy, 
     54    int cycle, unsigned int dropped, unsigned int max_length ) 
     55
     56    struct iec61883_packet *packet = ( struct iec61883_packet * ) data; 
    8357    /* Our node ID can change after a bus reset, so it is best to fetch 
    84     * our node ID for each packet. */ 
     58    * our node ID for each packet. */ 
    8559    packet->sid = m_handler->getLocalNodeId() & 0x3f; 
    8660 
     
    9670    *tag = IEC61883_TAG_WITH_CIP; 
    9771    *sy = 0; 
    98  
    99     // determine if we want to send a packet or not 
    100     // note that we can't use getCycleTimer directly here, 
    101     // because packets are queued in advance. This means that 
    102     // we the packet we are constructing will be sent out 
    103     // on 'cycle', not 'now'. 
    104     unsigned int ctr=m_handler->getCycleTimer(); 
    105     int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr); 
    106  
    107     // the difference between the cycle this 
    108     // packet is intended for and 'now' 
    109     int cycle_diff = diffCycles(cycle, now_cycles); 
    110  
    111 #ifdef DEBUG 
    112     if(isRunning() && (cycle_diff < 0)) { 
    113         debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", 
    114             cycle, now_cycles); 
    115     } 
    116  
    117     // keep track of the lag 
    118     m_PacketStat.mark(cycle_diff); 
    119 #endif 
    120  
    121     // as long as the cycle parameter is not in sync with 
    122     // the current time, the stream is considered not 
    123     // to be 'running' 
    124     // NOTE: this works only at startup 
    125     if (!isRunning() && cycle_diff >= 0 && cycle >= 0) { 
    126             debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); 
    127     } 
    12872 
    12973    signed int fc; 
     
    15296    const int max_cycles_to_transmit_early = 5; 
    15397 
    154     if( !isRunning() || !m_data_buffer->isEnabled() ) { 
    155         debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, 
    156                     "Not running (%d) or buffer not enabled (enabled=%d)\n", 
    157                     isRunning(), m_data_buffer->isEnabled()); 
    158  
    159         // not running or not enabled 
    160         goto send_empty_packet; 
    161     } 
    162  
    16398try_block_of_frames: 
    164     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "Try for cycle %d\n", cycle); 
     99    debugOutput ( DEBUG_LEVEL_ULTRA_VERBOSE, "Try for cycle %d\n", cycle ); 
    165100    // check whether the packet buffer has packets for us to send. 
    166101    // the base timestamp is the one of the next sample in the buffer 
    167102    ffado_timestamp_t ts_head_tmp; 
    168     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); // thread safe 
     103    m_data_buffer->getBufferHeadTimestamp ( &ts_head_tmp, &fc ); // thread safe 
    169104 
    170105    // the timestamp gives us the time at which we want the sample block 
    171106    // to be output by the device 
    172     presentation_time=(uint64_t)ts_head_tmp; 
     107    presentation_time = ( uint64_t ) ts_head_tmp; 
     108    m_last_timestamp = presentation_time; 
    173109 
    174110    // now we calculate the time when we have to transmit the sample block 
    175     transmit_at_time = substractTicks(presentation_time, TRANSMIT_TRANSFER_DELAY); 
     111    transmit_at_time = substractTicks ( presentation_time, TRANSMIT_TRANSFER_DELAY ); 
    176112 
    177113    // calculate the cycle this block should be presented in 
    178114    // (this is just a virtual calculation since at that time it should 
    179115    //  already be in the device's buffer) 
    180     presentation_cycle = (unsigned int)(TICKS_TO_CYCLES( presentation_time )); 
     116    presentation_cycle = ( unsigned int ) ( TICKS_TO_CYCLES ( presentation_time ) ); 
    181117 
    182118    // calculate the cycle this block should be transmitted in 
    183     transmit_at_cycle = (unsigned int)(TICKS_TO_CYCLES( transmit_at_time )); 
     119    transmit_at_cycle = ( unsigned int ) ( TICKS_TO_CYCLES ( transmit_at_time ) ); 
    184120 
    185121    // we can check whether this cycle is within the 'window' we have 
    186122    // to send this packet. 
    187123    // first calculate the number of cycles left before presentation time 
    188     cycles_until_presentation = diffCycles( presentation_cycle, cycle ); 
     124    cycles_until_presentation = diffCycles ( presentation_cycle, cycle ); 
    189125 
    190126    // we can check whether this cycle is within the 'window' we have 
    191127    // to send this packet. 
    192128    // first calculate the number of cycles left before presentation time 
    193     cycles_until_transmit = diffCycles( transmit_at_cycle, cycle ); 
     129    cycles_until_transmit = diffCycles ( transmit_at_cycle, cycle ); 
     130 
     131    debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, 
     132                "Gen HDR: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", 
     133                cycle, 
     134                transmit_at_cycle, cycles_until_transmit, 
     135                transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), 
     136                presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); 
    194137 
    195138    // two different options: 
    196     // 1) there are not enough frames for one packet  
     139    // 1) there are not enough frames for one packet 
    197140    //      => determine wether this is a problem, since we might still 
    198141    //         have some time to send it 
    199142    // 2) there are enough packets 
    200143    //      => determine whether we have to send them in this packet 
    201     if (fc < (signed int)m_syt_interval) { 
    202         m_PacketStat.signal(0); 
     144    if ( fc < ( signed int ) m_syt_interval ) 
     145    { 
    203146        // not enough frames in the buffer, 
    204         debugOutput(DEBUG_LEVEL_VERBOSE,  
     147        debugOutput ( DEBUG_LEVEL_VERBOSE, 
    205148                    "Insufficient frames: N=%02d, CY=%04u, TC=%04u, CUT=%04d\n", 
    206                     fc, cycle, transmit_at_cycle, cycles_until_transmit); 
     149                    fc, cycle, transmit_at_cycle, cycles_until_transmit ); 
    207150        // we can still postpone the queueing of the packets 
    208151        // if we are far enough ahead of the presentation time 
    209         if( cycles_until_presentation <= min_cycles_before_presentation ) { 
    210             m_PacketStat.signal(1); 
     152        if ( cycles_until_presentation <= min_cycles_before_presentation ) 
     153        { 
    211154            // we are too late 
    212155            // meaning that we in some sort of xrun state 
     
    214157            m_xruns++; 
    215158            // we send an empty packet on this cycle 
    216             goto send_empty_packet; // UGLY but effective 
    217         } else { 
    218             m_PacketStat.signal(2); 
     159            return false; 
     160        } 
     161        else 
     162        { 
    219163            // there is still time left to send the packet 
    220164            // we want the system to give this packet another go 
    221 //             goto try_packet_again; // UGLY but effective 
     165    //             goto try_packet_again; // UGLY but effective 
    222166            // unfortunatly the try_again doesn't work very well, 
    223167            // so we'll have to either usleep(one cycle) and goto try_block_of_frames 
    224              
     168 
    225169            // or just fill this with an empty packet 
    226170            // if we have to do this too often, the presentation time will 
    227171            // get too close and we're in trouble 
    228             goto send_empty_packet; // UGLY but effective 
    229         } 
    230     } else { 
    231         m_PacketStat.signal(3); 
     172            return false; 
     173        } 
     174    } 
     175    else 
     176    { 
    232177        // there are enough frames, so check the time they are intended for 
    233178        // all frames have a certain 'time window' in which they can be sent 
     
    236181        // in theory we can send the packet up till one cycle before the presentation time, 
    237182        // however this is not very smart. 
    238          
     183 
    239184        // There are 3 options: 
    240185        // 1) the frame block is too early 
     
    245190        //      => discard (and raise xrun?) 
    246191        //         get next block of frames and repeat 
    247          
    248         if (cycles_until_transmit <= max_cycles_to_transmit_early) { 
    249             m_PacketStat.signal(4); 
     192 
     193        if ( cycles_until_transmit <= max_cycles_to_transmit_early ) 
     194        { 
    250195            // it's time send the packet 
    251             goto send_packet; // UGLY but effective 
    252         } else if (cycles_until_transmit < 0) { 
     196            m_dbc += fillDataPacketHeader ( packet, length, m_last_timestamp ); 
     197            return true; 
     198        } 
     199        else if ( cycles_until_transmit < 0 ) 
     200        { 
    253201            // we are too late 
    254             debugOutput(DEBUG_LEVEL_VERBOSE,  
     202            debugOutput ( DEBUG_LEVEL_VERBOSE, 
    255203                        "Too late: CY=%04u, TC=%04u, CUT=%04d, TSP=%011llu (%04u)\n", 
    256204                        cycle, 
    257205                        transmit_at_cycle, cycles_until_transmit, 
    258                         presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); 
     206                        presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); 
    259207 
    260208            // however, if we can send this sufficiently before the presentation 
     
    262210            // NOTE: dangerous since the device has no way of reporting that it didn't get 
    263211            //       this packet on time. 
    264             if ( cycles_until_presentation <= min_cycles_before_presentation ) { 
    265                 m_PacketStat.signal(5); 
     212            if ( cycles_until_presentation <= min_cycles_before_presentation ) 
     213            { 
    266214                // we are not that late and can still try to transmit the packet 
    267                 goto send_packet; // UGLY but effective 
    268             } else { // definitely too late 
    269                 m_PacketStat.signal(6); 
     215                m_dbc += fillDataPacketHeader ( packet, length, m_last_timestamp ); 
     216                return true; 
     217            } 
     218            else   // definitely too late 
     219            { 
    270220                // remove the samples 
    271                 m_data_buffer->dropFrames(m_syt_interval); 
     221                m_data_buffer->dropFrames ( m_syt_interval ); 
    272222                // signal some xrun situation ??HERE?? 
    273223                m_xruns++; 
     
    275225                goto try_block_of_frames; // UGLY but effective 
    276226            } 
    277         } else { 
    278             m_PacketStat.signal(7); 
    279             debugOutput(DEBUG_LEVEL_VERY_VERBOSE,  
     227        } 
     228        else 
     229        { 
     230            debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, 
    280231                        "Too early: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", 
    281232                        cycle, 
    282233                        transmit_at_cycle, cycles_until_transmit, 
    283                         transmit_at_time, (unsigned int)TICKS_TO_CYCLES(transmit_at_time), 
    284                         presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); 
    285             #ifdef DEBUG 
    286             if (cycles_until_transmit > max_cycles_to_transmit_early + 1) { 
    287                 debugOutput(DEBUG_LEVEL_VERBOSE,  
     234                        transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), 
     235                        presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); 
     236#ifdef DEBUG 
     237            if ( cycles_until_transmit > max_cycles_to_transmit_early + 1 ) 
     238            { 
     239                debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, 
    288240                            "Way too early: CY=%04u, TC=%04u, CUT=%04d, TST=%011llu (%04u), TSP=%011llu (%04u)\n", 
    289241                            cycle, 
    290242                            transmit_at_cycle, cycles_until_transmit, 
    291                             transmit_at_time, (unsigned int)TICKS_TO_CYCLES(transmit_at_time), 
    292                             presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); 
    293             } 
    294             #endif 
     243                            transmit_at_time, ( unsigned int ) TICKS_TO_CYCLES ( transmit_at_time ), 
     244                            presentation_time, ( unsigned int ) TICKS_TO_CYCLES ( presentation_time ) ); 
     245            } 
     246#endif 
    295247            // we are too early, send only an empty packet 
    296             goto send_empty_packet; // UGLY but effective 
    297         } 
    298     } 
    299  
    300     debugFatal("Should never reach this code!\n"); 
    301     return RAW1394_ISO_ERROR; 
    302  
    303 send_empty_packet: 
    304     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT NONE: CY=%04u, TSP=%011llu (%04u)\n", 
    305             cycle, 
    306             presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); 
    307  
    308     m_dbc += fillNoDataPacketHeader(packet, length); 
    309     return RAW1394_ISO_DEFER; 
    310  
    311 send_packet: 
    312     if (m_data_buffer->readFrames(m_syt_interval, (char *)(data + 8))) { 
    313         m_dbc += fillDataPacketHeader(packet, length, presentation_time); 
    314  
     248            return false; 
     249        } 
     250    } 
     251    return true; 
     252
     253 
     254bool 
     255AmdtpTransmitStreamProcessor::generatePacketData ( 
     256    unsigned char *data, unsigned int *length, 
     257    unsigned char *tag, unsigned char *sy, 
     258    int cycle, unsigned int dropped, unsigned int max_length ) 
     259
     260    struct iec61883_packet *packet = ( struct iec61883_packet * ) data; 
     261    if ( m_data_buffer->readFrames ( m_syt_interval, ( char * ) ( data + 8 ) ) ) 
     262    { 
    315263        // process all ports that should be handled on a per-packet base 
    316264        // this is MIDI for AMDTP (due to the need of DBC) 
    317         if (!encodePacketPorts((quadlet_t *)(data+8), m_syt_interval, packet->dbc)) { 
    318             debugWarning("Problem encoding Packet Ports\n"); 
    319         } 
    320  
    321         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT DATA: CY=%04u, TST=%011llu (%04u), TSP=%011llu (%04u)\n", 
    322             cycle, 
    323             transmit_at_time, (unsigned int)TICKS_TO_CYCLES(transmit_at_time), 
    324             presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); 
    325  
    326         return RAW1394_ISO_OK; 
    327     } 
    328  
    329 // the ISO AGAIN does not work very well... 
    330 // try_packet_again: 
    331 //  
    332 //     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT RETRY: CY=%04u, TSP=%011llu (%04u)\n", 
    333 //             cycle, 
    334 //             presentation_time, (unsigned int)TICKS_TO_CYCLES(presentation_time)); 
    335 //     return RAW1394_ISO_AGAIN; 
    336  
    337     // else: 
    338     debugFatal("This is impossible, since we checked the buffer size before!\n"); 
    339     return RAW1394_ISO_ERROR; 
    340 
    341  
    342 unsigned int 
    343 AmdtpTransmitStreamProcessor::getEventsPerFrame() 
    344 
    345     return m_dimension; 
    346 
    347  
    348 unsigned int 
    349 AmdtpTransmitStreamProcessor::getUpdatePeriod() 
    350 
     265        if ( !encodePacketPorts ( ( quadlet_t * ) ( data+8 ), m_syt_interval, packet->dbc ) ) 
     266        { 
     267            debugWarning ( "Problem encoding Packet Ports\n" ); 
     268        } 
     269        debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, "XMIT DATA: TSP=%011llu (%04u)\n", 
     270                    cycle, m_last_timestamp, ( unsigned int ) TICKS_TO_CYCLES ( m_last_timestamp ) ); 
     271        return true; 
     272    } 
     273    else 
     274    { 
     275        return false; 
     276    } 
     277
     278 
     279bool 
     280AmdtpTransmitStreamProcessor::generateSilentPacketHeader ( 
     281    unsigned char *data, unsigned int *length, 
     282    unsigned char *tag, unsigned char *sy, 
     283    int cycle, unsigned int dropped, unsigned int max_length ) 
     284
     285    struct iec61883_packet *packet = ( struct iec61883_packet * ) data; 
     286    debugOutput ( DEBUG_LEVEL_VERY_VERBOSE, "XMIT NONE: CY=%04u, TSP=%011llu (%04u)\n", 
     287                cycle, m_last_timestamp, ( unsigned int ) TICKS_TO_CYCLES ( m_last_timestamp ) ); 
     288 
     289    /* Our node ID can change after a bus reset, so it is best to fetch 
     290    * our node ID for each packet. */ 
     291    packet->sid = m_handler->getLocalNodeId() & 0x3f; 
     292 
     293    packet->dbs = m_dimension; 
     294    packet->fn = 0; 
     295    packet->qpc = 0; 
     296    packet->sph = 0; 
     297    packet->reserved = 0; 
     298    packet->dbc = m_dbc; 
     299    packet->eoh1 = 2; 
     300    packet->fmt = IEC61883_FMT_AMDTP; 
     301 
     302    *tag = IEC61883_TAG_WITH_CIP; 
     303    *sy = 0; 
     304 
     305    m_dbc += fillNoDataPacketHeader ( packet, length ); 
     306    return true; 
     307
     308 
     309bool 
     310AmdtpTransmitStreamProcessor::generateSilentPacketData ( 
     311    unsigned char *data, unsigned int *length, 
     312    unsigned char *tag, unsigned char *sy, 
     313    int cycle, unsigned int dropped, unsigned int max_length ) 
     314
     315    return true; // no need to do anything 
     316
     317 
     318unsigned int AmdtpTransmitStreamProcessor::fillDataPacketHeader ( 
     319    struct iec61883_packet *packet, unsigned int* length, 
     320    uint32_t ts ) 
     321
     322 
     323    packet->fdf = m_fdf; 
     324 
     325    // convert the timestamp to SYT format 
     326    uint16_t timestamp_SYT = TICKS_TO_SYT ( ts ); 
     327    packet->syt = ntohs ( timestamp_SYT ); 
     328 
     329    *length = m_syt_interval*sizeof ( quadlet_t ) *m_dimension + 8; 
     330 
    351331    return m_syt_interval; 
    352332} 
    353333 
    354  
    355 unsigned int AmdtpTransmitStreamProcessor::fillDataPacketHeader( 
    356         struct iec61883_packet *packet, unsigned int* length, 
    357         uint32_t ts) { 
    358  
    359     packet->fdf = m_fdf; 
    360  
    361     // convert the timestamp to SYT format 
    362     uint16_t timestamp_SYT = TICKS_TO_SYT(ts); 
    363     packet->syt = ntohs(timestamp_SYT); 
    364  
    365     *length = m_syt_interval*sizeof(quadlet_t)*m_dimension + 8; 
    366  
    367     return m_syt_interval; 
    368 
    369  
    370 unsigned int AmdtpTransmitStreamProcessor::fillNoDataPacketHeader( 
    371         struct iec61883_packet *packet, unsigned int* length) { 
     334unsigned int AmdtpTransmitStreamProcessor::fillNoDataPacketHeader ( 
     335    struct iec61883_packet *packet, unsigned int* length ) 
     336
    372337 
    373338    // no-data packets have syt=0xFFFF 
     
    378343    // FIXME: either make this a setting or choose 
    379344    bool send_payload=true; 
    380     if(send_payload) { 
     345    if ( send_payload ) 
     346    { 
    381347        // this means no-data packets with payload (DICE doesn't like that) 
    382         *length = 2*sizeof(quadlet_t) + m_syt_interval * m_dimension * sizeof(quadlet_t); 
     348        *length = 2*sizeof ( quadlet_t ) + m_syt_interval * m_dimension * sizeof ( quadlet_t ); 
    383349        return m_syt_interval; 
    384     } else { 
     350    } 
     351    else 
     352    { 
    385353        // dbc is not incremented 
    386354        // this means no-data packets without payload 
    387         *length = 2*sizeof(quadlet_t); 
     355        *length = 2*sizeof ( quadlet_t ); 
    388356        return 0; 
    389357    } 
    390358} 
    391359 
    392 bool AmdtpTransmitStreamProcessor::prefill() { 
    393  
    394     debugOutput( DEBUG_LEVEL_VERBOSE, "Prefill transmit buffers...\n"); 
    395  
    396     if(!transferSilence(m_ringbuffer_size_frames)) { 
    397         debugFatal("Could not prefill transmit stream\n"); 
    398         return false; 
    399     } 
    400  
    401     return true; 
    402 
    403  
    404 bool AmdtpTransmitStreamProcessor::reset() { 
    405  
    406     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); 
    407  
    408     // reset the statistics 
    409     m_PeriodStat.reset(); 
    410     m_PacketStat.reset(); 
    411     m_WakeupStat.reset(); 
    412  
    413     m_data_buffer->setTickOffset(0); 
    414  
    415 //     // reset all non-device specific stuff 
    416 //     // i.e. the iso stream and the associated ports 
    417 //     if(!StreamProcessor::reset()) { 
    418 //         debugFatal("Could not do base class reset\n"); 
    419 //         return false; 
    420 //     } 
    421  
    422     // we should prefill the event buffer 
    423     if (!prefill()) { 
    424         debugFatal("Could not prefill buffers\n"); 
    425         return false; 
    426     } 
    427  
    428     return true; 
    429 
    430  
    431 bool AmdtpTransmitStreamProcessor::prepareChild() { 
    432     m_PeriodStat.setName("XMT PERIOD"); 
    433     m_PacketStat.setName("XMT PACKET"); 
    434     m_WakeupStat.setName("XMT WAKEUP"); 
    435  
    436     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); 
    437  
    438     // prepare all non-device specific stuff 
    439     // i.e. the iso stream and the associated ports 
    440     if(!StreamProcessor::prepare()) { 
    441         debugFatal("Could not prepare base class\n"); 
    442         return false; 
    443     } 
    444  
    445     switch (m_manager->getNominalRate()) { 
    446     case 32000: 
    447         m_syt_interval = 8; 
    448         m_fdf = IEC61883_FDF_SFC_32KHZ; 
    449         break; 
    450     case 44100: 
    451         m_syt_interval = 8; 
    452         m_fdf = IEC61883_FDF_SFC_44K1HZ; 
    453         break; 
    454     default: 
    455     case 48000: 
    456         m_syt_interval = 8; 
    457         m_fdf = IEC61883_FDF_SFC_48KHZ; 
    458         break; 
    459     case 88200: 
    460         m_syt_interval = 16; 
    461         m_fdf = IEC61883_FDF_SFC_88K2HZ; 
    462         break; 
    463     case 96000: 
    464         m_syt_interval = 16; 
    465         m_fdf = IEC61883_FDF_SFC_96KHZ; 
    466         break; 
    467     case 176400: 
    468         m_syt_interval = 32; 
    469         m_fdf = IEC61883_FDF_SFC_176K4HZ; 
    470         break; 
    471     case 192000: 
    472         m_syt_interval = 32; 
    473         m_fdf = IEC61883_FDF_SFC_192KHZ; 
    474         break; 
     360unsigned int 
     361AmdtpTransmitStreamProcessor::getPacketsPerPeriod() 
     362
     363    return ( m_manager->getPeriodSize() ) /m_syt_interval; 
     364
     365 
     366bool AmdtpTransmitStreamProcessor::prepareChild() 
     367
     368    debugOutput ( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this ); 
     369    switch ( m_manager->getNominalRate() ) 
     370    { 
     371        case 32000: 
     372            m_syt_interval = 8; 
     373            m_fdf = IEC61883_FDF_SFC_32KHZ; 
     374            break; 
     375        case 44100: 
     376            m_syt_interval = 8; 
     377            m_fdf = IEC61883_FDF_SFC_44K1HZ; 
     378            break; 
     379        default: 
     380        case 48000: 
     381            m_syt_interval = 8; 
     382            m_fdf = IEC61883_FDF_SFC_48KHZ; 
     383            break; 
     384        case 88200: 
     385            m_syt_interval = 16; 
     386            m_fdf = IEC61883_FDF_SFC_88K2HZ; 
     387            break; 
     388        case 96000: 
     389            m_syt_interval = 16; 
     390            m_fdf = IEC61883_FDF_SFC_96KHZ; 
     391            break; 
     392        case 176400: 
     393            m_syt_interval = 32; 
     394            m_fdf = IEC61883_FDF_SFC_176K4HZ; 
     395            break; 
     396        case 192000: 
     397            m_syt_interval = 32; 
     398            m_fdf = IEC61883_FDF_SFC_192KHZ; 
     399            break; 
    475400    } 
    476401 
     
    481406        m_manager->getNominalRate(), 
    482407        m_dimension, 
    483         m_syt_interval); 
    484  
    485     // prepare the framerate estimate 
    486     float ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); 
    487     m_ticks_per_frame=ticks_per_frame; 
    488  
    489     // initialize internal buffer 
    490     m_ringbuffer_size_frames=m_manager->getNbBuffers() * m_manager->getPeriodSize(); 
    491  
    492     assert(m_data_buffer); 
    493     m_data_buffer->setBufferSize(m_ringbuffer_size_frames * 2); 
    494     m_data_buffer->setEventSize(sizeof(quadlet_t)); 
    495     m_data_buffer->setEventsPerFrame(m_dimension); 
    496  
    497     m_data_buffer->setUpdatePeriod(m_manager->getPeriodSize()); 
    498     m_data_buffer->setNominalRate(ticks_per_frame); 
    499  
    500     m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 
    501  
    502     m_data_buffer->prepare(); 
    503  
    504     // set the parameters of ports we can: 
    505     // we want the audio ports to be period buffered, 
    506     // and the midi ports to be packet buffered 
     408        m_syt_interval ); 
     409 
    507410    for ( PortVectorIterator it = m_Ports.begin(); 
    508           it != m_Ports.end(); 
    509           ++it ) 
    510     { 
    511         debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); 
    512         if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { 
    513             debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); 
    514             return false; 
    515         } 
    516  
    517  
    518         switch ((*it)->getPortType()) { 
    519             case Port::E_Audio: 
    520                 if(!(*it)->setSignalType(Port::E_PeriodSignalled)) { 
    521                     debugFatal("Could not set signal type to PeriodSignalling"); 
    522                     return false; 
    523                 } 
    524                 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); 
    525                 // buffertype and datatype are dependant on the API 
    526                 if(!(*it)->setBufferType(Port::E_PointerBuffer)) { 
    527                     debugFatal("Could not set buffer type"); 
    528                     return false; 
    529                 } 
    530                 if(!(*it)->useExternalBuffer(true)) { 
    531                     debugFatal("Could not set external buffer usage"); 
    532                     return false; 
    533                 } 
    534  
    535                 if(!(*it)->setDataType(Port::E_Float)) { 
    536                     debugFatal("Could not set data type"); 
    537                     return false; 
    538                 } 
    539  
    540  
    541                 break; 
    542             case Port::E_Midi: 
    543                 if(!(*it)->setSignalType(Port::E_PacketSignalled)) { 
    544                     debugFatal("Could not set signal type to PeriodSignalling"); 
    545                     return false; 
    546                 } 
    547  
    548                 // we use a timing unit of 10ns 
    549                 // this makes sure that for the max syt interval 
    550                 // we don't have rounding, and keeps the numbers low 
    551                 // we have 1 slot every 8 events 
    552                 // we have syt_interval events per packet 
    553                 // => syt_interval/8 slots per packet 
    554                 // packet rate is 8000pkt/sec => interval=125us 
    555                 // so the slot interval is (1/8000)/(syt_interval/8) 
    556                 // or: 1/(1000 * syt_interval) sec 
    557                 // which is 1e9/(1000*syt_interval) nsec 
    558                 // or 100000/syt_interval 'units' 
    559                 // the event interval is fixed to 320us = 32000 'units' 
    560                 if(!(*it)->useRateControl(true,(100000/m_syt_interval),32000, false)) { 
    561                     debugFatal("Could not set signal type to PeriodSignalling"); 
    562                     return false; 
    563                 } 
    564  
    565                 // buffertype and datatype are dependant on the API 
    566                 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); 
    567                 // buffertype and datatype are dependant on the API 
    568                 if(!(*it)->setBufferType(Port::E_RingBuffer)) { 
    569                     debugFatal("Could not set buffer type"); 
    570                     return false; 
    571                 } 
    572                 if(!(*it)->setDataType(Port::E_MidiEvent)) { 
    573                     debugFatal("Could not set data type"); 
    574                     return false; 
     411            it != m_Ports.end(); 
     412            ++it ) 
     413    { 
     414        if ( ( *it )->getPortType() == Port::E_Midi ) 
     415        { 
     416            // we use a timing unit of 10ns 
     417            // this makes sure that for the max syt interval 
     418            // we don't have rounding, and keeps the numbers low 
     419            // we have 1 slot every 8 events 
     420            // we have syt_interval events per packet 
     421            // => syt_interval/8 slots per packet 
     422            // packet rate is 8000pkt/sec => interval=125us 
     423            // so the slot interval is (1/8000)/(syt_interval/8) 
     424            // or: 1/(1000 * syt_interval) sec 
     425            // which is 1e9/(1000*syt_interval) nsec 
     426            // or 100000/syt_interval 'units' 
     427            // the event interval is fixed to 320us = 32000 'units' 
     428            if ( ! ( *it )->useRateControl ( true, ( 100000/m_syt_interval ),32000, false ) ) 
     429            { 
     430                debugFatal ( "Could not set signal type to PeriodSignalling" ); 
     431                return false; 
     432            } 
     433            break; 
     434        } 
     435    } 
     436 
     437    debugOutput ( DEBUG_LEVEL_VERBOSE, "Prepared for:\n" ); 
     438    debugOutput ( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, FDF: %d, DBS: %d, SYT: %d\n", 
     439                m_manager->getNominalRate(), m_fdf, m_dimension, m_syt_interval ); 
     440    debugOutput ( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n", 
     441                m_manager->getPeriodSize(), m_manager->getNbBuffers() ); 
     442    debugOutput ( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n", 
     443                m_port,m_channel ); 
     444    return true; 
     445
     446 
     447/* 
     448* compose the event streams for the packets from the port buffers 
     449*/ 
     450bool AmdtpTransmitStreamProcessor::processWriteBlock ( char *data, 
     451        unsigned int nevents, unsigned int offset ) 
     452
     453    bool no_problem=true; 
     454 
     455    for ( PortVectorIterator it = m_PeriodPorts.begin(); 
     456            it != m_PeriodPorts.end(); 
     457            ++it ) 
     458    { 
     459 
     460        if ( ( *it )->isDisabled() ) {continue;}; 
     461 
     462        //FIXME: make this into a static_cast when not DEBUG? 
     463 
     464        AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *> ( *it ); 
     465        assert ( pinfo ); // this should not fail!! 
     466 
     467        switch ( pinfo->getFormat() ) 
     468        { 
     469            case AmdtpPortInfo::E_MBLA: 
     470                if ( encodePortToMBLAEvents ( static_cast<AmdtpAudioPort *> ( *it ), ( quadlet_t * ) data, offset, nevents ) ) 
     471                { 
     472                    debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); 
     473                    no_problem=false; 
    575474                } 
    576475                break; 
    577             default: 
    578                 debugWarning("Unsupported port type specified\n"); 
     476            case AmdtpPortInfo::E_SPDIF: // still unimplemented 
    579477                break; 
    580         } 
    581     } 
    582  
    583     // the API specific settings of the ports should already be set, 
    584     // as this is called from the processorManager->prepare() 
    585     // so we can init the ports 
    586     if(!initPorts()) { 
    587         debugFatal("Could not initialize ports!\n"); 
    588         return false; 
    589     } 
    590  
    591     if(!preparePorts()) { 
    592         debugFatal("Could not initialize ports!\n"); 
    593         return false; 
    594     } 
    595  
    596     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); 
    597     debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, FDF: %d, DBS: %d, SYT: %d\n", 
    598              m_manager->getNominalRate(),m_fdf,m_dimension,m_syt_interval); 
    599     debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n", 
    600              m_manager->getPeriodSize(), m_manager->getNbBuffers()); 
    601     debugOutput( DEBUG_LEVEL_VERBOSE, " Port: %d, Channel: %d\n", 
    602              m_port,m_channel); 
    603  
    604     return true; 
    605  
    606 
    607  
    608 bool AmdtpTransmitStreamProcessor::prepareForStart() { 
    609     return true; 
    610 
    611  
    612 bool AmdtpTransmitStreamProcessor::prepareForStop() { 
    613     return true; 
    614 
    615  
    616 bool AmdtpTransmitStreamProcessor::prepareForEnable(uint64_t time_to_enable_at) { 
    617  
    618 //     if (!StreamProcessor::prepareForEnable(time_to_enable_at)) { 
    619 //         debugError("StreamProcessor::prepareForEnable failed\n"); 
    620 //         return false; 
    621 //     } 
    622  
    623     return true; 
    624 
    625  
    626 unsigned int 
    627 AmdtpTransmitStreamProcessor::getPacketsPerPeriod()  
    628 
    629     return (m_manager->getPeriodSize())/m_syt_interval; 
    630 
    631  
    632 bool AmdtpTransmitStreamProcessor::transferSilence(unsigned int nframes) { 
    633     bool retval; 
    634     signed int fc; 
    635     ffado_timestamp_t ts_tail_tmp; 
    636     uint64_t ts_tail; 
    637      
    638     // prepare a buffer of silence 
    639     char *dummybuffer=(char *)calloc(sizeof(quadlet_t),nframes*m_dimension); 
    640     transmitSilenceBlock(dummybuffer, nframes, 0); 
    641  
    642      
    643     m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); 
    644     if (fc != 0) { 
    645         debugWarning("Prefilling a buffer that already contains %d frames\n", fc); 
    646     } 
    647  
    648     ts_tail = (uint64_t)ts_tail_tmp; 
    649     // modify the timestamp such that it makes sense 
    650     ts_tail = addTicks(ts_tail, (uint64_t)(nframes * getTicksPerFrame())); 
    651     // add the silence data to the ringbuffer 
    652     if(m_data_buffer->writeFrames(nframes, dummybuffer, ts_tail)) { 
    653         retval=true; 
    654     } else { 
    655         debugWarning("Could not write to event buffer\n"); 
    656         retval=false; 
    657     } 
    658  
    659     free(dummybuffer); 
    660  
    661     return retval; 
    662 
    663  
    664 bool AmdtpTransmitStreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { 
    665     m_PeriodStat.mark(m_data_buffer->getBufferFill()); 
    666     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "AmdtpTransmitStreamProcessor::putFrames(%d, %llu)\n", nbframes, ts); 
    667  
    668     // transfer the data 
    669     m_data_buffer->blockProcessWriteFrames(nbframes, ts); 
    670  
    671     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); 
    672  
    673     return true; // FIXME: what about failure? 
    674 
    675  
    676 bool AmdtpTransmitStreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) { 
    677     m_PeriodStat.mark(m_data_buffer->getBufferFill()); 
    678     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "AmdtpTransmitStreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts); 
    679  
    680     bool retval; 
    681     char dummybuffer[sizeof(quadlet_t)*nbframes*m_dimension]; 
    682  
    683     transmitSilenceBlock(dummybuffer, nbframes, 0); 
    684     // add the silence data to the ringbuffer 
    685     if(m_data_buffer->writeFrames(nbframes, dummybuffer, ts)) { 
    686         retval=true; 
    687     } else { 
    688         debugWarning("Could not write %u events to event buffer\n", nbframes); 
    689         retval=false; 
    690     } 
    691  
    692     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); 
    693     return retval; 
    694 
    695  
    696 /* 
    697  * write received events to the stream ringbuffers. 
    698  */ 
    699  
    700 bool AmdtpTransmitStreamProcessor::processWriteBlock(char *data, 
    701                        unsigned int nevents, unsigned int offset) 
    702 
    703     bool no_problem=true; 
    704  
     478            default: // ignore 
     479                break; 
     480        } 
     481    } 
     482    return no_problem; 
     483
     484 
     485bool AmdtpTransmitStreamProcessor::transmitSilenceBlock ( char *data, 
     486        unsigned int nevents, unsigned int offset ) 
     487
     488    bool problem = false; 
    705489    for ( PortVectorIterator it = m_PeriodPorts.begin(); 
    706           it != m_PeriodPorts.end(); 
    707           ++it ) 
    708     { 
    709  
    710         if((*it)->isDisabled()) {continue;}; 
    711  
     490            it != m_PeriodPorts.end(); 
     491            ++it ) 
     492    { 
    712493        //FIXME: make this into a static_cast when not DEBUG? 
    713  
    714         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it); 
    715         assert(pinfo); // this should not fail!! 
    716  
    717         switch(pinfo->getFormat()) { 
    718         case AmdtpPortInfo::E_MBLA: 
    719             if(encodePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents)) { 
    720                 debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str()); 
    721                 no_problem=false; 
    722             } 
    723             break; 
    724         case AmdtpPortInfo::E_SPDIF: // still unimplemented 
    725             break; 
    726         default: // ignore 
    727             break; 
    728         } 
    729     } 
    730     return no_problem; 
    731  
    732 
    733  
    734 int AmdtpTransmitStreamProcessor::transmitSilenceBlock(char *data, 
    735                        unsigned int nevents, unsigned int offset) 
    736 
    737     int problem=0; 
    738  
    739     for ( PortVectorIterator it = m_PeriodPorts.begin(); 
    740           it != m_PeriodPorts.end(); 
    741           ++it ) 
    742     { 
    743  
    744         //FIXME: make this into a static_cast when not DEBUG? 
    745  
    746         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it); 
    747         assert(pinfo); // this should not fail!! 
    748  
    749         switch(pinfo->getFormat()) { 
    750         case AmdtpPortInfo::E_MBLA: 
    751             if(encodeSilencePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents)) { 
    752                 debugWarning("Could not encode port %s to MBLA events",(*it)->getName().c_str()); 
    753                 problem=1; 
    754             } 
    755             break; 
    756         case AmdtpPortInfo::E_SPDIF: // still unimplemented 
    757             break; 
    758         default: // ignore 
    759             break; 
     494        AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *> ( *it ); 
     495        assert ( pinfo ); // this should not fail!! 
     496 
     497        switch ( pinfo->getFormat() ) 
     498        { 
     499            case AmdtpPortInfo::E_MBLA: 
     500                if ( encodeSilencePortToMBLAEvents ( static_cast<AmdtpAudioPort *> ( *it ), ( quadlet_t * ) data, offset, nevents ) ) 
     501                { 
     502                    debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str() ); 
     503                    problem = true; 
     504                } 
     505                break; 
     506            case AmdtpPortInfo::E_SPDIF: // still unimplemented 
     507                break; 
     508            default: // ignore 
     509                break; 
    760510        } 
    761511    } 
    762512    return problem; 
    763  
    764513} 
    765514 
    766515/** 
    767  * @brief decode a packet for the packet-based ports 
    768 
    769  * @param data Packet data 
    770  * @param nevents number of events in data (including events of other ports & port types) 
    771  * @param dbc DataBlockCount value for this packet 
    772  * @return true if all successfull 
    773  */ 
    774 bool AmdtpTransmitStreamProcessor::encodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc
     516* @brief decode a packet for the packet-based ports 
     517
     518* @param data Packet data 
     519* @param nevents number of events in data (including events of other ports & port types) 
     520* @param dbc DataBlockCount value for this packet 
     521* @return true if all successfull 
     522*/ 
     523bool AmdtpTransmitStreamProcessor::encodePacketPorts ( quadlet_t *data, unsigned int nevents, unsigned int dbc
    775524{ 
    776525    bool ok=true; 
     
    781530 
    782531    for ( PortVectorIterator it = m_PacketPorts.begin(); 
    783           it != m_PacketPorts.end(); 
    784           ++it ) 
     532            it != m_PacketPorts.end(); 
     533            ++it ) 
    785534    { 
    786535 
    787536#ifdef DEBUG 
    788         AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it); 
    789         assert(pinfo); // this should not fail!! 
     537        AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *> ( *it ); 
     538        assert ( pinfo ); // this should not fail!! 
    790539 
    791540        // the only packet type of events for AMDTP is MIDI in mbla 
    792         assert(pinfo->getFormat()==AmdtpPortInfo::E_Midi); 
     541        assert ( pinfo->getFormat() ==AmdtpPortInfo::E_Midi ); 
    793542#endif 
    794543 
    795         AmdtpMidiPort *mp=static_cast<AmdtpMidiPort *>(*it); 
     544        AmdtpMidiPort *mp=static_cast<AmdtpMidiPort *> ( *it ); 
    796545 
    797546        // we encode this directly (no function call) due to the high frequency 
     
    806555        // first prefill the buffer with NO_DATA's on all time muxed channels 
    807556 
    808         for(j = (dbc & 0x07)+mp->getLocation(); j < nevents; j += 8) { 
    809              
     557        for ( j = ( dbc & 0x07 ) +mp->getLocation(); j < nevents; j += 8 ) 
     558        { 
     559 
    810560            quadlet_t tmpval; 
    811              
    812             target_event=(quadlet_t *)(data + ((j * m_dimension) + mp->getPosition())); 
    813              
    814             if(mp->canRead()) { // we can send a byte 
    815                 mp->readEvent(&byte); 
     561 
     562            target_event= ( quadlet_t * ) ( data + ( ( j * m_dimension ) + mp->getPosition() ) ); 
     563 
     564            if ( mp->canRead() )   // we can send a byte 
     565            { 
     566                mp->readEvent ( &byte ); 
    816567                byte &= 0xFF; 
    817                 tmpval=htonl( 
    818                     IEC61883_AM824_SET_LABEL((byte)<<16, 
    819                                              IEC61883_AM824_LABEL_MIDI_1X)); 
    820  
    821                 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "MIDI port %s, pos=%d, loc=%d, dbc=%d, nevents=%d, dim=%d\n", 
    822                     mp->getName().c_str(), mp->getPosition(), mp->getLocation(), dbc, nevents, m_dimension); 
    823                 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "base=%p, target=%p, value=%08X\n", 
    824                     data, target_event, tmpval); 
    825                      
    826             } else { 
     568                tmpval=htonl ( 
     569                        IEC61883_AM824_SET_LABEL ( ( byte ) <<16, 
     570                                                    IEC61883_AM824_LABEL_MIDI_1X ) ); 
     571 
     572                debugOutput ( DEBUG_LEVEL_ULTRA_VERBOSE, "MIDI port %s, pos=%d, loc=%d, dbc=%d, nevents=%d, dim=%d\n", 
     573                            mp->getName().c_str(), mp->getPosition(), mp->getLocation(), dbc, nevents, m_dimension ); 
     574                debugOutput ( DEBUG_LEVEL_ULTRA_VERBOSE, "base=%p, target=%p, value=%08X\n", 
     575                            data, target_event, tmpval ); 
     576 
     577            } 
     578            else 
     579            { 
    827580                // can't send a byte, either because there is no byte, 
    828581                // or because this would exceed the maximum rate 
    829                 tmpval=htonl
    830                     IEC61883_AM824_SET_LABEL(0,IEC61883_AM824_LABEL_MIDI_NO_DATA)); 
    831             } 
    832              
     582                tmpval=htonl
     583                        IEC61883_AM824_SET_LABEL ( 0,IEC61883_AM824_LABEL_MIDI_NO_DATA ) ); 
     584            } 
     585 
    833586            *target_event=tmpval; 
    834587        } 
    835588 
    836589    } 
    837  
    838590    return ok; 
    839591} 
    840592 
    841593 
    842 int AmdtpTransmitStreamProcessor::encodePortToMBLAEvents(AmdtpAudioPort *p, quadlet_t *data, 
    843                        unsigned int offset, unsigned int nevents
     594int AmdtpTransmitStreamProcessor::encodePortToMBLAEvents ( AmdtpAudioPort *p, quadlet_t *data, 
     595        unsigned int offset, unsigned int nevents
    844596{ 
    845597    unsigned int j=0; 
     
    847599    quadlet_t *target_event; 
    848600 
    849     target_event=(quadlet_t *)(data + p->getPosition()); 
    850  
    851     switch(p->getDataType()) { 
     601    target_event= ( quadlet_t * ) ( data + p->getPosition() ); 
     602 
     603    switch ( p->getDataType() ) 
     604    { 
    852605        default: 
    853606        case Port::E_Int24: 
    854             { 
    855                 quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress()); 
    856  
    857                 assert(nevents + offset <= p->getBufferSize()); 
    858  
    859                 buffer+=offset; 
    860  
    861                 for(j = 0; j < nevents; j += 1) { // decode max nsamples 
    862                     *target_event = htonl((*(buffer) & 0x00FFFFFF) | 0x40000000); 
    863                     buffer++; 
    864                     target_event += m_dimension; 
    865                 } 
    866             } 
    867             break; 
     607        { 
     608            quadlet_t *buffer= ( quadlet_t * ) ( p->getBufferAddress() ); 
     609 
     610            assert ( nevents + offset <= p->getBufferSize() ); 
     611 
     612            buffer+=offset; 
     613 
     614            for ( j = 0; j < nevents; j += 1 )   // decode max nsamples 
     615            { 
     616                *target_event = htonl ( ( * ( buffer ) & 0x00FFFFFF ) | 0x40000000 ); 
     617                buffer++; 
     618                target_event += m_dimension; 
     619            } 
     620        } 
     621        break; 
    868622        case Port::E_Float: 
    869             { 
    870                 const float multiplier = (float)(0x7FFFFF00); 
    871                 float *buffer=(float *)(p->getBufferAddress()); 
    872  
    873                 assert(nevents + offset <= p->getBufferSize()); 
    874  
    875                 buffer+=offset; 
    876  
    877                 for(j = 0; j < nevents; j += 1) { // decode max nsamples 
    878  
    879                     // don't care for overflow 
    880                     float v = *buffer * multiplier;  // v: -231 .. 231 
    881                     unsigned int tmp = ((int)v); 
    882                     *target_event = htonl((tmp >> 8) | 0x40000000); 
    883  
    884                     buffer++; 
    885                     target_event += m_dimension; 
    886                 } 
    887             } 
    888             break; 
     623        { 
     624            const float multiplier = ( float ) ( 0x7FFFFF00 ); 
     625            float *buffer= ( float * ) ( p->getBufferAddress() ); 
     626 
     627            assert ( nevents + offset <= p->getBufferSize() ); 
     628 
     629            buffer+=offset; 
     630 
     631            for ( j = 0; j < nevents; j += 1 )   // decode max nsamples 
     632            { 
     633 
     634                // don't care for overflow 
     635                float v = *buffer * multiplier;  // v: -231 .. 231 
     636                unsigned int tmp = ( ( int ) v ); 
     637                *target_event = htonl ( ( tmp >> 8 ) | 0x40000000 ); 
     638 
     639                buffer++; 
     640                target_event += m_dimension; 
     641            } 
     642        } 
     643        break; 
    889644    } 
    890645 
    891646    return 0; 
    892647} 
    893 int AmdtpTransmitStreamProcessor::encodeSilencePortToMBLAEvents(AmdtpAudioPort *p, quadlet_t *data, 
    894                        unsigned int offset, unsigned int nevents
     648int AmdtpTransmitStreamProcessor::encodeSilencePortToMBLAEvents ( AmdtpAudioPort *p, quadlet_t *data, 
     649        unsigned int offset, unsigned int nevents
    895650{ 
    896651    unsigned int j=0; 
     
    898653    quadlet_t *target_event; 
    899654 
    900     target_event=(quadlet_t *)(data + p->getPosition()); 
    901  
    902     switch(p->getDataType()) { 
     655    target_event= ( quadlet_t * ) ( data + p->getPosition() ); 
     656 
     657    switch ( p->getDataType() ) 
     658    { 
    903659        default: 
    904660        case Port::E_Int24: 
    905661        case Port::E_Float: 
    906             { 
    907                 for(j = 0; j < nevents; j += 1) { // decode max nsamples 
    908                     *target_event = htonl(0x40000000); 
    909                     target_event += m_dimension; 
    910                 } 
    911             } 
    912             break; 
     662        { 
     663            for ( j = 0; j < nevents; j += 1 )   // decode max nsamples 
     664            { 
     665                *target_event = htonl ( 0x40000000 ); 
     666                target_event += m_dimension; 
     667            } 
     668        } 
     669        break; 
    913670    } 
    914671 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h

    r719 r720  
    8080    virtual ~AmdtpTransmitStreamProcessor() {}; 
    8181 
    82     enum raw1394_iso_disposition 
    83             getPacket(unsigned char *data, unsigned int *length, 
    84                     unsigned char *tag, unsigned char *sy, 
    85                     int cycle, unsigned int dropped, unsigned int max_length); 
     82    bool generatePacketHeader(unsigned char *data, unsigned int *length, 
     83                              unsigned char *tag, unsigned char *sy, 
     84                              int cycle, unsigned int dropped, unsigned int max_length); 
     85    bool generatePacketData(unsigned char *data, unsigned int *length, 
     86                            unsigned char *tag, unsigned char *sy, 
     87                            int cycle, unsigned int dropped, unsigned int max_length); 
     88    bool generateSilentPacketHeader(unsigned char *data, unsigned int *length, 
     89                                    unsigned char *tag, unsigned char *sy, 
     90                                    int cycle, unsigned int dropped, unsigned int max_length); 
     91    bool generateSilentPacketData(unsigned char *data, unsigned int *length, 
     92                                  unsigned char *tag, unsigned char *sy, 
     93                                  int cycle, unsigned int dropped, unsigned int max_length); 
     94    virtual bool prepareChild(); 
    8695 
    87     virtual unsigned int getEventsPerFrame(); 
    88     virtual unsigned int getEventSize() {return 4;}; 
    89     virtual unsigned int getUpdatePeriod(); 
    90  
    91     bool reset(); 
    92     bool prepareChild(); 
    93  
    94     bool prepareForStop(); 
    95     bool prepareForStart(); 
    96  
    97     bool prepareForEnable(uint64_t time_to_enable_at); 
    98  
    99     bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents from the client 
    100     bool putFramesDry(unsigned int nbframes, int64_t ts); 
    101  
    102     // We have 1 period of samples = m_period 
    103     // this period takes m_period/m_framerate seconds of time 
    104     // during this time, 8000 packets are sent 
    105 //     unsigned int getPacketsPerPeriod() {return (m_period*8000)/m_framerate;}; 
    106  
    107     // however, if we only count the number of used packets 
    108     // it is m_period / m_syt_interval 
     96public: 
     97    virtual unsigned int getEventSize() 
     98                    {return 4;}; 
     99    virtual unsigned int getMaxPacketSize() 
     100                    {return 4 * (2 + m_syt_interval * m_dimension);}; 
     101    virtual unsigned int getEventsPerFrame() 
     102                    { return m_dimension; }; 
     103    virtual unsigned int getNominalFramesPerPacket() 
     104                    {return m_syt_interval;}; 
    109105    unsigned int getPacketsPerPeriod(); 
    110  
    111     unsigned int getMaxPacketSize() {return 4 * (2 + m_syt_interval * m_dimension);}; 
    112106 
    113107protected: 
    114108    bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset); 
     109    bool transmitSilenceBlock(char *data, unsigned int nevents, unsigned int offset); 
    115110 
    116     struct iec61883_cip m_cip_status; 
    117  
    118     int m_dimension; 
    119     unsigned int m_syt_interval; 
    120  
    121     int m_fdf; 
    122  
    123     bool prefill(); 
    124  
     111private: 
    125112    unsigned int fillNoDataPacketHeader(struct iec61883_packet *packet, unsigned int* length); 
    126113    unsigned int fillDataPacketHeader(struct iec61883_packet *packet, unsigned int* length, uint32_t ts); 
    127  
    128  
    129     bool transferSilence(unsigned int size); 
    130114 
    131115    int transmitBlock(char *data, unsigned int nevents, 
    132116                        unsigned int offset); 
    133117 
    134     bool encodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); 
     118    bool encodePacketPorts(quadlet_t *data, unsigned int nevents, 
     119                           unsigned int dbc); 
     120 
    135121    int encodePortToMBLAEvents(AmdtpAudioPort *, quadlet_t *data, 
    136122                                unsigned int offset, unsigned int nevents); 
    137  
    138     int transmitSilenceBlock(char *data, unsigned int nevents, 
    139                         unsigned int offset); 
    140123    int encodeSilencePortToMBLAEvents(AmdtpAudioPort *, quadlet_t *data, 
    141124                                unsigned int offset, unsigned int nevents); 
    142     void updatePreparedState(); 
    143125 
    144     unsigned long m_last_timestamp; 
    145  
     126    struct iec61883_cip m_cip_status; 
     127    int m_dimension; 
     128    unsigned int m_syt_interval; 
     129    int m_fdf; 
    146130    unsigned int m_dbc; 
    147  
    148     unsigned int m_ringbuffer_size_frames; 
    149131}; 
    150132 
  • branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp

    r719 r720  
    327327} 
    328328 
     329enum raw1394_iso_disposition 
     330StreamProcessor::getPacket(unsigned char *data, unsigned int *length, 
     331                           unsigned char *tag, unsigned char *sy, 
     332                           int cycle, unsigned int dropped, unsigned int max_length) { 
     333    if (cycle<0) { 
     334        *tag = 0; 
     335        *sy = 0; 
     336        *length = 0; 
     337        return RAW1394_ISO_OK; 
     338    } 
     339 
     340    int dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; 
     341    if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 
     342    else m_dropped += dropped_cycles; 
     343    if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); 
     344    m_last_cycle = cycle; 
     345 
     346    // bypass based upon state 
     347    if (m_state == ePS_Invalid) { 
     348        debugError("Should not have state %s\n", ePSToString(m_state) ); 
     349        return RAW1394_ISO_ERROR; 
     350    } 
     351    if (m_state == ePS_Created) { 
     352        *tag = 0; 
     353        *sy = 0; 
     354        *length = 0; 
     355        return RAW1394_ISO_DEFER; 
     356    } 
     357 
     358    // normal processing 
     359    // note that we can't use getCycleTimer directly here, 
     360    // because packets are queued in advance. This means that 
     361    // we the packet we are constructing will be sent out 
     362    // on 'cycle', not 'now'. 
     363    unsigned int ctr = m_handler->getCycleTimer(); 
     364    int now_cycles = (int)CYCLE_TIMER_GET_CYCLES(ctr); 
     365 
     366    // the difference between the cycle this 
     367    // packet is intended for and 'now' 
     368    int cycle_diff = diffCycles(cycle, now_cycles); 
     369 
     370    #ifdef DEBUG 
     371    if(cycle_diff < 0) { 
     372        debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", 
     373            cycle, now_cycles); 
     374    } 
     375    #endif 
     376 
     377    // store the previous timestamp 
     378    m_last_timestamp2 = m_last_timestamp; 
     379 
     380    // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles) 
     381    //       it happens on the first 'good' cycle for the wait condition 
     382    //       or on the first received cycle that is received afterwards (might be a problem) 
     383 
     384    // check whether we are waiting for a stream to be disabled 
     385    if(m_state == ePS_WaitingForStreamDisable) { 
     386        // we then check whether we have to switch on this cycle 
     387        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 
     388            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n"); 
     389            m_next_state = ePS_DryRunning; 
     390            if (!updateState()) { // we are allowed to change the state directly 
     391                debugError("Could not update state!\n"); 
     392                return RAW1394_ISO_ERROR; 
     393            } 
     394        } else { 
     395            // not time to disable yet 
     396        } 
     397    } 
     398    // check whether we are waiting for a stream to be enabled 
     399    else if(m_state == ePS_WaitingForStreamEnable) { 
     400        // we then check whether we have to switch on this cycle 
     401        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 
     402            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n"); 
     403            m_next_state = ePS_Running; 
     404            if (!updateState()) { // we are allowed to change the state directly 
     405                debugError("Could not update state!\n"); 
     406                return RAW1394_ISO_ERROR; 
     407            } 
     408        } else { 
     409            // not time to enable yet 
     410        } 
     411        // we are dryRunning hence data should be processed in any case 
     412    } 
     413    // check whether we are waiting for a stream to startup 
     414    else if(m_state == ePS_WaitingForStream) { 
     415        // as long as the cycle parameter is not in sync with 
     416        // the current time, the stream is considered not 
     417        // to be 'running' 
     418        // we then check whether we have to switch on this cycle 
     419        if ((cycle_diff >= 0) && (diffCycles(cycle, m_cycle_to_switch_state) >= 0)) { 
     420            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n"); 
     421            // hence go to the dryRunning state 
     422            m_next_state = ePS_DryRunning; 
     423            if (!updateState()) { // we are allowed to change the state directly 
     424                debugError("Could not update state!\n"); 
     425                return RAW1394_ISO_ERROR; 
     426            } 
     427        } else { 
     428            // not time (yet) to switch state 
     429        } 
     430    } 
     431    else if(m_state == ePS_Running) { 
     432        // check the packet header 
     433        if (generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length)) { 
     434            debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT: CY=%04u TS=%011llu\n", 
     435                    cycle, m_last_timestamp); 
     436            // update some accounting 
     437            m_last_good_cycle = cycle; 
     438            m_last_dropped = dropped_cycles; 
     439 
     440            // check whether a state change has been requested 
     441            // note that only the wait state changes are synchronized with the cycles 
     442            if(m_state != m_next_state) { 
     443                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", 
     444                                                ePSToString(m_state), ePSToString(m_next_state)); 
     445                // execute the requested change 
     446                if (!updateState()) { // we are allowed to change the state directly 
     447                    debugError("Could not update state!\n"); 
     448                    return RAW1394_ISO_ERROR; 
     449                } 
     450            } 
     451 
     452            bool ok = generatePacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 
     453            // if an xrun occured, switch to the dryRunning state and 
     454            // allow for the xrun to be picked up 
     455            if (!ok) { 
     456                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to xrun\n"); 
     457                m_next_state = ePS_DryRunning; 
     458                // execute the requested change 
     459                if (!updateState()) { // we are allowed to change the state directly 
     460                    debugError("Could not update state!\n"); 
     461                    return RAW1394_ISO_ERROR; 
     462                } 
     463                goto send_empty_packet; 
     464            } 
     465            return RAW1394_ISO_OK; 
     466        } 
     467    } 
     468    // we are not running, so send an empty packet 
     469    // we should generate a valid packet any time 
     470send_empty_packet: 
     471    // note that only the wait state changes are synchronized with the cycles 
     472    if(m_state != m_next_state) { 
     473        debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", 
     474                                        ePSToString(m_state), ePSToString(m_next_state)); 
     475        // execute the requested change 
     476        if (!updateState()) { // we are allowed to change the state directly 
     477            debugError("Could not update state!\n"); 
     478            return RAW1394_ISO_ERROR; 
     479        } 
     480    } 
     481 
     482    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "XMIT EMPTY: CY=%04u\n", cycle); 
     483    generateSilentPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); 
     484    generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 
     485    return RAW1394_ISO_DEFER; 
     486} 
     487 
     488 
    329489// Frame Transfer API 
     490/** 
     491 * Transfer a block of frames from the event buffer to the port buffers 
     492 * @param nbframes number of frames to transfer 
     493 * @param ts the timestamp that the LAST frame in the block should have 
     494 * @return  
     495 */ 
    330496bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 
    331497    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); 
     
    338504// FIXME: this should be done somewhere else 
    339505#ifdef DEBUG 
    340     uint64_t ts_head; 
     506    uint64_t ts_expected; 
    341507    signed int fc; 
    342508    int32_t lag_ticks; 
     
    345511    // in order to sync up multiple received streams, we should  
    346512    // use the ts parameter. It specifies the time of the block's  
    347     // first sample. 
     513    // last sample. 
     514     
     515    // determine the time at which we want reception to start 
     516    float srate = m_manager->getSyncSource().getTicksPerFrame(); 
     517    assert(srate != 0.0); 
     518    int64_t this_block_length_in_ticks = (int64_t)(((float)nbframes) * srate); 
    348519     
    349520    ffado_timestamp_t ts_head_tmp; 
    350521    m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); 
    351     ts_head=(uint64_t)ts_head_tmp; 
    352     lag_ticks=diffTicks(ts, ts_head); 
    353     float rate=m_data_buffer->getRate(); 
     522    ts_expected = addTicks((uint64_t)ts_head_tmp, this_block_length_in_ticks); 
    354523     
    355     assert(rate!=0.0); 
     524    lag_ticks = diffTicks(ts, ts_expected); 
    356525     
    357     lag_frames=(((float)lag_ticks)/rate); 
     526     
     527    lag_frames = (((float)lag_ticks) / srate); 
    358528     
    359529    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
    360                  this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
    361  
    362     if (lag_frames>=1.0) { 
     530                 this, lag_ticks, lag_frames, srate, ts, ts_expected, fc); 
     531 
     532    if (lag_frames >= 1.0) { 
    363533        // the stream lags 
    364534        debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
    365                       this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
    366     } else if (lag_frames<=-1.0) { 
     535                      this, lag_ticks, lag_frames, srate, ts, ts_expected, fc); 
     536    } else if (lag_frames <= -1.0) { 
    367537        // the stream leads 
    368538        debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
    369                       this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
     539                      this, lag_ticks, lag_frames, srate, ts, ts_expected, fc); 
    370540    } 
    371541#endif 
     
    386556} 
    387557 
     558bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { 
     559    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); 
     560    assert( getType() == ePT_Transmit ); 
     561    if(isDryRunning()) return putFramesDry(nbframes, ts); 
     562    else return putFramesWet(nbframes, ts); 
     563} 
     564 
     565bool 
     566StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) { 
     567    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts); 
     568    // transfer the data 
     569    m_data_buffer->blockProcessWriteFrames(nbframes, ts); 
     570    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); 
     571    return true; // FIXME: what about failure? 
     572} 
     573 
     574bool 
     575StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) { 
     576    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts); 
     577    // do nothing 
     578    return true; 
     579} 
    388580 
    389581/*********************************************** 
     
    421613bool StreamProcessor::stop() 
    422614{ 
    423     uint64_t time_to_stop_at = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 
    424     int cnt; 
    425615    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stop...\n"); 
    426616    switch (m_state) { 
     
    437627} 
    438628 
    439 bool StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant) 
     629bool 
     630StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant) 
    440631{ 
    441632    // first set the time, since in the packet loop we first check m_state == m_next_state before 
     
    446637} 
    447638 
    448 bool StreamProcessor::scheduleAndWaitForStateTransition(enum eProcessorState state,  
    449                                                         uint64_t time_instant,  
    450                                                         enum eProcessorState wait_state) 
    451 
    452     int cnt=200; // 2 seconds, i.e. 2 cycles 
    453     if(!scheduleStateTransition(state, time_instant)) { 
    454         debugError("Could not schedule state transistion to %s\n", ePSToString(state)); 
     639bool 
     640StreamProcessor::waitForState(enum eProcessorState state, unsigned int timeout_ms) 
     641
     642    debugOutput(DEBUG_LEVEL_VERBOSE, "Waiting for state %s\n", ePSToString(state)); 
     643    int cnt = timeout_ms; 
     644    while (m_state != state && cnt) { 
     645        usleep(1000); 
     646        cnt--; 
     647    } 
     648    if(cnt==0) { 
     649        debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout\n"); 
    455650        return false; 
    456651    } 
    457     while (m_state != wait_state && cnt) { 
    458         usleep(10000); 
    459         cnt++; 
    460     } 
    461     if(cnt==0) { 
    462         debugError("Timeout entering Stopped state\n"); 
    463         return false; 
    464     } 
    465     debugOutput(DEBUG_LEVEL_VERBOSE, " entered state %s\n", ePSToString(wait_state)); 
    466652    return true; 
    467653} 
    468654 
    469 bool StreamProcessor::startDryRunning(int64_t t) { 
     655bool StreamProcessor::scheduleStartDryRunning(int64_t t) { 
    470656    uint64_t tx; 
    471657    if (t < 0) { 
     
    474660        tx = t; 
    475661    } 
    476     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startDryRunning for (%p)\n",this); 
     662    debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); 
    477663    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
    478664    debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
    479665    if (m_state == ePS_Stopped) { 
    480         return scheduleAndWaitForStateTransition(ePS_WaitingForStream, tx, ePS_DryRunning); 
     666        return scheduleStateTransition(ePS_WaitingForStream, tx); 
    481667    } else if (m_state == ePS_Running) { 
    482         return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); 
     668        return scheduleStateTransition(ePS_WaitingForStreamDisable, tx); 
    483669    } else { 
    484670        debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state)); 
     
    487673} 
    488674 
    489 bool StreamProcessor::startRunning(int64_t t) { 
     675bool StreamProcessor::scheduleStartRunning(int64_t t) { 
    490676    uint64_t tx; 
    491677    if (t < 0) { 
     
    494680        tx = t; 
    495681    } 
    496     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startRunning for (%p)\n",this); 
     682    debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); 
    497683    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
    498684    debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
    499     return scheduleAndWaitForStateTransition(ePS_WaitingForStreamEnable, tx, ePS_Running); 
    500 } 
    501  
    502 bool StreamProcessor::stopDryRunning(int64_t t) { 
     685    return scheduleStateTransition(ePS_WaitingForStreamEnable, tx); 
     686} 
     687 
     688bool StreamProcessor::scheduleStopDryRunning(int64_t t) { 
    503689    uint64_t tx; 
    504690    if (t < 0) { 
     
    507693        tx = t; 
    508694    } 
    509     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopDryRunning for (%p)\n",this); 
     695    debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); 
    510696    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
    511697    debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
    512     return scheduleAndWaitForStateTransition(ePS_Stopped, tx, ePS_Stopped); 
    513 } 
    514  
    515 bool StreamProcessor::stopRunning(int64_t t) { 
     698    return scheduleStateTransition(ePS_Stopped, tx); 
     699} 
     700 
     701bool StreamProcessor::scheduleStopRunning(int64_t t) { 
    516702    uint64_t tx; 
    517703    if (t < 0) { 
     
    520706        tx = t; 
    521707    } 
    522     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopRunning for (%p)\n",this); 
     708    debugOutput(DEBUG_LEVEL_VERBOSE,"for %s SP (%p)\n", ePTToString(getType()), this); 
    523709    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
    524710    debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
    525     return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); 
    526 
     711    return scheduleStateTransition(ePS_WaitingForStreamDisable, tx); 
     712
     713 
     714bool StreamProcessor::startDryRunning(int64_t t) { 
     715    if(!scheduleStartDryRunning(t)) { 
     716        debugError("Could not schedule transition\n"); 
     717        return false; 
     718    } 
     719    if(!waitForState(ePS_DryRunning, 2000)) { 
     720        debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning)); 
     721        return false; 
     722    } 
     723    return true; 
     724
     725 
     726bool StreamProcessor::startRunning(int64_t t) { 
     727    if(!scheduleStartRunning(t)) { 
     728        debugError("Could not schedule transition\n"); 
     729        return false; 
     730    } 
     731    if(!waitForState(ePS_Running, 2000)) { 
     732        debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Running)); 
     733        return false; 
     734    } 
     735    return true; 
     736
     737 
     738bool StreamProcessor::stopDryRunning(int64_t t) { 
     739    if(!scheduleStopDryRunning(t)) { 
     740        debugError("Could not schedule transition\n"); 
     741        return false; 
     742    } 
     743    if(!waitForState(ePS_Stopped, 2000)) { 
     744        debugError(" Timeout while waiting for %s\n", ePSToString(ePS_Stopped)); 
     745        return false; 
     746    } 
     747    return true; 
     748
     749 
     750bool StreamProcessor::stopRunning(int64_t t) { 
     751    if(!scheduleStopRunning(t)) { 
     752        debugError("Could not schedule transition\n"); 
     753        return false; 
     754    } 
     755    if(!waitForState(ePS_DryRunning, 2000)) { 
     756        debugError(" Timeout while waiting for %s\n", ePSToString(ePS_DryRunning)); 
     757        return false; 
     758    } 
     759    return true; 
     760
     761 
    527762 
    528763// internal state API 
     
    544779{ 
    545780    float ticks_per_frame; 
    546     unsigned int ringbuffer_size_frames
     781    unsigned int ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize()
    547782 
    548783    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     
    554789            // object just created 
    555790            result = m_data_buffer->init(); 
    556              
     791 
    557792            // prepare the framerate estimate 
    558793            ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); 
    559794            m_ticks_per_frame = ticks_per_frame; 
    560795            debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame); 
    561          
     796 
    562797            // initialize internal buffer 
    563             ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); 
    564             result &= m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); 
     798            result &= m_data_buffer->setBufferSize(ringbuffer_size_frames); 
    565799 
    566800            result &= m_data_buffer->setEventSize( getEventSize() ); 
    567801            result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() ); 
    568             result &= m_data_buffer->setUpdatePeriod( getUpdatePeriod() ); 
    569  
     802            if(getType() == ePT_Receive) { 
     803                result &= m_data_buffer->setUpdatePeriod( getNominalFramesPerPacket() ); 
     804            } else { 
     805                result &= m_data_buffer->setUpdatePeriod( m_manager->getPeriodSize() ); 
     806            } 
    570807            result &= m_data_buffer->setNominalRate(ticks_per_frame); 
    571808            result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 
     
    612849                        } 
    613850                        // buffertype and datatype are dependant on the API 
    614                         // buffertype and datatype are dependant on the API 
    615851                        debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); 
    616852                        // buffertype and datatype are dependant on the API 
     
    643879    } 
    644880 
    645     result &= m_data_buffer->reset(); // FIXME: don't like the reset() name 
    646  
     881    result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name 
    647882    // make the buffer transparent 
    648883    m_data_buffer->setTransparent(true); 
     
    652887 
    653888    m_state = ePS_Stopped; 
     889    #ifdef DEBUG 
     890    if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { 
     891        debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); 
     892        dumpInfo(); 
     893    } 
     894    #endif 
    654895    return result; 
    655896} 
     
    679920    } 
    680921    m_state = ePS_WaitingForStream; 
     922    #ifdef DEBUG 
     923    if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { 
     924        debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); 
     925        dumpInfo(); 
     926    } 
     927    #endif 
    681928    return true; 
    682929} 
     
    701948            debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle); 
    702949            if (getType() == ePT_Receive) { 
     950                // this to ensure that there is no discontinuity when starting to  
     951                // update the DLL based upon the received packets 
    703952                m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 
    704953            } else { 
     
    708957            break; 
    709958        case ePS_WaitingForStreamDisable: 
    710             result &= m_data_buffer->reset(); // FIXME: don't like the reset() name 
     959            result &= m_data_buffer->clearBuffer(); // FIXME: don't like the reset() name 
    711960            m_data_buffer->setTransparent(true); 
    712961            break; 
     
    716965    } 
    717966    m_state = ePS_DryRunning; 
     967    #ifdef DEBUG 
     968    if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { 
     969        debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); 
     970        dumpInfo(); 
     971    } 
     972    #endif 
    718973    return result; 
    719974} 
     
    732987{ 
    733988    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     989    unsigned int ringbuffer_size_frames; 
    734990    switch(m_state) { 
    735991        case ePS_DryRunning: 
     
    737993            // this basically means nothing, the state change will 
    738994            // be picked up by the packet iterator 
     995 
     996            if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name 
     997                debugError("Could not reset data buffer\n"); 
     998                return false; 
     999            } 
     1000            if (getType() == ePT_Transmit) { 
     1001                ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); 
     1002                debugOutput(DEBUG_LEVEL_VERBOSE, "Prefill transmit SP %p with %u frames\n", this, ringbuffer_size_frames); 
     1003                // prefill the buffer 
     1004                if(!transferSilence(ringbuffer_size_frames)) { 
     1005                    debugFatal("Could not prefill transmit stream\n"); 
     1006                    return false; 
     1007                } 
     1008            } 
     1009 
    7391010            break; 
    7401011        default: 
     
    7431014    } 
    7441015    m_state = ePS_WaitingForStreamEnable; 
     1016    #ifdef DEBUG 
     1017    if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { 
     1018        debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); 
     1019        dumpInfo(); 
     1020    } 
     1021    #endif 
    7451022    return true; 
    7461023} 
     
    7651042            debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",  
    7661043                                             this, m_last_cycle); 
    767             if (getType() == ePT_Receive) { 
    768                 m_data_buffer->setTransparent(false); 
    769             } else { 
    770                 // FIXME 
    771                 debugError("Implement\n"); 
    772             } 
     1044            m_xruns = 0; 
     1045            m_data_buffer->setTransparent(false); 
    7731046            break; 
    7741047        default: 
     
    7771050    } 
    7781051    m_state = ePS_Running; 
     1052    #ifdef DEBUG 
     1053    if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { 
     1054        debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); 
     1055        dumpInfo(); 
     1056    } 
     1057    #endif 
    7791058    return result; 
    7801059} 
     
    8021081    } 
    8031082    m_state = ePS_WaitingForStreamDisable; 
     1083    #ifdef DEBUG 
     1084    if (getDebugLevel() >= DEBUG_LEVEL_VERBOSE) { 
     1085        debugOutput(DEBUG_LEVEL_VERBOSE, "State switch complete, dumping SP info...\n"); 
     1086        dumpInfo(); 
     1087    } 
     1088    #endif 
    8041089    return true; 
    8051090} 
     
    9221207} 
    9231208 
     1209/*********************************************** 
     1210 * Helper routines                             * 
     1211 ***********************************************/ 
     1212bool 
     1213StreamProcessor::transferSilence(unsigned int nframes) 
     1214{ 
     1215    bool retval; 
     1216    signed int fc; 
     1217    ffado_timestamp_t ts_tail_tmp; 
     1218 
     1219    // prepare a buffer of silence 
     1220    char *dummybuffer = (char *)calloc(sizeof(quadlet_t), nframes * getEventsPerFrame()); 
     1221    transmitSilenceBlock(dummybuffer, nframes, 0); 
     1222 
     1223    m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); 
     1224    if (fc != 0) { 
     1225        debugWarning("Prefilling a buffer that already contains %d frames\n", fc); 
     1226    } 
     1227 
     1228    // add the silence data to the ringbuffer 
     1229    if(m_data_buffer->preloadFrames(nframes, dummybuffer, true)) { 
     1230        retval = true; 
     1231    } else { 
     1232        debugWarning("Could not write to event buffer\n"); 
     1233        retval = false; 
     1234    } 
     1235    free(dummybuffer); 
     1236    return retval; 
     1237} 
    9241238 
    9251239/** 
     
    9681282    IsoStream::dumpInfo(); 
    9691283    debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n"); 
    970     if (m_handler) 
    971         debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
     1284    if (m_handler) { 
     1285        uint64_t now = m_handler->getCycleTimerTicks(); 
     1286        debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011llu (%03us %04uc %04ut)\n", 
     1287                          now, 
     1288                          (unsigned int)TICKS_TO_SECS(now), 
     1289                          (unsigned int)TICKS_TO_CYCLES(now), 
     1290                          (unsigned int)TICKS_TO_OFFSET(now)); 
     1291    } 
    9721292    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %d\n", m_xruns); 
    9731293    debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n", ePSToString(m_state)); 
    9741294    debugOutputShort( DEBUG_LEVEL_NORMAL, "   Next state           : %s\n", ePSToString(m_next_state)); 
    9751295    debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state); 
    976      
    977  
     1296    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer                : %p\n", m_data_buffer); 
    9781297    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_manager->getNominalRate()); 
    9791298    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n", 
     
    9831302 
    9841303    m_data_buffer->dumpInfo(); 
    985  
    986     m_PeriodStat.dumpInfo(); 
    987     m_PacketStat.dumpInfo(); 
    988 //     m_WakeupStat.dumpInfo(); 
    9891304} 
    9901305 
  • branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h

    r719 r720  
    8181        ePS_WaitingForStreamDisable, 
    8282    }; 
    83      
     83 
    8484    ///> set the SP state to a specific value 
    8585    void setState(enum eProcessorState); 
     
    103103 
    104104    bool scheduleStateTransition(enum eProcessorState state, uint64_t time_instant); 
    105     bool scheduleAndWaitForStateTransition(enum eProcessorState state,  
    106                                            uint64_t time_instant,  
    107                                            enum eProcessorState wait_state); 
    108 public: 
     105    bool waitForState(enum eProcessorState state, unsigned int timeout); 
     106 
     107public: //--- state stuff 
    109108    bool isRunning() 
    110109            {return m_state == ePS_Running;}; 
     
    112111            {return m_state == ePS_DryRunning;}; 
    113112 
    114 //--- state stuff (TODO: cleanup) 
     113    // these schedule and wait for the state transition 
    115114    bool startDryRunning(int64_t time_to_start_at); 
    116115    bool startRunning(int64_t time_to_start_at); 
     
    118117    bool stopRunning(int64_t time_to_stop_at); 
    119118 
     119    // these only schedule the transition 
     120    bool scheduleStartDryRunning(int64_t time_to_start_at); 
     121    bool scheduleStartRunning(int64_t time_to_start_at); 
     122    bool scheduleStopDryRunning(int64_t time_to_stop_at); 
     123    bool scheduleStopRunning(int64_t time_to_stop_at); 
     124 
    120125    // the main difference between init and prepare is that when prepare is called, 
    121126    // the SP is registered to a manager (FIXME: can't it be called by the manager?) 
    122127    bool init(); 
    123128    bool prepare(); 
     129 
    124130    ///> stop the SP from running or dryrunning 
    125131    bool stop(); 
    126 // constructor/destructor 
    127 public: 
     132 
     133public: // constructor/destructor 
    128134    StreamProcessor(enum eProcessorType type, int port); 
    129135    virtual ~StreamProcessor(); 
    130136 
    131 // the receive/transmit functions 
    132 public: 
     137public: // the public receive/transmit functions 
    133138    // the transmit interface accepts frames and provides packets 
    134139    // implement these for a transmit SP 
    135140    // leave default for a receive SP 
    136     virtual enum raw1394_iso_disposition 
    137     getPacket(unsigned char *data, unsigned int *length, 
    138                 unsigned char *tag, unsigned char *sy, 
    139                 int cycle, unsigned int dropped, unsigned int max_length) 
    140         {debugWarning("call not allowed\n"); return RAW1394_ISO_STOP;}; 
    141     virtual bool putFrames(unsigned int nbframes, int64_t ts)  
    142         {debugWarning("call not allowed\n"); return false;}; 
    143     virtual bool putFramesDry(unsigned int nbframes, int64_t ts) 
    144         {debugWarning("call not allowed\n"); return false;}; 
    145     virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) 
    146         {debugWarning("call not allowed\n"); return false;}; 
    147141 
    148142    // the receive interface accepts packets and provides frames 
    149      
    150     // the following two methods are to be implemented by subclasses 
    151     virtual bool processPacketHeader(unsigned char *data, unsigned int length, 
    152                   unsigned char channel, unsigned char tag, unsigned char sy, 
    153                   unsigned int cycle, unsigned int dropped) 
    154         {debugWarning("call not allowed\n"); return false;}; 
    155     virtual bool processPacketData(unsigned char *data, unsigned int length, 
    156                   unsigned char channel, unsigned char tag, unsigned char sy, 
    157                   unsigned int cycle, unsigned int dropped) 
    158         {debugWarning("call not allowed\n"); return false;}; 
    159  
    160     // this one is implemented by us 
     143    // these are implemented by the parent SP 
    161144    enum raw1394_iso_disposition 
    162145        putPacket(unsigned char *data, unsigned int length, 
     
    164147                  unsigned int cycle, unsigned int dropped); 
    165148 
     149    enum raw1394_iso_disposition 
     150    getPacket(unsigned char *data, unsigned int *length, 
     151                unsigned char *tag, unsigned char *sy, 
     152                int cycle, unsigned int dropped, unsigned int max_length); 
     153 
    166154    bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client 
    167 protected: 
     155    bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 
     156 
     157protected: // the helper receive/transmit functions 
    168158    // to be implemented by the children 
     159    // the following methods are to be implemented by receive SP subclasses 
     160    virtual bool processPacketHeader(unsigned char *data, unsigned int length, 
     161                                     unsigned char channel, unsigned char tag, 
     162                                     unsigned char sy, unsigned int cycle, 
     163                                     unsigned int dropped) 
     164        {debugWarning("call not allowed\n"); return false;}; 
     165    virtual bool processPacketData(unsigned char *data, unsigned int length, 
     166                                   unsigned char channel, unsigned char tag, 
     167                                   unsigned char sy, unsigned int cycle, 
     168                                   unsigned int dropped) 
     169        {debugWarning("call not allowed\n"); return false;}; 
    169170    virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) 
    170171        {debugWarning("call not allowed\n"); return false;}; 
    171172    virtual bool provideSilenceBlock(unsigned int nevents, unsigned int offset) 
     173        {debugWarning("call not allowed\n"); return false;}; 
     174 
     175    // the following methods are to be implemented by transmit SP subclasses 
     176    virtual bool generatePacketHeader(unsigned char *data, unsigned int *length, 
     177                                      unsigned char *tag, unsigned char *sy, 
     178                                      int cycle, unsigned int dropped, 
     179                                      unsigned int max_length) 
     180        {debugWarning("call not allowed\n"); return false;}; 
     181    virtual bool generatePacketData(unsigned char *data, unsigned int *length, 
     182                                    unsigned char *tag, unsigned char *sy, 
     183                                    int cycle, unsigned int dropped, 
     184                                    unsigned int max_length) 
     185        {debugWarning("call not allowed\n"); return false;}; 
     186    virtual bool generateSilentPacketHeader(unsigned char *data, unsigned int *length, 
     187                                            unsigned char *tag, unsigned char *sy, 
     188                                            int cycle, unsigned int dropped, 
     189                                            unsigned int max_length) 
     190        {debugWarning("call not allowed\n"); return false;}; 
     191    virtual bool generateSilentPacketData(unsigned char *data, unsigned int *length, 
     192                                          unsigned char *tag, unsigned char *sy, 
     193                                          int cycle, unsigned int dropped, 
     194                                          unsigned int max_length) 
     195        {debugWarning("call not allowed\n"); return false;}; 
     196    virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset) 
     197        {debugWarning("call not allowed\n"); return false;}; 
     198    virtual bool transmitSilenceBlock(char *data, unsigned int nevents, unsigned int offset) 
    172199        {debugWarning("call not allowed\n"); return false;}; 
    173200 
     
    175202    bool getFramesDry(unsigned int nbframes, int64_t ts); 
    176203    bool getFramesWet(unsigned int nbframes, int64_t ts); 
     204    bool putFramesDry(unsigned int nbframes, int64_t ts); 
     205    bool putFramesWet(unsigned int nbframes, int64_t ts); 
     206 
     207    bool transferSilence(unsigned int size); 
    177208 
    178209    // move to private? 
     
    188219 
    189220//--- data buffering and accounting 
    190 public: // FIXME: should be private 
     221public: 
     222    void getBufferHeadTimestamp ( ffado_timestamp_t *ts, signed int *fc ) 
     223        {m_data_buffer->getBufferHeadTimestamp(ts, fc);}; 
     224    void getBufferTailTimestamp ( ffado_timestamp_t *ts, signed int *fc ) 
     225        {m_data_buffer->getBufferTailTimestamp(ts, fc);}; 
     226 
     227    void setBufferTailTimestamp ( ffado_timestamp_t new_timestamp ) 
     228        {m_data_buffer->setBufferTailTimestamp(new_timestamp);}; 
     229    void setBufferHeadTimestamp ( ffado_timestamp_t new_timestamp ) 
     230        {m_data_buffer->setBufferHeadTimestamp(new_timestamp);}; 
     231protected: 
    191232    Util::TimestampedBuffer *m_data_buffer; 
    192233 
     
    255296        uint64_t getTimeAtPeriod(); 
    256297 
    257         uint64_t getTimeNow(); 
     298        uint64_t getTimeNow(); // FIXME: should disappear 
    258299 
    259300 
     
    318359 
    319360        /** 
    320          * @brief get the nominal number of frames between buffer updates 
    321          * @return the nominal number of frames between buffer updates 
    322          */ 
    323         virtual unsigned int getUpdatePeriod() = 0; 
     361         * @brief get the nominal number of frames in a packet 
     362         * @return the nominal number of frames in a packet 
     363         */ 
     364        virtual unsigned int getNominalFramesPerPacket() = 0; 
    324365 
    325366    protected: 
     
    340381    StreamStatistics m_WakeupStat; 
    341382    DECLARE_DEBUG_MODULE; 
    342  
    343383}; 
    344384 
  • branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

    r719 r720  
    283283    // lower on average. 
    284284    max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS; 
    285     debugOutput( DEBUG_LEVEL_VERBOSE, " %d ticks (%03us %04uc %04ut)...\n",  
     285    debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",  
    286286        max_of_min_delay, 
    287287        (unsigned int)TICKS_TO_SECS(max_of_min_delay), 
     
    295295    //sleep(2); // FIXME: be smarter here 
    296296 
    297     // wait for some sort of sync 
     297    // make sure that we are dry-running long enough for the 
     298    // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)? 
    298299    debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n"); 
    299     // in order to obtain that, we wait for the first periods to be received. 
    300300    int nb_sync_runs=20; 
    301301    int64_t time_till_next_period; 
    302302    while(nb_sync_runs--) { // or while not sync-ed? 
     303        // check if we were waked up too soon 
    303304        time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
    304         debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 
     305        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 
    305306        if(time_till_next_period > 0) { 
    306307            // wait for the period 
     
    309310    } 
    310311 
    311     // figure out where we are now 
    312     uint64_t time_of_transfer = m_SyncSource->getTimeAtPeriod(); 
    313     debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",  
    314         time_of_transfer, 
    315         (unsigned int)TICKS_TO_SECS(time_of_transfer), 
    316         (unsigned int)TICKS_TO_CYCLES(time_of_transfer), 
    317         (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); 
    318  
    319     // start wet-running in 200 cycles 
    320     // this is the timeframe in which the remaining code should be ready 
    321     time_of_transfer = addTicks(time_of_transfer, 200*TICKS_PER_CYCLE); 
    322  
    323     debugOutput( DEBUG_LEVEL_VERBOSE, "  => start at TS=%011llu (%03us %04uc %04ut)...\n",  
    324         time_of_transfer, 
    325         (unsigned int)TICKS_TO_SECS(time_of_transfer), 
    326         (unsigned int)TICKS_TO_CYCLES(time_of_transfer), 
    327         (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); 
    328     // we now should have decent sync info 
    329     // the buffers of the receive streams should be (approx) empty 
    330     // the buffers of the xmit streams should be full 
    331      
    332     // at this point the buffer head timestamp of the transmit buffers can be 
    333     // set properly since we know the sync source's timestamp of the last 
    334     // buffer transfer. we also know the rate. 
    335      
    336     debugOutput( DEBUG_LEVEL_VERBOSE, " propagate sync info...\n"); 
     312    debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n"); 
    337313    // FIXME: in the SPM it would be nice to have system time instead of 
    338314    //        1394 time 
    339 //     float rate=m_SyncSource->getTicksPerFrame(); 
    340 //     int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate); 
    341 //     // the data at the front of the buffer is intended to be transmitted 
    342 //     // nb_periods*period_size after the last received period 
    343 //     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
    344  
    345 //     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    346 //             it != m_TransmitProcessors.end(); 
    347 //             ++it ) { 
    348 //         // FIXME: encapsulate 
    349 //         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 
    350 //         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!! 
    351 //     } 
    352      
    353     dumpInfo(); 
     315 
     316    // we now should have decent sync info on the sync source 
     317    // determine a point in time where the system should start 
     318    // figure out where we are now 
     319    uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod(); 
     320    debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",  
     321        time_of_first_sample, 
     322        (unsigned int)TICKS_TO_SECS(time_of_first_sample), 
     323        (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), 
     324        (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); 
     325 
     326    #define CYCLES_FOR_STARTUP 200 
     327    // start wet-running in CYCLES_FOR_STARTUP cycles 
     328    // this is the time window we have to setup all SP's such that they  
     329    // can start wet-running correctly. 
     330    time_of_first_sample = addTicks(time_of_first_sample, 
     331                                    CYCLES_FOR_STARTUP * TICKS_PER_CYCLE); 
     332 
     333    debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",  
     334        time_of_first_sample, 
     335        (unsigned int)TICKS_TO_SECS(time_of_first_sample), 
     336        (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), 
     337        (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); 
     338 
     339    // we should start wet-running the transmit SP's some cycles in advance 
     340    // such that we know it is wet-running when it should output its first sample 
     341    #define PRESTART_CYCLES_FOR_XMIT 20 
     342    uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,  
     343                                                 PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE); 
     344 
     345    #define PRESTART_CYCLES_FOR_RECV 0 
     346    uint64_t time_to_start_recv = substractTicks(time_of_first_sample, 
     347                                                 PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE); 
     348    debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",  
     349        time_to_start_xmit, 
     350        (unsigned int)TICKS_TO_SECS(time_to_start_xmit), 
     351        (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit), 
     352        (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit)); 
     353    debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",  
     354        time_to_start_recv, 
     355        (unsigned int)TICKS_TO_SECS(time_to_start_recv), 
     356        (unsigned int)TICKS_TO_CYCLES(time_to_start_recv), 
     357        (unsigned int)TICKS_TO_OFFSET(time_to_start_recv)); 
     358 
     359    // at this point the buffer head timestamp of the transmit buffers can be set 
     360    // this is the presentation time of the first sample in the buffer 
     361    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     362          it != m_TransmitProcessors.end(); 
     363          ++it ) { 
     364        (*it)->setBufferHeadTimestamp(time_of_first_sample); 
     365    } 
    354366 
    355367    // STEP X: switch SP's over to the running state 
    356     uint64_t time_to_start = addTicks(time_of_transfer, 
    357                                       m_SyncSource->getTicksPerFrame() * getPeriodSize()); 
    358     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    359             it != m_ReceiveProcessors.end(); 
    360             ++it ) { 
    361         if(!(*it)->startRunning(time_to_start)) { 
    362             debugError("Could not put SP %p into the running state\n", *it); 
    363             return false; 
    364         } 
    365     } 
    366     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    367             it != m_TransmitProcessors.end(); 
    368             ++it ) { 
    369         if(!(*it)->startRunning(time_to_start)) { 
    370             debugError("Could not put SP %p into the running state\n", *it); 
    371             return false; 
    372         } 
     368    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     369          it != m_ReceiveProcessors.end(); 
     370          ++it ) { 
     371        if(!(*it)->scheduleStartRunning(time_to_start_recv)) { 
     372            debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv); 
     373            return false; 
     374        } 
     375    } 
     376    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     377          it != m_TransmitProcessors.end(); 
     378          ++it ) { 
     379        if(!(*it)->scheduleStartRunning(time_to_start_xmit)) { 
     380            debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit); 
     381            return false; 
     382        } 
     383    } 
     384    // wait for the syncsource to start running. 
     385    // that will block the waitForPeriod call until everyone has started (theoretically) 
     386    int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started 
     387    while (!m_SyncSource->isRunning() && cnt) { 
     388        usleep(125); 
     389        cnt--; 
     390    } 
     391    if(cnt==0) { 
     392        debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n"); 
     393        return false; 
    373394    } 
    374395    debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 
     
    577598    // this is to notify the client of the delay 
    578599    // that we introduced 
    579     m_delayed_usecs = time_till_next_period; 
     600    m_delayed_usecs = -time_till_next_period; 
    580601 
    581602    // we save the 'ideal' time of the transfer at this point, 
     
    683704 
    684705bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { 
    685     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period for type (%d)...\n", t); 
     706    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",  
     707        t, m_time_of_transfer, 
     708        (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 
     709        (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 
     710        (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 
     711 
    686712    bool retval = true; 
    687713    // a static cast could make sure that there is no performance 
    688714    // penalty for the virtual functions (to be checked) 
    689715    if (t==StreamProcessor::ePT_Receive) { 
    690         // determine the time at which we want reception to start 
    691         float rate=m_SyncSource->getTicksPerFrame(); 
    692         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate); 
    693          
    694         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks); 
    695          
    696         if(receive_timestamp<0) { 
    697             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 
    698              receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 
    699         } 
    700         if(receive_timestamp>(128L*TICKS_PER_SECOND)) { 
    701             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 
    702              receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 
    703         } 
    704          
    705716        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    706717                it != m_ReceiveProcessors.end(); 
    707718                ++it ) { 
    708             if(!(*it)->getFrames(m_period, receive_timestamp)) { 
     719            if(!(*it)->getFrames(m_period, m_time_of_transfer)) { 
    709720                    debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n", 
    710721                            m_period, m_time_of_transfer,*it); 
     
    713724        } 
    714725    } else { 
     726        // FIXME: in the SPM it would be nice to have system time instead of 
     727        //        1394 time 
     728        float rate = m_SyncSource->getTicksPerFrame(); 
     729        int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); 
     730 
     731        // the data we are putting into the buffer is intended to be transmitted 
     732        // one ringbuffer size after it has been received 
     733        int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
     734 
    715735        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    716736                it != m_TransmitProcessors.end(); 
     
    718738            // FIXME: in the SPM it would be nice to have system time instead of 
    719739            //        1394 time 
    720             float rate=m_SyncSource->getTicksPerFrame(); 
    721             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate); 
    722  
    723             // the data we are putting into the buffer is intended to be transmitted 
    724             // one ringbuffer size after it has been received 
    725             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
    726  
    727740            if(!(*it)->putFrames(m_period, transmit_timestamp)) { 
    728741                debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n", 
     
    730743                retval &= false; // buffer underrun 
    731744            } 
    732  
    733         } 
    734     } 
    735     return retval; 
    736 } 
    737  
    738 /** 
    739  * @brief Dry run one period for both receive and transmit StreamProcessors 
    740  * 
    741  * Process one period of frames for all streamprocessors, without touching the 
    742  * client buffers. This only removes an incoming period from the ISO receive buffer and 
    743  * puts one period of silence into the transmit buffers. 
    744  * 
    745  * @return true if successful, false otherwise (indicates xrun). 
    746  */ 
    747 bool StreamProcessorManager::dryRun() { 
    748     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n"); 
    749     bool retval=true; 
    750     retval &= dryRun(StreamProcessor::ePT_Receive); 
    751     retval &= dryRun(StreamProcessor::ePT_Transmit); 
    752     return retval; 
    753 } 
    754  
    755 /** 
    756  * @brief Dry run one period for either the receive or transmit StreamProcessors 
    757  * 
    758  * see dryRun() 
    759  * 
    760  * @param t The processor type to dryRun for (receive or transmit) 
    761  * @return true if successful, false otherwise (indicates xrun). 
    762  */ 
    763  
    764 bool StreamProcessorManager::dryRun(enum StreamProcessor::eProcessorType t) { 
    765     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Dry-running period...\n"); 
    766     bool retval = true; 
    767     // a static cast could make sure that there is no performance 
    768     // penalty for the virtual functions (to be checked) 
    769     if (t==StreamProcessor::ePT_Receive) { 
    770         // determine the time at which we want reception to start 
    771         float rate=m_SyncSource->getTicksPerFrame(); 
    772         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate); 
    773          
    774         int64_t receive_timestamp = substractTicks(m_time_of_transfer, one_frame_in_ticks); 
    775          
    776         if(receive_timestamp<0) { 
    777             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 
    778              receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 
    779         } 
    780         if(receive_timestamp>(128L*TICKS_PER_SECOND)) { 
    781             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 
    782              receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 
    783         } 
    784          
    785         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    786                 it != m_ReceiveProcessors.end(); 
    787                 ++it ) { 
    788  
    789             if(!(*it)->getFramesDry(m_period, receive_timestamp)) { 
    790                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n", 
    791                             m_period, m_time_of_transfer,*it); 
    792                 retval &= false; // buffer underrun 
    793             } 
    794  
    795         } 
    796     } else { 
    797         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    798                 it != m_TransmitProcessors.end(); 
    799                 ++it ) { 
    800             // FIXME: in the SPM it would be nice to have system time instead of 
    801             //        1394 time 
    802             float rate=m_SyncSource->getTicksPerFrame(); 
    803             int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers*m_period))*rate); 
    804  
    805             // the data we are putting into the buffer is intended to be transmitted 
    806             // one ringbuffer size after it has been received 
    807             int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
    808  
    809             if(!(*it)->putFramesDry(m_period, transmit_timestamp)) { 
    810                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n", 
    811                         m_period, transmit_timestamp, *it); 
    812                 retval &= false; // buffer underrun 
    813             } 
    814  
    815745        } 
    816746    } 
  • branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h

    r719 r720  
    8383 
    8484    bool waitForPeriod(); ///< wait for the next period 
    85  
    8685    bool transfer(); ///< transfer the buffer contents from/to client 
    8786    bool transfer(enum StreamProcessor::eProcessorType); ///< transfer the buffer contents from/to client (single processor type) 
    88      
    89     bool dryRun(); 
    90     bool dryRun(enum StreamProcessor::eProcessorType); 
    9187 
    9288    int getDelayedUsecs() {return m_delayed_usecs;}; 
     
    9591 
    9692    unsigned int getNominalRate() {return m_nominal_framerate;}; 
     93    uint64_t getTimeOfLastTransfer() { return m_time_of_transfer;}; 
    9794 
    9895private: 
    99  
    10096    int m_delayed_usecs; 
    10197    // this stores the time at which the next transfer should occur 
  • branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp

    r719 r720  
    266266 * @return true if successful 
    267267 */ 
    268 bool TimestampedBuffer::reset() { 
     268bool TimestampedBuffer::clearBuffer() { 
     269    debugOutput(DEBUG_LEVEL_VERBOSE, "Clearing buffer\n"); 
    269270    ffado_ringbuffer_reset(m_event_buffer); 
    270  
    271271    resetFrameCounter(); 
    272  
    273272    return true; 
    274273} 
     
    397396 
    398397/** 
     398 * @brief Preload frames into the buffer 
     399 * 
     400 * Preload \ref nframes of frames from the buffer pointed to by \ref data to the 
     401 * internal ringbuffer. Does not care about transparency. Keeps the buffer head or tail 
     402 * timestamp constant. 
     403 * 
     404 * @note not thread safe 
     405 * 
     406 * @param nframes number of frames to copy 
     407 * @param data pointer to the frame buffer 
     408 * @param keep_head_ts if true, keep the head timestamp constant. If false, keep the 
     409 *                     tail timestamp constant. 
     410 * @return true if successful 
     411 */ 
     412bool TimestampedBuffer::preloadFrames(unsigned int nframes, char *data, bool keep_head_ts) { 
     413    unsigned int write_size = nframes * m_event_size * m_events_per_frame; 
     414    // add the data payload to the ringbuffer 
     415    size_t written = ffado_ringbuffer_write(m_event_buffer, data, write_size); 
     416    if (written < write_size) 
     417    { 
     418        debugWarning("ringbuffer full, request: %u, actual: %u\n", write_size, written); 
     419        return false; 
     420    } 
     421     
     422    // make sure the head timestamp remains identical 
     423    signed int fc; 
     424    ffado_timestamp_t ts; 
     425 
     426    if (keep_head_ts) { 
     427        getBufferHeadTimestamp(&ts, &fc); 
     428    } else { 
     429        getBufferTailTimestamp(&ts, &fc); 
     430    } 
     431    // update frame counter 
     432    m_framecounter += nframes; 
     433    if (keep_head_ts) { 
     434        setBufferHeadTimestamp(ts); 
     435    } else { 
     436        setBufferTailTimestamp(ts); 
     437    } 
     438 
     439    return true; 
     440} 
     441 
     442/** 
    399443 * @brief Drop frames from the head of the buffer 
    400444 * 
     
    410454    ffado_ringbuffer_read_advance(m_event_buffer, read_size); 
    411455    decrementFrameCounter(nframes); 
    412  
    413456    return true; 
    414457} 
     
    722765#endif 
    723766 
    724     ffado_timestamp_t ts=new_timestamp; 
     767    ffado_timestamp_t ts = new_timestamp; 
    725768 
    726769    ENTER_CRITICAL_SECTION; 
     
    10451088     
    10461089    ENTER_CRITICAL_SECTION; 
    1047     diff=m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; 
     1090    diff = m_buffer_next_tail_timestamp - m_buffer_tail_timestamp; 
    10481091    EXIT_CRITICAL_SECTION; 
    10491092 
     
    10541097#endif 
    10551098 
    1056     ffado_timestamp_t ts=new_timestamp; 
     1099    ffado_timestamp_t ts = new_timestamp; 
    10571100    ts += m_tick_offset; 
    10581101 
     
    11231166                this, diff); 
    11241167 
    1125     double err=diff; 
     1168    double err = diff; 
    11261169 
    11271170    debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "diff2="TIMESTAMP_FORMAT_SPEC" err=%f\n", 
     
    12081251    debugOutputShort( DEBUG_LEVEL_NORMAL, "  TimestampedBuffer (%p) info:\n",this); 
    12091252    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Frame counter         : %d\n", m_framecounter); 
     1253    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Events in buffer      : %d\n", getBufferFill()); 
    12101254    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer head timestamp : "TIMESTAMP_FORMAT_SPEC"\n",ts_head); 
    12111255    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer tail timestamp : "TIMESTAMP_FORMAT_SPEC"\n",m_buffer_tail_timestamp); 
  • branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h

    r719 r720  
    3737// #define TIMESTAMP_FORMAT_SPEC "%012lld" 
    3838 
    39 namespace Util { 
    40  
     39namespace Util 
     40
    4141 
    4242class TimestampedBufferClient; 
    4343 
    4444/** 
    45  * \brief Class implementing a frame buffer that is time-aware 
    46  
    47  * This class implements a buffer that is time-aware. Whenever new frames 
    48  * are written to the buffer, the timestamp corresponding to the last frame 
    49  * in the buffer is updated. This allows to calculate the timestamp of any 
    50  * other frame in the buffer. 
    51  
    52  * The buffer is a frame buffer, having the following parameters defining 
    53  * it's behaviour: 
    54  * - buff_size: buffer size in frames (setBufferSize()) 
    55  * - events_per_frame: the number of events per frame (setEventsPerFrame()) 
    56  * - event_size: the storage size of the events (in bytes) (setEventSize()) 
    57  
    58  * The total size of the buffer (in bytes) is at least 
    59  * buff_size*events_per_frame*event_size. 
    60  
    61  * Timestamp tracking is done by requiring that a timestamp is specified every 
    62  * time frames are added to the buffer. In combination with the buffer fill and 
    63  * the frame rate (calculated internally), this allows to calculate the timestamp 
    64  * of any frame in the buffer. In order to initialize the internal data structures, 
    65  * the setNominalRate() and setUpdatePeriod() functions are provided. 
    66  
    67  * \note Currently the class only supports fixed size writes of size update_period. 
    68  *       This can change in the future, implementation ideas are already in place. 
    69  
    70  * The TimestampedBuffer class is time unit agnostic. It can handle any time unit 
    71  * as long as it fits in a 64 bit unsigned integer. The buffer supports wrapped 
    72  * timestamps using (...). 
    73  
    74  * There are two methods of reading and writing to the buffer. 
    75  
    76  * The first method uses conventional readFrames() and writeFrames() functions. 
    77  
    78  * The second method makes use of the TimestampedBufferClient interface. When a 
    79  * TimestampedBuffer is created, it is required that a TimestampedBufferClient is 
    80  * registered. This client implements the processReadBlock and processWriteBlock 
    81  * functions. These are block processing 'callbacks' that allow zero-copy processing 
    82  * of the buffer contents. In order to initiate block processing, the 
    83  * blockProcessWriteFrames and blockProcessReadFrames functions are provided by 
    84  * TimestampedBuffer. 
    85  
    86  */ 
    87 class TimestampedBuffer { 
    88  
    89 public: 
    90  
    91  
    92     TimestampedBuffer(TimestampedBufferClient *); 
    93     virtual ~TimestampedBuffer(); 
    94  
    95     bool writeDummyFrame(); 
    96     bool dropFrames(unsigned int nbframes); 
    97  
    98     bool writeFrames(unsigned int nbframes, char *data, ffado_timestamp_t ts); 
    99     bool readFrames(unsigned int nbframes, char *data); 
    100  
    101     bool blockProcessWriteFrames(unsigned int nbframes, ffado_timestamp_t ts); 
    102     bool blockProcessReadFrames(unsigned int nbframes); 
    103  
    104     bool init(); 
    105     bool prepare(); 
    106     bool reset(); 
    107  
    108     bool isEnabled() {return m_enabled;}; 
    109     void enable() {m_enabled=true;}; 
    110     void disable() {m_enabled=false;}; 
    111  
    112     bool isTransparent() {return m_transparent;}; 
    113     void setTransparent(bool v) {m_transparent=v;}; 
    114  
    115     bool setEventSize(unsigned int s); 
    116     bool setEventsPerFrame(unsigned int s); 
    117     bool setBufferSize(unsigned int s); 
    118     unsigned int getBufferSize() {return m_buffer_size;}; 
    119  
    120     unsigned int getBytesPerFrame() {return m_bytes_per_frame;}; 
    121  
    122     bool setWrapValue(ffado_timestamp_t w); 
    123  
    124     unsigned int getBufferFill(); 
    125  
    126     // timestamp stuff 
    127     int getFrameCounter() {return m_framecounter;}; 
    128  
    129     void getBufferHeadTimestamp(ffado_timestamp_t *ts, signed int *fc); 
    130     void getBufferTailTimestamp(ffado_timestamp_t *ts, signed int *fc); 
    131  
    132     void setBufferTailTimestamp(ffado_timestamp_t new_timestamp); 
    133     void setBufferHeadTimestamp(ffado_timestamp_t new_timestamp); 
    134  
    135     // sync related, also drops or add frames when necessary 
    136     bool syncBufferHeadToTimestamp(ffado_timestamp_t ts); 
    137     bool syncBufferTailToTimestamp(ffado_timestamp_t ts); 
    138     bool syncCorrectLag(int64_t ts); 
    139      
    140     ffado_timestamp_t getTimestampFromTail(int nframes); 
    141     ffado_timestamp_t getTimestampFromHead(int nframes); 
    142  
    143     // buffer offset stuff 
    144     /// return the tick offset value 
    145     ffado_timestamp_t getTickOffset() {return m_tick_offset;}; 
    146  
    147     bool setFrameOffset(int nframes); 
    148     bool setTickOffset(ffado_timestamp_t); 
    149  
    150     // dll stuff 
    151     bool setNominalRate(float r); 
    152     float getNominalRate() {return m_nominal_rate;}; 
    153     float getRate(); 
    154  
    155     bool setUpdatePeriod(unsigned int t); 
    156  
    157     // misc stuff 
    158     void dumpInfo(); 
    159     void setVerboseLevel(int l) {setDebugLevel(l);}; 
    160  
    161 private: 
    162     void decrementFrameCounter(int nbframes); 
    163     void incrementFrameCounter(int nbframes, ffado_timestamp_t new_timestamp); 
    164     void resetFrameCounter(); 
    165  
    166 protected: 
    167  
    168     ffado_ringbuffer_t * m_event_buffer; 
    169     char* m_cluster_buffer; 
    170  
    171     unsigned int m_event_size; // the size of one event 
    172     unsigned int m_events_per_frame; // the number of events in a frame 
    173     unsigned int m_buffer_size; // the number of frames in the buffer 
    174     unsigned int m_bytes_per_frame; 
    175     unsigned int m_bytes_per_buffer; 
    176     bool m_enabled; // you can get frames FIXME: rename!! 
    177     bool m_transparent; // the buffer should hold the frames put in it. if true, discards all frames 
    178  
    179     ffado_timestamp_t m_wrap_at; // value to wrap at 
    180  
    181     TimestampedBufferClient *m_Client; 
    182  
    183     DECLARE_DEBUG_MODULE; 
    184  
    185 private: 
    186     // the framecounter gives the number of frames in the buffer 
    187     signed int m_framecounter; 
    188  
    189     // the offset that define the timing of the buffer 
    190     ffado_timestamp_t m_tick_offset; 
    191  
    192     // the buffer tail timestamp gives the timestamp of the last frame 
    193     // that was put into the buffer 
    194     ffado_timestamp_t   m_buffer_tail_timestamp; 
    195     ffado_timestamp_t   m_buffer_next_tail_timestamp; 
    196  
    197     // this mutex protects the access to the framecounter 
    198     // and the buffer head timestamp. 
    199     pthread_mutex_t m_framecounter_lock; 
    200  
    201     // tracking DLL variables 
     45    * \brief Class implementing a frame buffer that is time-aware 
     46    
     47    * This class implements a buffer that is time-aware. Whenever new frames 
     48    * are written to the buffer, the timestamp corresponding to the last frame 
     49    * in the buffer is updated. This allows to calculate the timestamp of any 
     50    * other frame in the buffer. 
     51    
     52    * The buffer is a frame buffer, having the following parameters defining 
     53    * it's behaviour: 
     54    * - buff_size: buffer size in frames (setBufferSize()) 
     55    * - events_per_frame: the number of events per frame (setEventsPerFrame()) 
     56    * - event_size: the storage size of the events (in bytes) (setEventSize()) 
     57    
     58    * The total size of the buffer (in bytes) is at least 
     59    * buff_size*events_per_frame*event_size. 
     60    
     61    * Timestamp tracking is done by requiring that a timestamp is specified every 
     62    * time frames are added to the buffer. In combination with the buffer fill and 
     63    * the frame rate (calculated internally), this allows to calculate the timestamp 
     64    * of any frame in the buffer. In order to initialize the internal data structures, 
     65    * the setNominalRate() and setUpdatePeriod() functions are provided. 
     66    
     67    * \note Currently the class only supports fixed size writes of size update_period. 
     68    *       This can change in the future, implementation ideas are already in place. 
     69    
     70    * The TimestampedBuffer class is time unit agnostic. It can handle any time unit 
     71    * as long as it fits in a 64 bit unsigned integer. The buffer supports wrapped 
     72    * timestamps using (...). 
     73    
     74    * There are two methods of reading and writing to the buffer. 
     75    
     76    * The first method uses conventional readFrames() and writeFrames() functions. 
     77    
     78    * The second method makes use of the TimestampedBufferClient interface. When a 
     79    * TimestampedBuffer is created, it is required that a TimestampedBufferClient is 
     80    * registered. This client implements the processReadBlock and processWriteBlock 
     81    * functions. These are block processing 'callbacks' that allow zero-copy processing 
     82    * of the buffer contents. In order to initiate block processing, the 
     83    * blockProcessWriteFrames and blockProcessReadFrames functions are provided by 
     84    * TimestampedBuffer. 
     85    
     86    */ 
     87class TimestampedBuffer 
     88
     89    public: 
     90        TimestampedBuffer ( TimestampedBufferClient * ); 
     91        virtual ~TimestampedBuffer(); 
     92 
     93        bool writeDummyFrame(); 
     94        bool dropFrames ( unsigned int nbframes ); 
     95 
     96        bool writeFrames ( unsigned int nbframes, char *data, ffado_timestamp_t ts ); 
     97        bool readFrames ( unsigned int nbframes, char *data ); 
     98 
     99        bool preloadFrames ( unsigned int nbframes, char *data, bool keep_head_ts ); 
     100 
     101        bool blockProcessWriteFrames ( unsigned int nbframes, ffado_timestamp_t ts ); 
     102        bool blockProcessReadFrames ( unsigned int nbframes ); 
     103 
     104        bool init(); 
     105        bool prepare(); 
     106        bool clearBuffer(); 
     107 
     108        bool isEnabled() {return m_enabled;}; 
     109        void enable() {m_enabled=true;}; 
     110        void disable() {m_enabled=false;}; 
     111 
     112        bool isTransparent() {return m_transparent;}; 
     113        void setTransparent ( bool v ) {m_transparent=v;}; 
     114 
     115        bool setEventSize ( unsigned int s ); 
     116        bool setEventsPerFrame ( unsigned int s ); 
     117        bool setBufferSize ( unsigned int s ); 
     118        unsigned int getBufferSize() {return m_buffer_size;}; 
     119 
     120        unsigned int getBytesPerFrame() {return m_bytes_per_frame;}; 
     121 
     122        bool setWrapValue ( ffado_timestamp_t w ); 
     123 
     124        unsigned int getBufferFill(); 
     125 
     126        // timestamp stuff 
     127        int getFrameCounter() {return m_framecounter;}; 
     128 
     129        void getBufferHeadTimestamp ( ffado_timestamp_t *ts, signed int *fc ); 
     130        void getBufferTailTimestamp ( ffado_timestamp_t *ts, signed int *fc ); 
     131 
     132        void setBufferTailTimestamp ( ffado_timestamp_t new_timestamp ); 
     133        void setBufferHeadTimestamp ( ffado_timestamp_t new_timestamp ); 
     134 
     135        // sync related, also drops or add frames when necessary 
     136        bool syncBufferHeadToTimestamp ( ffado_timestamp_t ts ); 
     137        bool syncBufferTailToTimestamp ( ffado_timestamp_t ts ); 
     138        bool syncCorrectLag ( int64_t ts ); 
     139 
     140        ffado_timestamp_t getTimestampFromTail ( int nframes ); 
     141        ffado_timestamp_t getTimestampFromHead ( int nframes ); 
     142 
     143        // buffer offset stuff 
     144        /// return the tick offset value 
     145        ffado_timestamp_t getTickOffset() {return m_tick_offset;}; 
     146 
     147        bool setFrameOffset ( int nframes ); 
     148        bool setTickOffset ( ffado_timestamp_t ); 
     149 
     150        // dll stuff 
     151        bool setNominalRate ( float r ); 
     152        float getNominalRate() {return m_nominal_rate;}; 
     153        float getRate(); 
     154 
     155        bool setUpdatePeriod ( unsigned int t ); 
     156 
     157        // misc stuff 
     158        void dumpInfo(); 
     159        void setVerboseLevel ( int l ) {setDebugLevel ( l );}; 
     160 
     161    private: 
     162        void decrementFrameCounter ( int nbframes ); 
     163        void incrementFrameCounter ( int nbframes, ffado_timestamp_t new_timestamp ); 
     164        void resetFrameCounter(); 
     165 
     166    protected: 
     167 
     168        ffado_ringbuffer_t * m_event_buffer; 
     169        char* m_cluster_buffer; 
     170 
     171        unsigned int m_event_size; // the size of one event 
     172        unsigned int m_events_per_frame; // the number of events in a frame 
     173        unsigned int m_buffer_size; // the number of frames in the buffer 
     174        unsigned int m_bytes_per_frame; 
     175        unsigned int m_bytes_per_buffer; 
     176        bool m_enabled; // you can get frames FIXME: rename!! 
     177        bool m_transparent; // the buffer should hold the frames put in it. if true, discards all frames 
     178 
     179        ffado_timestamp_t m_wrap_at; // value to wrap at 
     180 
     181        TimestampedBufferClient *m_Client; 
     182 
     183        DECLARE_DEBUG_MODULE; 
     184 
     185    private: 
     186        // the framecounter gives the number of frames in the buffer 
     187        signed int m_framecounter; 
     188 
     189        // the offset that define the timing of the buffer 
     190        ffado_timestamp_t m_tick_offset; 
     191 
     192        // the buffer tail timestamp gives the timestamp of the last frame 
     193        // that was put into the buffer 
     194        ffado_timestamp_t   m_buffer_tail_timestamp; 
     195        ffado_timestamp_t   m_buffer_next_tail_timestamp; 
     196 
     197        // this mutex protects the access to the framecounter 
     198        // and the buffer head timestamp. 
     199        pthread_mutex_t m_framecounter_lock; 
     200 
     201        // tracking DLL variables 
    202202// JMW: try double for this too 
    203203//    float m_dll_e2; 
    204     double m_dll_e2; 
    205     float m_dll_b; 
    206     float m_dll_c; 
    207  
    208     float m_nominal_rate; 
    209     unsigned int m_update_period; 
     204        double m_dll_e2; 
     205        float m_dll_b; 
     206        float m_dll_c; 
     207 
     208        float m_nominal_rate; 
     209        unsigned int m_update_period; 
    210210}; 
    211211 
    212212/** 
    213  * \brief Interface to be implemented by TimestampedBuffer clients 
    214  */ 
    215 class TimestampedBufferClient { 
     213    * \brief Interface to be implemented by TimestampedBuffer clients 
     214    */ 
     215class TimestampedBufferClient 
     216
    216217    public: 
    217218        TimestampedBufferClient() {}; 
    218219        virtual ~TimestampedBufferClient() {}; 
    219220 
    220         virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset)=0; 
    221         virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset)=0; 
     221        virtual bool processReadBlock ( char *data, unsigned int nevents, unsigned int offset ) =0; 
     222        virtual bool processWriteBlock ( char *data, unsigned int nevents, unsigned int offset ) =0; 
    222223 
    223224};