Changeset 719

Show
Ignore:
Timestamp:
11/22/07 06:43:39 (13 years ago)
Author:
ppalmers
Message:

backup commit

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/ppalmers-streaming/src/ffado_streaming.cpp

    r715 r719  
    340340 
    341341int ffado_streaming_transfer_playback_buffers(ffado_device_t *dev) { 
    342     return dev->processorManager->transfer(StreamProcessor::ePT_Receive); 
     342    return dev->processorManager->transfer(StreamProcessor::ePT_Transmit); 
    343343} 
    344344 
  • branches/ppalmers-streaming/src/genericavc/avc_avdevice.cpp

    r715 r719  
    592592int 
    593593AvDevice::getStreamCount() { 
    594     return m_receiveProcessors.size() + m_transmitProcessors.size(); 
     594    //return m_receiveProcessors.size() + m_transmitProcessors.size(); 
     595    return 1; 
    595596} 
    596597 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.cpp

    r715 r719  
    4545    : StreamProcessor(ePT_Receive , port) 
    4646    , m_dimension(dimension) 
    47     , m_last_timestamp(0) 
    48     , m_last_timestamp2(0) 
    49     , m_dropped(0)  
    5047{} 
    5148 
    52 bool AmdtpReceiveStreamProcessor::init() { 
    53  
    54     // call the parent init 
    55     // this has to be done before allocating the buffers, 
    56     // because this sets the buffersizes from the processormanager 
    57     if(!StreamProcessor::init()) { 
    58         debugFatal("Could not do base class init (%d)\n",this); 
    59         return false; 
    60     } 
    61     return true; 
    62 
    63  
    64 enum raw1394_iso_disposition 
    65 AmdtpReceiveStreamProcessor::putPacket(unsigned char *data, unsigned int length, 
    66                   unsigned char channel, unsigned char tag, unsigned char sy, 
    67                   unsigned int cycle, unsigned int dropped) { 
    68  
    69     enum raw1394_iso_disposition retval=RAW1394_ISO_OK; 
    70  
    71     int dropped_cycles=diffCycles(cycle, m_last_cycle) - 1; 
    72     if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 
    73     else m_dropped += dropped_cycles; 
    74     if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); 
    75  
    76     m_last_cycle=cycle; 
    77  
    78     struct iec61883_packet *packet = (struct iec61883_packet *) data; 
    79     assert(packet); 
    80  
    81 #ifdef DEBUG 
    82     if(dropped>0) { 
    83         debugWarning("(%p) Dropped %d packets on cycle %d\n", this, dropped, cycle); 
    84     } 
    85  
    86     debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4ucy + %04uticks) (running=%d)\n", 
    87         channel, cycle, ntohs(packet->syt), 
    88         CYCLE_TIMER_GET_CYCLES(ntohs(packet->syt)), CYCLE_TIMER_GET_OFFSET(ntohs(packet->syt)), 
    89         m_running); 
    90  
    91     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 
    92         "RCV: CH = %d, FDF = %X. SYT = %6d, DBS = %3d, DBC = %3d, FMT = %3d, LEN = %4d\n", 
    93         channel, packet->fdf, 
    94         packet->syt, 
    95         packet->dbs, 
    96         packet->dbc, 
    97         packet->fmt, 
    98         length); 
    99  
    100 #endif 
    101  
    102     // check if this is a valid packet 
    103     if((packet->syt != 0xFFFF) 
    104        && (packet->fdf != 0xFF) 
    105        && (packet->fmt == 0x10) 
    106        && (packet->dbs>0) 
    107        && (length>=2*sizeof(quadlet_t))) { 
    108  
    109         unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; 
    110  
    111         //=> store the previous timestamp 
    112         m_last_timestamp2=m_last_timestamp; 
    113  
    114         uint64_t nowX = m_handler->getCycleTimer(); 
    115         //=> convert the SYT to a full timestamp in ticks 
    116         m_last_timestamp=sytRecvToFullTicks((uint32_t)ntohs(packet->syt), 
    117                                         cycle, nowX); 
    118  
    119         int64_t diffx = diffTicks(m_last_timestamp, m_last_timestamp2); 
    120         if (abs(diffx) > m_syt_interval * m_data_buffer->getRate() * 1.1) { 
    121             uint32_t now=m_handler->getCycleTimer(); 
    122             uint32_t syt = (uint32_t)ntohs(packet->syt); 
    123             uint32_t now_ticks=CYCLE_TIMER_TO_TICKS(now); 
    124              
    125             debugOutput(DEBUG_LEVEL_VERBOSE, "diff=%06lld TS=%011llu TS2=%011llu\n", 
    126                 diffx, m_last_timestamp, m_last_timestamp2); 
    127             debugOutput(DEBUG_LEVEL_VERBOSE, "[1] cy=%04d dropped=%05llu syt=%04llX NOW=%08llX => TS=%011llu\n", 
    128                 m_last_good_cycle, m_last_dropped, m_last_syt, m_last_now, m_last_timestamp2); 
    129             debugOutput(DEBUG_LEVEL_VERBOSE, "[2] cy=%04d dropped=%05d syt=%04X NOW=%08llX => TS=%011llu\n", 
    130                 cycle, dropped_cycles, ntohs(packet->syt), nowX, m_last_timestamp); 
    131  
    132             uint32_t test_ts=sytRecvToFullTicks(syt, cycle, now); 
    133  
    134             debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: SYT=%08X,            CY=%04d OFF=%04d\n", 
    135                 cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) 
    136                 ); 
    137             debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%04u OFF=%04u\n", 
    138                 cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) 
    139                 ); 
    140             debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%04u OFF=%04u\n", 
    141                 cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) 
    142                 ); 
    143                  
    144             int64_t diff_ts = diffTicks(now_ticks, test_ts); 
    145             debugOutput(DEBUG_LEVEL_VERBOSE, "DIFF  : TCK=%011lld, SEC=%03llu CY=%04llu OFF=%04llu\n", 
    146                 diff_ts,  
    147                 TICKS_TO_SECS((uint64_t)diff_ts), 
    148                 TICKS_TO_CYCLES((uint64_t)diff_ts), 
    149                 TICKS_TO_OFFSET((uint64_t)diff_ts) 
    150                 ); 
    151         } 
    152         m_last_syt = ntohs(packet->syt); 
    153         m_last_now = nowX; 
    154         m_last_good_cycle = cycle; 
    155         m_last_dropped = dropped_cycles; 
    156  
    157         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", 
    158                 cycle, m_last_timestamp); 
    159  
    160         // we have to keep in mind that there are also 
    161         // some packets buffered by the ISO layer, 
    162         // at most x=m_handler->getWakeupInterval() 
    163         // these contain at most x*syt_interval 
    164         // frames, meaning that we might receive 
    165         // this packet x*syt_interval*ticks_per_frame 
    166         // later than expected (the real receive time) 
    167         debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", 
    168             m_last_timestamp, m_handler->getWakeupInterval(),m_syt_interval,getTicksPerFrame()); 
    169  
    170         //=> signal that we're running (if we are) 
    171         if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) { 
    172             debugOutput(DEBUG_LEVEL_VERBOSE,"Receive StreamProcessor %p started running at %d\n", this, cycle); 
    173             m_running=true; 
    174             m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 
    175             // we don't want this first sample to be written 
    176             return RAW1394_ISO_OK; 
    177         } 
    178  
    179         // if we are not running yet, there is nothing more to do 
    180         if(!m_running) { 
    181             return RAW1394_ISO_OK; 
    182         } 
    183         #ifdef DEBUG_OFF 
    184         if((cycle % 1000) == 0) { 
    185             uint32_t now=m_handler->getCycleTimer(); 
    186             uint32_t syt = (uint32_t)ntohs(packet->syt); 
    187             uint32_t now_ticks=CYCLE_TIMER_TO_TICKS(now); 
    188  
    189             uint32_t test_ts=sytRecvToFullTicks(syt, cycle, now); 
    190  
    191             debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: SYT=%08X,            CY=%02d OFF=%04d\n", 
    192                 cycle, syt, CYCLE_TIMER_GET_CYCLES(syt), CYCLE_TIMER_GET_OFFSET(syt) 
    193                 ); 
    194             debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: NOW=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 
    195                 cycle, now_ticks, CYCLE_TIMER_GET_SECS(now), CYCLE_TIMER_GET_CYCLES(now), CYCLE_TIMER_GET_OFFSET(now) 
    196                 ); 
    197             debugOutput(DEBUG_LEVEL_VERBOSE, "R %04d: TSS=%011lu, SEC=%03u CY=%02u OFF=%04u\n", 
    198                 cycle, test_ts, TICKS_TO_SECS(test_ts), TICKS_TO_CYCLES(test_ts), TICKS_TO_OFFSET(test_ts) 
    199                 ); 
    200         } 
    201         #endif 
    202  
    203         #ifdef DEBUG 
    204             // keep track of the lag 
    205             uint32_t now=m_handler->getCycleTimer(); 
    206             int32_t diff = diffCycles( cycle,  ((int)CYCLE_TIMER_GET_CYCLES(now)) ); 
    207             m_PacketStat.mark(diff); 
    208         #endif 
    209  
    210         //=> process the packet 
    211         // add the data payload to the ringbuffer 
    212          
    213         if(dropped_cycles) { 
    214             debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this); 
    215             m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 
    216             // we don't want this first sample to be written 
    217             return RAW1394_ISO_OK; 
    218         } 
    219          
    220         if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { 
    221             retval=RAW1394_ISO_OK; 
    222  
    223             // process all ports that should be handled on a per-packet base 
    224             // this is MIDI for AMDTP (due to the need of DBC) 
    225             if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { 
    226                 debugWarning("Problem decoding Packet Ports\n"); 
    227                 retval=RAW1394_ISO_DEFER; 
    228             } 
    229  
    230         } else { 
    231  
    232 //             debugWarning("Receive buffer overrun (cycle %d, FC=%d, PC=%d)\n", 
    233 //                  cycle, m_data_buffer->getFrameCounter(), m_handler->getPacketCount()); 
    234  
    235             m_xruns++; 
    236  
    237             retval=RAW1394_ISO_DEFER; 
    238         } 
    239     } 
    240  
    241     return retval; 
    242 
    243  
    244 void AmdtpReceiveStreamProcessor::dumpInfo() { 
    245     StreamProcessor::dumpInfo(); 
    246 
    247  
    248 bool AmdtpReceiveStreamProcessor::reset() { 
    249  
    250     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); 
    251  
    252     m_PeriodStat.reset(); 
    253     m_PacketStat.reset(); 
    254     m_WakeupStat.reset(); 
    255  
    256     m_data_buffer->setTickOffset(0); 
    257  
    258     // reset all non-device specific stuff 
    259     // i.e. the iso stream and the associated ports 
    260     if(!StreamProcessor::reset()) { 
    261             debugFatal("Could not do base class reset\n"); 
    262             return false; 
    263     } 
    264     return true; 
    265 
    266  
    267 bool AmdtpReceiveStreamProcessor::prepare() { 
     49 
     50unsigned int 
     51AmdtpReceiveStreamProcessor::getPacketsPerPeriod() 
     52
     53    return (m_manager->getPeriodSize())/m_syt_interval; 
     54
     55 
     56bool AmdtpReceiveStreamProcessor::prepareChild() { 
    26857 
    26958    m_PeriodStat.setName("RCV PERIOD"); 
     
    27362    debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing (%p)...\n", this); 
    27463 
    275     // prepare all non-device specific stuff 
    276     // i.e. the iso stream and the associated ports 
    277     if(!StreamProcessor::prepare()) { 
    278         debugFatal("Could not prepare base class\n"); 
    279         return false; 
    280     } 
    281  
    28264    switch (m_manager->getNominalRate()) { 
    283     case 32000: 
    284         m_syt_interval = 8; 
    285         break; 
    286     case 44100: 
    287         m_syt_interval = 8; 
    288         break; 
    289     default: 
    290     case 48000: 
    291         m_syt_interval = 8; 
    292         break; 
    293     case 88200: 
    294         m_syt_interval = 16; 
    295         break; 
    296     case 96000: 
    297         m_syt_interval = 16; 
    298         break; 
    299     case 176400: 
    300         m_syt_interval = 32; 
    301         break; 
    302     case 192000: 
    303         m_syt_interval = 32; 
    304         break; 
    305     } 
    306  
    307     // prepare the framerate estimate 
    308     float ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); 
    309     m_ticks_per_frame=ticks_per_frame; 
    310  
    311     debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n",ticks_per_frame); 
    312  
    313     // initialize internal buffer 
    314     unsigned int ringbuffer_size_frames=m_manager->getNbBuffers() * m_manager->getPeriodSize(); 
    315  
    316     assert(m_data_buffer); 
    317     m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); 
    318     m_data_buffer->setEventSize(sizeof(quadlet_t)); 
    319     m_data_buffer->setEventsPerFrame(m_dimension); 
    320  
    321     // the buffer is written every syt_interval 
    322     m_data_buffer->setUpdatePeriod(m_syt_interval); 
    323     m_data_buffer->setNominalRate(ticks_per_frame); 
    324  
    325     m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 
    326  
    327     m_data_buffer->prepare(); 
    328  
    329     // set the parameters of ports we can: 
    330     // we want the audio ports to be period buffered, 
    331     // and the midi ports to be packet buffered 
    332     for ( PortVectorIterator it = m_Ports.begin(); 
    333           it != m_Ports.end(); 
    334           ++it ) 
    335     { 
    336         debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); 
    337         if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { 
    338             debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); 
     65        case 32000: 
     66        case 44100: 
     67        case 48000: 
     68            m_syt_interval = 8; 
     69            break; 
     70        case 88200: 
     71        case 96000: 
     72            m_syt_interval = 16; 
     73            break; 
     74        case 176400: 
     75        case 192000: 
     76            m_syt_interval = 32; 
     77            break; 
     78        default: 
     79            debugError("Unsupported rate: %d\n", m_manager->getNominalRate()); 
    33980            return false; 
    340         } 
    341  
    342         switch ((*it)->getPortType()) { 
    343             case Port::E_Audio: 
    344                 if(!(*it)->setSignalType(Port::E_PeriodSignalled)) { 
    345                     debugFatal("Could not set signal type to PeriodSignalling"); 
    346                     return false; 
    347                 } 
    348                 // buffertype and datatype are dependant on the API 
    349                 debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n"); 
    350                 // buffertype and datatype are dependant on the API 
    351                 if(!(*it)->setBufferType(Port::E_PointerBuffer)) { 
    352                     debugFatal("Could not set buffer type"); 
    353                     return false; 
    354                 } 
    355                 if(!(*it)->useExternalBuffer(true)) { 
    356                     debugFatal("Could not set external buffer usage"); 
    357                     return false; 
    358                 } 
    359                 if(!(*it)->setDataType(Port::E_Float)) { 
    360                     debugFatal("Could not set data type"); 
    361                     return false; 
    362                 } 
    363                 break; 
    364             case Port::E_Midi: 
    365                 if(!(*it)->setSignalType(Port::E_PacketSignalled)) { 
    366                     debugFatal("Could not set signal type to PacketSignalling"); 
    367                     return false; 
    368                 } 
    369                 // buffertype and datatype are dependant on the API 
    370                 // buffertype and datatype are dependant on the API 
    371                 debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); 
    372                 // buffertype and datatype are dependant on the API 
    373                 if(!(*it)->setBufferType(Port::E_RingBuffer)) { 
    374                     debugFatal("Could not set buffer type"); 
    375                     return false; 
    376                 } 
    377                 if(!(*it)->setDataType(Port::E_MidiEvent)) { 
    378                     debugFatal("Could not set data type"); 
    379                     return false; 
    380                 } 
    381                 break; 
    382             default: 
    383                 debugWarning("Unsupported port type specified\n"); 
    384                 break; 
    385         } 
    386     } 
    387  
    388     // the API specific settings of the ports should already be set, 
    389     // as this is called from the processorManager->prepare() 
    390     // so we can init the ports 
    391     if(!initPorts()) { 
    392         debugFatal("Could not initialize ports!\n"); 
    393         return false; 
    394     } 
    395  
    396     if(!preparePorts()) { 
    397         debugFatal("Could not initialize ports!\n"); 
    398         return false; 
    39981    } 
    40082 
    40183    debugOutput( DEBUG_LEVEL_VERBOSE, "Prepared for:\n"); 
    40284    debugOutput( DEBUG_LEVEL_VERBOSE, " Samplerate: %d, DBS: %d, SYT: %d\n", 
    403              m_manager->getNominalRate(),m_dimension,m_syt_interval); 
     85             m_manager->getNominalRate(), m_dimension, m_syt_interval); 
    40486    debugOutput( DEBUG_LEVEL_VERBOSE, " PeriodSize: %d, NbBuffers: %d\n", 
    40587             m_manager->getPeriodSize(), m_manager->getNbBuffers()); 
     
    40890 
    40991    return true; 
    410  
    411 
    412  
    413 bool AmdtpReceiveStreamProcessor::prepareForStart() { 
    414     disable(); 
    415     return true; 
    416 
    417  
    418 bool AmdtpReceiveStreamProcessor::prepareForStop() { 
    419     disable(); 
    420     return true; 
    421 
    422  
    423 unsigned int 
    424 AmdtpReceiveStreamProcessor::getPacketsPerPeriod()  
    425 
    426     return (m_manager->getPeriodSize())/m_syt_interval; 
    427 
    428  
    429 bool AmdtpReceiveStreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 
    430     m_PeriodStat.mark(m_data_buffer->getBufferFill()); 
    431  
    432 #ifdef DEBUG 
    433     uint64_t ts_head; 
    434     signed int fc; 
    435     int32_t lag_ticks; 
    436     float lag_frames; 
    437  
    438     // in order to sync up multiple received streams, we should  
    439     // use the ts parameter. It specifies the time of the block's  
    440     // first sample. 
    441      
    442     ffado_timestamp_t ts_head_tmp; 
    443     m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); 
    444     ts_head=(uint64_t)ts_head_tmp; 
    445     lag_ticks=diffTicks(ts, ts_head); 
    446     float rate=m_data_buffer->getRate(); 
    447      
    448     assert(rate!=0.0); 
    449      
    450     lag_frames=(((float)lag_ticks)/rate); 
    451      
    452     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
    453                  this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
    454  
    455     if (lag_frames>=1.0) { 
    456         // the stream lags 
    457         debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
    458                       this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
    459     } else if (lag_frames<=-1.0) { 
    460         // the stream leads 
    461         debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
    462                       this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
    463     } 
    464 #endif 
    465     // ask the buffer to process nbframes of frames 
    466     // using it's registered client's processReadBlock(), 
    467     // which should be ours 
    468     m_data_buffer->blockProcessReadFrames(nbframes); 
    469  
    470     return true; 
    471 
    472  
    473 bool AmdtpReceiveStreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { 
    474     m_PeriodStat.mark(m_data_buffer->getBufferFill()); 
    475     int frames_to_ditch=(int)(nbframes); 
    476     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", 
    477                  this, frames_to_ditch, ts); 
    478     char dummy[m_data_buffer->getBytesPerFrame()]; // one frame of garbage 
    479  
    480     while (frames_to_ditch--) { 
    481         m_data_buffer->readFrames(1, dummy); 
    482     } 
    483     return true; 
    484 
    485  
    486 /** 
    487  * \brief write received events to the stream ringbuffers. 
     92
     93 
     94 
     95/** 
     96 * Processes packet header to extract timestamps and so on 
     97 * @param data  
     98 * @param length  
     99 * @param channel  
     100 * @param tag  
     101 * @param sy  
     102 * @param cycle  
     103 * @param dropped  
     104 * @return true if this is a valid packet, false if not 
     105 */ 
     106bool 
     107AmdtpReceiveStreamProcessor::processPacketHeader(unsigned char *data, unsigned int length, 
     108                  unsigned char channel, unsigned char tag, unsigned char sy, 
     109                  unsigned int cycle, unsigned int dropped) 
     110
     111    struct iec61883_packet *packet = (struct iec61883_packet *) data; 
     112    assert(packet); 
     113    bool retval = (packet->syt != 0xFFFF) && 
     114                  (packet->fdf != 0xFF) && 
     115                  (packet->fmt == 0x10) && 
     116                  (packet->dbs > 0) && 
     117                  (length >= 2*sizeof(quadlet_t)); 
     118    if(retval) { 
     119        uint64_t now = m_handler->getCycleTimer(); 
     120        //=> convert the SYT to a full timestamp in ticks 
     121        m_last_timestamp = sytRecvToFullTicks((uint32_t)ntohs(packet->syt), 
     122                                              cycle, now); 
     123    } 
     124    return retval; 
     125
     126 
     127/** 
     128 * extract the data from the packet 
     129 * @pre the IEC61883 packet is valid according to isValidPacket 
     130 * @param data  
     131 * @param length  
     132 * @param channel  
     133 * @param tag  
     134 * @param sy  
     135 * @param cycle  
     136 * @param dropped  
     137 * @return true if successful, false if xrun 
     138 */ 
     139bool 
     140AmdtpReceiveStreamProcessor::processPacketData(unsigned char *data, unsigned int length, 
     141                  unsigned char channel, unsigned char tag, unsigned char sy, 
     142                  unsigned int cycle, unsigned int dropped_cycles) { 
     143    struct iec61883_packet *packet = (struct iec61883_packet *) data; 
     144    assert(packet); 
     145 
     146    unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; 
     147 
     148    // we have to keep in mind that there are also 
     149    // some packets buffered by the ISO layer, 
     150    // at most x=m_handler->getWakeupInterval() 
     151    // these contain at most x*syt_interval 
     152    // frames, meaning that we might receive 
     153    // this packet x*syt_interval*ticks_per_frame 
     154    // later than expected (the real receive time) 
     155    debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"STMP: %lluticks | buff=%d, syt_interval=%d, tpf=%f\n", 
     156        m_last_timestamp, m_handler->getWakeupInterval(), m_syt_interval, getTicksPerFrame()); 
     157 
     158    if(m_data_buffer->writeFrames(nevents, (char *)(data+8), m_last_timestamp)) { 
     159        // process all ports that should be handled on a per-packet base 
     160        // this is MIDI for AMDTP (due to the need of DBC) 
     161        if (!decodePacketPorts((quadlet_t *)(data+8), nevents, packet->dbc)) { 
     162            debugWarning("Problem decoding Packet Ports\n"); 
     163        } 
     164        return true; 
     165    } else { 
     166        return false; 
     167    } 
     168
     169 
     170/*********************************************** 
     171 * Encoding/Decoding API                       * 
     172 ***********************************************/ 
     173/** 
     174 * @brief write received events to the stream ringbuffers. 
    488175 */ 
    489176bool AmdtpReceiveStreamProcessor::processReadBlock(char *data, 
     
    498185          ++it ) 
    499186    { 
    500  
    501187        if((*it)->isDisabled()) {continue;}; 
    502188 
     
    523209    } 
    524210    return no_problem; 
    525  
     211
     212 
     213/** 
     214 * @brief write silence events to the stream ringbuffers. 
     215 */ 
     216bool AmdtpReceiveStreamProcessor::provideSilenceBlock(unsigned int nevents, unsigned int offset) 
     217
     218    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "(%p)->proviceSilenceBlock(%u, %u)\n", this, nevents, offset); 
     219 
     220    bool no_problem=true; 
     221 
     222    for ( PortVectorIterator it = m_PeriodPorts.begin(); 
     223          it != m_PeriodPorts.end(); 
     224          ++it ) 
     225    { 
     226        if((*it)->isDisabled()) {continue;}; 
     227        //FIXME: make this into a static_cast when not DEBUG? 
     228        AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it); 
     229        assert(pinfo); // this should not fail!! 
     230 
     231        switch(pinfo->getFormat()) { 
     232        case AmdtpPortInfo::E_MBLA: 
     233            if(provideSilenceToPort(static_cast<AmdtpAudioPort *>(*it), offset, nevents)) { 
     234                debugWarning("Could not put silence into to port %s",(*it)->getName().c_str()); 
     235                no_problem=false; 
     236            } 
     237            break; 
     238        case AmdtpPortInfo::E_SPDIF: // still unimplemented 
     239            break; 
     240    /* for this processor, midi is a packet based port 
     241        case AmdtpPortInfo::E_Midi: 
     242            break;*/ 
     243        default: // ignore 
     244            break; 
     245        } 
     246    } 
     247    return no_problem; 
    526248} 
    527249 
     
    583305} 
    584306 
    585 int AmdtpReceiveStreamProcessor::decodeMBLAEventsToPort(AmdtpAudioPort *p, quadlet_t *data, 
     307int 
     308AmdtpReceiveStreamProcessor::decodeMBLAEventsToPort( 
     309                       AmdtpAudioPort *p, quadlet_t *data, 
    586310                       unsigned int offset, unsigned int nevents) 
    587311{ 
    588312    unsigned int j=0; 
    589  
    590 //     printf("****************\n"); 
    591 //     hexDumpQuadlets(data,m_dimension*4); 
    592 //     printf("****************\n"); 
    593  
    594313    quadlet_t *target_event; 
    595314 
     
    640359} 
    641360 
     361int 
     362AmdtpReceiveStreamProcessor::provideSilenceToPort( 
     363                       AmdtpAudioPort *p, unsigned int offset, unsigned int nevents) 
     364{ 
     365    unsigned int j=0; 
     366    switch(p->getDataType()) { 
     367        default: 
     368        case Port::E_Int24: 
     369            { 
     370                quadlet_t *buffer=(quadlet_t *)(p->getBufferAddress()); 
     371                assert(nevents + offset <= p->getBufferSize()); 
     372                buffer+=offset; 
     373 
     374                for(j = 0; j < nevents; j += 1) { // decode max nsamples 
     375                    *(buffer)=0; 
     376                    buffer++; 
     377                } 
     378            } 
     379            break; 
     380        case Port::E_Float: 
     381            { 
     382                float *buffer=(float *)(p->getBufferAddress()); 
     383                assert(nevents + offset <= p->getBufferSize()); 
     384                buffer+=offset; 
     385 
     386                for(j = 0; j < nevents; j += 1) { // decode max nsamples 
     387                    *buffer = 0.0; 
     388                    buffer++; 
     389                } 
     390            } 
     391            break; 
     392    } 
     393    return 0; 
     394} 
     395 
    642396} // end of namespace Streaming 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpReceiveStreamProcessor.h

    r715 r719  
    7272     * Create a AMDTP receive StreamProcessor 
    7373     * @param port 1394 port 
    74      * @param framerate frame rate 
    7574     * @param dimension number of substreams in the ISO stream 
    7675     *                  (midi-muxed is only one stream) 
     
    7978    virtual ~AmdtpReceiveStreamProcessor() {}; 
    8079 
    81     enum raw1394_iso_disposition putPacket(unsigned char *data, unsigned int length, 
     80    bool processPacketHeader(unsigned char *data, unsigned int length, 
     81                  unsigned char channel, unsigned char tag, unsigned char sy, 
     82                  unsigned int cycle, unsigned int dropped); 
     83    bool processPacketData(unsigned char *data, unsigned int length, 
    8284                  unsigned char channel, unsigned char tag, unsigned char sy, 
    8385                  unsigned int cycle, unsigned int dropped); 
    8486 
     87    virtual bool prepareChild(); 
    8588 
    86     bool init(); 
    87     bool reset(); 
    88     bool prepare()
    89  
    90     bool prepareForStop()
    91     bool prepareForStart(); 
    92  
    93     bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client 
    94     bool getFramesDry(unsigned int nbframes, int64_t ts)
     89public: 
     90    virtual unsigned int getEventSize()  
     91                    {return 4;}
     92    virtual unsigned int getMaxPacketSize()  
     93                    {return 4 * (2 + m_syt_interval * m_dimension);}
     94    virtual unsigned int getEventsPerFrame()  
     95                    { return m_dimension; }; 
     96    virtual unsigned int getUpdatePeriod()  
     97                    {return m_syt_interval;}
    9598 
    9699    // We have 1 period of samples = m_period 
     
    101104    // however, if we only count the number of used packets 
    102105    // it is m_period / m_syt_interval 
    103     unsigned int getPacketsPerPeriod(); 
     106    virtual unsigned int getPacketsPerPeriod(); 
    104107 
    105     unsigned int getMaxPacketSize() {return 4 * (2 + m_syt_interval * m_dimension);}; 
    106  
    107     void dumpInfo(); 
    108108protected: 
    109109 
    110110    bool processReadBlock(char *data, unsigned int nevents, unsigned int offset); 
     111    bool provideSilenceBlock(unsigned int nevents, unsigned int offset); 
    111112 
    112113    bool decodePacketPorts(quadlet_t *data, unsigned int nevents, unsigned int dbc); 
    113114 
    114115    int decodeMBLAEventsToPort(AmdtpAudioPort *, quadlet_t *data, unsigned int offset, unsigned int nevents); 
    115     void updatePreparedState(); 
     116    int provideSilenceToPort(AmdtpAudioPort *p, unsigned int offset, unsigned int nevents); 
    116117 
    117118    int m_dimension; 
    118119    unsigned int m_syt_interval; 
    119120 
    120     uint64_t m_dropped; /// FIXME:debug 
    121     uint64_t m_last_dropped; /// FIXME:debug 
    122121    uint64_t m_last_syt; /// FIXME:debug 
    123122    uint64_t m_last_now; /// FIXME:debug 
    124     int m_last_good_cycle; /// FIXME:debug 
    125     uint64_t m_last_timestamp; /// last timestamp (in ticks) 
    126     uint64_t m_last_timestamp2; /// last timestamp (in ticks) 
    127     uint64_t m_last_timestamp_at_period_ticks; 
    128123}; 
    129124 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp

    r715 r719  
    4949{} 
    5050 
    51 /** 
    52  * @return 
    53  */ 
    54 bool AmdtpTransmitStreamProcessor::init() { 
    55  
    56     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing (%p)...\n", this); 
    57     // call the parent init 
    58     // this has to be done before allocating the buffers, 
    59     // because this sets the buffersizes from the processormanager 
    60     if(!StreamProcessor::init()) { 
    61         debugFatal("Could not do base class init (%p)\n",this); 
    62         return false; 
    63     } 
    64     return true; 
    65 } 
    66  
    6751enum raw1394_iso_disposition 
    6852AmdtpTransmitStreamProcessor::getPacket(unsigned char *data, unsigned int *length, 
     
    7357    if (cycle<0) { 
    7458        debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", 
    75             cycle, m_running); 
     59            cycle, isRunning()); 
    7660        *tag = 0; 
    7761        *sy = 0; 
     
    8165 
    8266    debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,"Xmit handler for cycle %d, (running=%d)\n", 
    83         cycle, m_running); 
     67        cycle, isRunning()); 
    8468 
    8569    if (addCycles(m_last_cycle, 1) != cycle) { 
     
    9983    /* Our node ID can change after a bus reset, so it is best to fetch 
    10084     * our node ID for each packet. */ 
    101     packet->sid = getNodeId() & 0x3f; 
     85    packet->sid = m_handler->getLocalNodeId() & 0x3f; 
    10286 
    10387    packet->dbs = m_dimension; 
     
    126110 
    127111#ifdef DEBUG 
    128     if(m_running && (cycle_diff < 0)) { 
     112    if(isRunning() && (cycle_diff < 0)) { 
    129113        debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", 
    130114            cycle, now_cycles); 
     
    139123    // to be 'running' 
    140124    // NOTE: this works only at startup 
    141     if (!m_running && cycle_diff >= 0 && cycle >= 0) { 
     125    if (!isRunning() && cycle_diff >= 0 && cycle >= 0) { 
    142126            debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); 
    143             m_running=true; 
    144127    } 
    145128 
     
    169152    const int max_cycles_to_transmit_early = 5; 
    170153 
    171     if( !m_running || !m_data_buffer->isEnabled() ) { 
     154    if( !isRunning() || !m_data_buffer->isEnabled() ) { 
    172155        debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, 
    173156                    "Not running (%d) or buffer not enabled (enabled=%d)\n", 
    174                     m_running, m_data_buffer->isEnabled()); 
     157                    isRunning(), m_data_buffer->isEnabled()); 
    175158 
    176159        // not running or not enabled 
     
    357340} 
    358341 
     342unsigned int 
     343AmdtpTransmitStreamProcessor::getEventsPerFrame() 
     344{ 
     345    return m_dimension; 
     346} 
     347 
     348unsigned int 
     349AmdtpTransmitStreamProcessor::getUpdatePeriod() 
     350{ 
     351    return m_syt_interval; 
     352} 
     353 
     354 
    359355unsigned int AmdtpTransmitStreamProcessor::fillDataPacketHeader( 
    360356        struct iec61883_packet *packet, unsigned int* length, 
     
    417413    m_data_buffer->setTickOffset(0); 
    418414 
    419     // reset all non-device specific stuff 
    420     // i.e. the iso stream and the associated ports 
    421     if(!StreamProcessor::reset()) { 
    422         debugFatal("Could not do base class reset\n"); 
    423         return false; 
    424     } 
     415//     // reset all non-device specific stuff 
     416//     // i.e. the iso stream and the associated ports 
     417//     if(!StreamProcessor::reset()) { 
     418//         debugFatal("Could not do base class reset\n"); 
     419//         return false; 
     420//     } 
    425421 
    426422    // we should prefill the event buffer 
     
    433429} 
    434430 
    435 bool AmdtpTransmitStreamProcessor::prepare() { 
     431bool AmdtpTransmitStreamProcessor::prepareChild() { 
    436432    m_PeriodStat.setName("XMT PERIOD"); 
    437433    m_PacketStat.setName("XMT PACKET"); 
     
    620616bool AmdtpTransmitStreamProcessor::prepareForEnable(uint64_t time_to_enable_at) { 
    621617 
    622     if (!StreamProcessor::prepareForEnable(time_to_enable_at)) { 
    623         debugError("StreamProcessor::prepareForEnable failed\n"); 
    624         return false; 
    625     } 
     618//     if (!StreamProcessor::prepareForEnable(time_to_enable_at)) { 
     619//         debugError("StreamProcessor::prepareForEnable failed\n"); 
     620//         return false; 
     621//     } 
    626622 
    627623    return true; 
  • branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.h

    r715 r719  
    8585                    int cycle, unsigned int dropped, unsigned int max_length); 
    8686 
    87     bool init(); 
     87    virtual unsigned int getEventsPerFrame(); 
     88    virtual unsigned int getEventSize() {return 4;}; 
     89    virtual unsigned int getUpdatePeriod(); 
     90 
    8891    bool reset(); 
    89     bool prepare(); 
     92    bool prepareChild(); 
    9093 
    9194    bool prepareForStop(); 
  • branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.cpp

    r714 r719  
    5555    *sy = 0; 
    5656 
    57  
    5857    return RAW1394_ISO_OK; 
    59 } 
    60  
    61 int IsoStream::getNodeId() { 
    62     if (m_handler) { 
    63         return m_handler->getLocalNodeId(); 
    64     } 
    65     return -1; 
    66 } 
    67  
    68  
    69 void IsoStream::dumpInfo() 
    70 { 
    71  
    72     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Address        : %p\n",this); 
    73     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Stream type    : %s\n", 
    74             (this->getStreamType()==eST_Receive ? "Receive" : "Transmit")); 
    75     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", 
    76             m_port, m_channel); 
    77  
    7858} 
    7959 
    8060bool IsoStream::setChannel(int c) { 
    8161    debugOutput( DEBUG_LEVEL_VERBOSE, "setting channel for (%p) to %d\n",this, c); 
    82  
    8362    m_channel=c; 
    8463    return true; 
    85 } 
    86  
    87  
    88 bool IsoStream::reset() { 
    89     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
    90     return true; 
    91 } 
    92  
    93 bool IsoStream::prepare() { 
    94     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
    95     return true; 
    96 } 
    97  
    98 bool IsoStream::init() { 
    99     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
    100     return true; 
    101  
    10264} 
    10365 
     
    10971void IsoStream::clearHandler() { 
    11072    debugOutput( DEBUG_LEVEL_VERBOSE, "clearing handler of isostream %p\n", this); 
     73    m_handler=0; 
     74} 
    11175 
    112     m_handler=0; 
     76void IsoStream::dumpInfo() 
     77
     78    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Address        : %p\n",this); 
     79    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Stream type    : %s\n", 
     80            (this->getStreamType()==eST_Receive ? "Receive" : "Transmit")); 
     81    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Port, Channel  : %d, %d\n", 
     82            m_port, m_channel); 
     83
    11384 
    11485} 
    115  
    116  
    117 } 
  • branches/ppalmers-streaming/src/libstreaming/generic/IsoStream.h

    r714 r719  
    7474        virtual unsigned int getMaxPacketSize() {return 1024;}; //FIXME: arbitrary 
    7575 
    76         virtual bool init(); 
    77  
    7876        virtual enum raw1394_iso_disposition 
    7977                putPacket(unsigned char *data, unsigned int length, 
     
    8684 
    8785        void dumpInfo(); 
    88  
    89         int getNodeId(); 
    90  
    91         virtual bool reset(); 
    92         virtual bool prepare(); 
    9386 
    9487    protected: 
  • branches/ppalmers-streaming/src/libstreaming/generic/Port.cpp

    r705 r719  
    111111}; 
    112112 
     113bool Port::setName(std::string name) { 
     114    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting name to %s for port %s\n",name.c_str(),m_Name.c_str()); 
     115 
     116    if (m_State != E_Created) { 
     117        debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
     118        return false; 
     119    } 
     120    m_Name=name; 
     121    return true; 
     122} 
     123 
     124bool Port::setBufferSize(unsigned int newsize) { 
     125    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffersize to %d for port %s\n",newsize,m_Name.c_str()); 
     126    if (m_State != E_Created) { 
     127        debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
     128        return false; 
     129    } 
     130    m_buffersize=newsize; 
     131    return true; 
     132} 
     133 
     134unsigned int Port::getEventSize() { 
     135    switch (m_DataType) { 
     136        case E_Float: 
     137            return sizeof(float); 
     138        case E_Int24: // 24 bit 2's complement, packed in a 32bit integer (LSB's) 
     139            return sizeof(uint32_t); 
     140        case E_MidiEvent: 
     141            return sizeof(uint32_t); 
     142        default: 
     143            return 0; 
     144    } 
     145} 
     146 
     147bool Port::setDataType(enum E_DataType d) { 
     148    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting datatype to %d for port %s\n",(int) d,m_Name.c_str()); 
     149    if (m_State != E_Created) { 
     150        debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
     151        return false; 
     152    } 
     153 
     154    // do some sanity checks 
     155    bool type_is_ok=false; 
     156    switch (m_PortType) { 
     157        case E_Audio: 
     158            if(d == E_Int24) type_is_ok=true; 
     159            if(d == E_Float) type_is_ok=true; 
     160            break; 
     161        case E_Midi: 
     162            if(d == E_MidiEvent) type_is_ok=true; 
     163            break; 
     164        case E_Control: 
     165            if(d == E_Default) type_is_ok=true; 
     166            break; 
     167        default: 
     168            break; 
     169    } 
     170 
     171    if(!type_is_ok) { 
     172        debugFatal("Datatype not supported by this type of port!\n"); 
     173        return false; 
     174    } 
     175 
     176    m_DataType=d; 
     177    return true; 
     178} 
     179 
     180bool Port::setSignalType(enum E_SignalType s) { 
     181    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting signaltype to %d for port %s\n",(int)s,m_Name.c_str()); 
     182    if (m_State != E_Created) { 
     183        debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
     184        return false; 
     185    } 
     186 
     187    // do some sanity checks 
     188    bool type_is_ok=false; 
     189    switch (m_PortType) { 
     190        case E_Audio: 
     191            if(s == E_PeriodSignalled) type_is_ok=true; 
     192            break; 
     193        case E_Midi: 
     194            if(s == E_PacketSignalled) type_is_ok=true; 
     195            break; 
     196        case E_Control: 
     197            if(s == E_PeriodSignalled) type_is_ok=true; 
     198            break; 
     199        default: 
     200            break; 
     201    } 
     202    if(!type_is_ok) { 
     203        debugFatal("Signalling type not supported by this type of port!\n"); 
     204        return false; 
     205    } 
     206    m_SignalType=s; 
     207    return true; 
     208} 
     209 
     210bool Port::setBufferType(enum E_BufferType b) { 
     211    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffer type to %d for port %s\n",(int)b,m_Name.c_str()); 
     212    if (m_State != E_Created) { 
     213        debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
     214        return false; 
     215    } 
     216    // do some sanity checks 
     217    bool type_is_ok=false; 
     218    switch (m_PortType) { 
     219        case E_Audio: 
     220            if(b == E_PointerBuffer) type_is_ok=true; 
     221            break; 
     222        case E_Midi: 
     223            if(b == E_RingBuffer) type_is_ok=true; 
     224            break; 
     225        case E_Control: 
     226            break; 
     227        default: 
     228            break; 
     229    } 
     230    if(!type_is_ok) { 
     231        debugFatal("Buffer type not supported by this type of port!\n"); 
     232        return false; 
     233    } 
     234    m_BufferType=b; 
     235    return true; 
     236} 
     237 
     238bool Port::useExternalBuffer(bool b) { 
     239    // If called on an initialised stream but the request isn't for a change silently 
     240    // allow it (relied on by C API as used by jack backend driver) 
     241    if (m_State==E_Initialized && m_use_external_buffer==b) 
     242        return true; 
     243 
     244    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting external buffer use to %d for port %s\n",(int)b,m_Name.c_str()); 
     245 
     246    if (m_State != E_Created) { 
     247        debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
     248        return false; 
     249    } 
     250    m_use_external_buffer=b; 
     251    return true; 
     252} 
     253 
     254// buffer handling api's for pointer buffers 
     255/** 
     256 * Get the buffer address (being the external or the internal one). 
     257 * 
     258 * @param buff 
     259 */ 
     260void *Port::getBufferAddress() { 
     261    assert(m_BufferType==E_PointerBuffer); 
     262    return m_buffer; 
     263}; 
     264 
     265/** 
     266 * Set the external buffer address. 
     267 * only call this when you have specified that you will use 
     268 * an external buffer before doing the init() 
     269 * 
     270 * @param buff 
     271 */ 
     272void Port::setExternalBufferAddress(void *buff) { 
     273    assert(m_BufferType==E_PointerBuffer); 
     274    assert(m_use_external_buffer); // don't call this with an internal buffer! 
     275    m_buffer=buff; 
     276}; 
     277 
     278// buffer handling api's for ringbuffers 
     279bool Port::writeEvent(void *event) { 
     280 
     281#ifdef DEBUG 
     282    if (m_State != E_Initialized) { 
     283        debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
     284        return false; 
     285    } 
     286     
     287    if(m_BufferType!=E_RingBuffer) { 
     288        debugError("operation not allowed on non E_RingBuffer ports\n"); 
     289        show(); 
     290        return false; 
     291    } 
     292    assert(m_ringbuffer); 
     293#endif 
     294 
     295    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Writing event %08X with size %d to port %s\n",*((quadlet_t *)event),m_eventsize, m_Name.c_str()); 
     296 
     297    return (ffado_ringbuffer_write(m_ringbuffer, (char *)event, m_eventsize)==m_eventsize); 
     298} 
     299 
     300bool Port::readEvent(void *event) { 
     301 
     302#ifdef DEBUG 
     303    if (m_State != E_Initialized) { 
     304        debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
     305        return false; 
     306    } 
     307     
     308    if(m_BufferType!=E_RingBuffer) { 
     309        debugError("operation not allowed on non E_RingBuffer ports\n"); 
     310        show(); 
     311        return false; 
     312    } 
     313    assert(m_ringbuffer); 
     314#endif 
     315 
     316     
     317    unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, m_eventsize); 
     318     
     319    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading event %X with size %d from port %s\n",*((quadlet_t *)event),m_eventsize,m_Name.c_str()); 
     320 
     321 
     322    return (read==m_eventsize); 
     323} 
     324 
     325int Port::writeEvents(void *event, unsigned int nevents) { 
     326 
     327#ifdef DEBUG 
     328    if (m_State != E_Initialized) { 
     329        debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
     330        return false; 
     331    } 
     332     
     333    if(m_BufferType!=E_RingBuffer) { 
     334        debugError("operation not allowed on non E_RingBuffer ports\n"); 
     335        show(); 
     336        return false; 
     337    } 
     338    assert(m_ringbuffer); 
     339#endif 
     340 
     341 
     342    unsigned int bytes2write=m_eventsize*nevents; 
     343 
     344    unsigned int written=ffado_ringbuffer_write(m_ringbuffer, (char *)event,bytes2write)/m_eventsize; 
     345 
     346#ifdef DEBUG 
     347    if(written) { 
     348        unsigned int i=0; 
     349        quadlet_t * tmp=(quadlet_t *)event; 
     350        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Written %d events (",written); 
     351        for (i=0;i<written;i++) { 
     352            debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 
     353        } 
     354        debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") to port %s\n",m_Name.c_str()); 
     355    } 
     356#endif 
     357 
     358    return written; 
     359 
     360} 
     361 
     362int Port::readEvents(void *event, unsigned int nevents) { 
     363 
     364#ifdef DEBUG 
     365    if (m_State != E_Initialized) { 
     366        debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
     367        return false; 
     368    } 
     369    if(m_BufferType!=E_RingBuffer) { 
     370        debugError("operation not allowed on non E_RingBuffer ports\n"); 
     371        show(); 
     372        return false; 
     373    } 
     374    assert(m_ringbuffer); 
     375#endif 
     376 
     377 
     378    unsigned int bytes2read=m_eventsize*nevents; 
     379 
     380    unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, bytes2read)/m_eventsize; 
     381 
     382#ifdef DEBUG 
     383    if(read) { 
     384        unsigned int i=0; 
     385        quadlet_t * tmp=(quadlet_t *)event; 
     386        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Read %d events (",read); 
     387        for (i=0;i<read;i++) { 
     388            debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 
     389        } 
     390        debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") from port %s\n",m_Name.c_str()); 
     391    } 
     392#endif 
     393 
     394    return read; 
     395} 
     396 
     397/* rate control */ 
     398bool Port::canRead() { 
     399    bool byte_present_in_buffer; 
     400 
     401    bool retval=false; 
     402 
     403    assert(m_ringbuffer); 
     404 
     405    byte_present_in_buffer=(ffado_ringbuffer_read_space(m_ringbuffer) >= m_eventsize); 
     406 
     407    if(byte_present_in_buffer) { 
     408 
     409        if(!m_do_ratecontrol) { 
     410            return true; 
     411        } 
     412 
     413        if(m_rate_counter <= 0) { 
     414            // update the counter 
     415            if(m_average_ratecontrol) { 
     416                m_rate_counter += m_event_interval; 
     417                assert(m_rate_counter<m_event_interval); 
     418            } else { 
     419                m_rate_counter = m_event_interval; 
     420            } 
     421 
     422            retval=true; 
     423        } else { 
     424            debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Rate limit (%s)! rate_counter=%d \n",m_Name.c_str(),m_rate_counter); 
     425 
     426        } 
     427    } 
     428 
     429 
     430    m_rate_counter -= m_slot_interval; 
     431 
     432    // we have to limit the decrement of the ratecounter somehow. 
     433    // m_rate_counter_minimum is initialized when enabling ratecontrol 
     434    if(m_rate_counter < m_rate_counter_minimum) { 
     435        m_rate_counter = m_rate_counter_minimum; 
     436    } 
     437 
     438    return retval; 
     439} 
     440 
     441bool Port::useRateControl(bool use, unsigned int slot_interval, 
     442                                unsigned int event_interval, bool average) { 
     443 
     444    if (use) { 
     445        debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling rate control for port %s...\n",m_Name.c_str()); 
     446        if(slot_interval>event_interval) { 
     447            debugWarning("Rate control not needed!\n",m_Name.c_str()); 
     448            m_do_ratecontrol=false; 
     449            return false; 
     450        } 
     451        if(slot_interval==0) { 
     452            debugFatal("Cannot have slot interval == 0!\n"); 
     453            m_do_ratecontrol=false; 
     454            return false; 
     455        } 
     456        if(event_interval==0) { 
     457            debugFatal("Cannot have event interval == 0!\n"); 
     458            m_do_ratecontrol=false; 
     459            return false; 
     460        } 
     461        m_do_ratecontrol=use; 
     462        m_event_interval=event_interval; 
     463        m_slot_interval=slot_interval; 
     464        m_rate_counter=0; 
     465 
     466        // NOTE: pretty arbitrary, but in average mode this limits the peak stream rate 
     467        m_rate_counter_minimum=-(2*event_interval); 
     468 
     469        m_average_ratecontrol=average; 
     470 
     471    } else { 
     472        debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling rate control for port %s...\n",m_Name.c_str()); 
     473        m_do_ratecontrol=use; 
     474    } 
     475    return true; 
     476} 
     477 
     478/// Enable the port. (this can be called anytime) 
     479void 
     480Port::enable()  { 
     481    debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); 
     482    m_disabled=false; 
     483}; 
     484 
     485/// Disable the port. (this can be called anytime) 
     486void 
     487Port::disable() { 
     488    debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); 
     489    m_disabled=false; 
     490}; 
     491 
     492 
     493/* Private functions */ 
     494 
     495bool Port::allocateInternalBuffer() { 
     496    int event_size=getEventSize(); 
     497 
     498    debugOutput(DEBUG_LEVEL_VERBOSE, 
     499                "Allocating internal buffer of %d events with size %d (%s)\n", 
     500                m_buffersize, event_size, m_Name.c_str()); 
     501 
     502    if(m_buffer) { 
     503        debugWarning("already has an internal buffer attached, re-allocating\n"); 
     504        freeInternalBuffer(); 
     505    } 
     506 
     507    m_buffer=calloc(m_buffersize,event_size); 
     508    if (!m_buffer) { 
     509        debugFatal("could not allocate internal buffer\n"); 
     510        m_buffersize=0; 
     511        return false; 
     512    } 
     513 
     514    return true; 
     515} 
     516 
     517void Port::freeInternalBuffer() { 
     518    debugOutput(DEBUG_LEVEL_VERBOSE, 
     519                "Freeing internal buffer (%s)\n",m_Name.c_str()); 
     520 
     521    if(m_buffer) { 
     522        free(m_buffer); 
     523        m_buffer=0; 
     524    } 
     525} 
     526 
     527bool Port::allocateInternalRingBuffer() { 
     528    int event_size=getEventSize(); 
     529 
     530    debugOutput(DEBUG_LEVEL_VERBOSE, 
     531                "Allocating internal buffer of %d events with size %d (%s)\n", 
     532                m_buffersize, event_size, m_Name.c_str()); 
     533 
     534    if(m_ringbuffer) { 
     535        debugWarning("already has an internal ringbuffer attached, re-allocating\n"); 
     536        freeInternalRingBuffer(); 
     537    } 
     538 
     539    m_ringbuffer=ffado_ringbuffer_create(m_buffersize * event_size); 
     540    if (!m_ringbuffer) { 
     541        debugFatal("could not allocate internal ringbuffer\n"); 
     542        m_buffersize=0; 
     543        return false; 
     544    } 
     545 
     546    return true; 
     547} 
     548 
     549void Port::freeInternalRingBuffer() { 
     550    debugOutput(DEBUG_LEVEL_VERBOSE, 
     551                "Freeing internal ringbuffer (%s)\n",m_Name.c_str()); 
     552 
     553    if(m_ringbuffer) { 
     554        ffado_ringbuffer_free(m_ringbuffer); 
     555        m_ringbuffer=0; 
     556    } 
     557} 
     558 
    113559void Port::show() { 
    114560    debugOutput(DEBUG_LEVEL_VERBOSE,"Name          : %s\n", m_Name.c_str()); 
     
    129575} 
    130576 
    131 bool Port::setName(std::string name) { 
    132     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting name to %s for port %s\n",name.c_str(),m_Name.c_str()); 
    133  
    134     if (m_State != E_Created) { 
    135         debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
    136         return false; 
    137     } 
    138  
    139     m_Name=name; 
    140  
    141     return true; 
    142 
    143  
    144 bool Port::setBufferSize(unsigned int newsize) { 
    145     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffersize to %d for port %s\n",newsize,m_Name.c_str()); 
    146     if (m_State != E_Created) { 
    147         debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
    148         return false; 
    149     } 
    150  
    151     m_buffersize=newsize; 
    152     return true; 
    153  
    154 
    155  
    156 unsigned int Port::getEventSize() { 
    157     switch (m_DataType) { 
    158         case E_Float: 
    159             return sizeof(float); 
    160         case E_Int24: // 24 bit 2's complement, packed in a 32bit integer (LSB's) 
    161             return sizeof(uint32_t); 
    162         case E_MidiEvent: 
    163             return sizeof(uint32_t); 
    164         default: 
    165             return 0; 
    166     } 
    167 
    168  
    169 bool Port::setDataType(enum E_DataType d) { 
    170     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting datatype to %d for port %s\n",(int) d,m_Name.c_str()); 
    171     if (m_State != E_Created) { 
    172         debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
    173         return false; 
    174     } 
    175  
    176     // do some sanity checks 
    177     bool type_is_ok=false; 
    178     switch (m_PortType) { 
    179         case E_Audio: 
    180             if(d == E_Int24) type_is_ok=true; 
    181             if(d == E_Float) type_is_ok=true; 
    182             break; 
    183         case E_Midi: 
    184             if(d == E_MidiEvent) type_is_ok=true; 
    185             break; 
    186         case E_Control: 
    187             if(d == E_Default) type_is_ok=true; 
    188             break; 
    189         default: 
    190             break; 
    191     } 
    192  
    193     if(!type_is_ok) { 
    194         debugFatal("Datatype not supported by this type of port!\n"); 
    195         return false; 
    196     } 
    197  
    198     m_DataType=d; 
    199     return true; 
    200 
    201  
    202 bool Port::setSignalType(enum E_SignalType s) { 
    203     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting signaltype to %d for port %s\n",(int)s,m_Name.c_str()); 
    204     if (m_State != E_Created) { 
    205         debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
    206         return false; 
    207     } 
    208  
    209     // do some sanity checks 
    210     bool type_is_ok=false; 
    211     switch (m_PortType) { 
    212         case E_Audio: 
    213             if(s == E_PeriodSignalled) type_is_ok=true; 
    214             break; 
    215         case E_Midi: 
    216             if(s == E_PacketSignalled) type_is_ok=true; 
    217             break; 
    218         case E_Control: 
    219             if(s == E_PeriodSignalled) type_is_ok=true; 
    220             break; 
    221         default: 
    222             break; 
    223     } 
    224  
    225     if(!type_is_ok) { 
    226         debugFatal("Signalling type not supported by this type of port!\n"); 
    227         return false; 
    228     } 
    229  
    230     m_SignalType=s; 
    231     return true; 
    232  
    233 
    234  
    235 bool Port::setBufferType(enum E_BufferType b) { 
    236     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting buffer type to %d for port %s\n",(int)b,m_Name.c_str()); 
    237     if (m_State != E_Created) { 
    238         debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
    239         return false; 
    240     } 
    241  
    242     // do some sanity checks 
    243     bool type_is_ok=false; 
    244     switch (m_PortType) { 
    245         case E_Audio: 
    246             if(b == E_PointerBuffer) type_is_ok=true; 
    247             break; 
    248         case E_Midi: 
    249             if(b == E_RingBuffer) type_is_ok=true; 
    250             break; 
    251         case E_Control: 
    252             break; 
    253         default: 
    254             break; 
    255     } 
    256  
    257     if(!type_is_ok) { 
    258         debugFatal("Buffer type not supported by this type of port!\n"); 
    259         return false; 
    260     } 
    261  
    262     m_BufferType=b; 
    263     return true; 
    264  
    265 
    266  
    267 bool Port::useExternalBuffer(bool b) { 
    268  
    269     // If called on an initialised stream but the request isn't for a change silently 
    270     // allow it (relied on by C API as used by jack backend driver) 
    271     if (m_State==E_Initialized && m_use_external_buffer==b) 
    272         return true; 
    273  
    274     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting external buffer use to %d for port %s\n",(int)b,m_Name.c_str()); 
    275  
    276     if (m_State != E_Created) { 
    277         debugFatal("Port (%s) not in E_Created state: %d\n",m_Name.c_str(),m_State); 
    278         return false; 
    279     } 
    280  
    281     m_use_external_buffer=b; 
    282     return true; 
    283 
    284  
    285 // buffer handling api's for pointer buffers 
    286 /** 
    287  * Get the buffer address (being the external or the internal one). 
    288  * 
    289  * @param buff 
    290  */ 
    291 void *Port::getBufferAddress() { 
    292     assert(m_BufferType==E_PointerBuffer); 
    293     return m_buffer; 
    294 }; 
    295  
    296 /** 
    297  * Set the external buffer address. 
    298  * only call this when you have specified that you will use 
    299  * an external buffer before doing the init() 
    300  * 
    301  * @param buff 
    302  */ 
    303 void Port::setExternalBufferAddress(void *buff) { 
    304     assert(m_BufferType==E_PointerBuffer); 
    305     assert(m_use_external_buffer); // don't call this with an internal buffer! 
    306     m_buffer=buff; 
    307 }; 
    308  
    309 // buffer handling api's for ringbuffers 
    310 bool Port::writeEvent(void *event) { 
    311  
    312 #ifdef DEBUG 
    313     if (m_State != E_Initialized) { 
    314         debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
    315         return false; 
    316     } 
    317      
    318     if(m_BufferType!=E_RingBuffer) { 
    319         debugError("operation not allowed on non E_RingBuffer ports\n"); 
    320         show(); 
    321         return false; 
    322     } 
    323     assert(m_ringbuffer); 
    324 #endif 
    325  
    326     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Writing event %08X with size %d to port %s\n",*((quadlet_t *)event),m_eventsize, m_Name.c_str()); 
    327  
    328     return (ffado_ringbuffer_write(m_ringbuffer, (char *)event, m_eventsize)==m_eventsize); 
    329 
    330  
    331 bool Port::readEvent(void *event) { 
    332  
    333 #ifdef DEBUG 
    334     if (m_State != E_Initialized) { 
    335         debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
    336         return false; 
    337     } 
    338      
    339     if(m_BufferType!=E_RingBuffer) { 
    340         debugError("operation not allowed on non E_RingBuffer ports\n"); 
    341         show(); 
    342         return false; 
    343     } 
    344     assert(m_ringbuffer); 
    345 #endif 
    346  
    347      
    348     unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, m_eventsize); 
    349      
    350     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Reading event %X with size %d from port %s\n",*((quadlet_t *)event),m_eventsize,m_Name.c_str()); 
    351  
    352  
    353     return (read==m_eventsize); 
    354 
    355  
    356 int Port::writeEvents(void *event, unsigned int nevents) { 
    357  
    358 #ifdef DEBUG 
    359     if (m_State != E_Initialized) { 
    360         debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
    361         return false; 
    362     } 
    363      
    364     if(m_BufferType!=E_RingBuffer) { 
    365         debugError("operation not allowed on non E_RingBuffer ports\n"); 
    366         show(); 
    367         return false; 
    368     } 
    369     assert(m_ringbuffer); 
    370 #endif 
    371  
    372  
    373     unsigned int bytes2write=m_eventsize*nevents; 
    374  
    375     unsigned int written=ffado_ringbuffer_write(m_ringbuffer, (char *)event,bytes2write)/m_eventsize; 
    376  
    377 #ifdef DEBUG 
    378     if(written) { 
    379         unsigned int i=0; 
    380         quadlet_t * tmp=(quadlet_t *)event; 
    381         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Written %d events (",written); 
    382         for (i=0;i<written;i++) { 
    383             debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 
    384         } 
    385         debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") to port %s\n",m_Name.c_str()); 
    386     } 
    387 #endif 
    388  
    389     return written; 
    390  
    391 
    392  
    393 int Port::readEvents(void *event, unsigned int nevents) { 
    394  
    395 #ifdef DEBUG 
    396     if (m_State != E_Initialized) { 
    397         debugFatal("Port (%s) not in E_Initialized state: %d\n",m_Name.c_str(),m_State); 
    398         return false; 
    399     } 
    400     if(m_BufferType!=E_RingBuffer) { 
    401         debugError("operation not allowed on non E_RingBuffer ports\n"); 
    402         show(); 
    403         return false; 
    404     } 
    405     assert(m_ringbuffer); 
    406 #endif 
    407  
    408  
    409     unsigned int bytes2read=m_eventsize*nevents; 
    410  
    411     unsigned int read=ffado_ringbuffer_read(m_ringbuffer, (char *)event, bytes2read)/m_eventsize; 
    412  
    413 #ifdef DEBUG 
    414     if(read) { 
    415         unsigned int i=0; 
    416         quadlet_t * tmp=(quadlet_t *)event; 
    417         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Read %d events (",read); 
    418         for (i=0;i<read;i++) { 
    419             debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, "%X ", *(tmp+i)); 
    420         } 
    421         debugOutputShort(DEBUG_LEVEL_VERY_VERBOSE, ") from port %s\n",m_Name.c_str()); 
    422     } 
    423 #endif 
    424  
    425     return read; 
    426 
    427  
    428 /* rate control */ 
    429 bool Port::canRead() { 
    430     bool byte_present_in_buffer; 
    431  
    432     bool retval=false; 
    433  
    434     assert(m_ringbuffer); 
    435  
    436     byte_present_in_buffer=(ffado_ringbuffer_read_space(m_ringbuffer) >= m_eventsize); 
    437  
    438     if(byte_present_in_buffer) { 
    439  
    440         if(!m_do_ratecontrol) { 
    441             return true; 
    442         } 
    443  
    444         if(m_rate_counter <= 0) { 
    445             // update the counter 
    446             if(m_average_ratecontrol) { 
    447                 m_rate_counter += m_event_interval; 
    448                 assert(m_rate_counter<m_event_interval); 
    449             } else { 
    450                 m_rate_counter = m_event_interval; 
    451             } 
    452  
    453             retval=true; 
    454         } else { 
    455             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Rate limit (%s)! rate_counter=%d \n",m_Name.c_str(),m_rate_counter); 
    456  
    457         } 
    458     } 
    459  
    460  
    461     m_rate_counter -= m_slot_interval; 
    462  
    463     // we have to limit the decrement of the ratecounter somehow. 
    464     // m_rate_counter_minimum is initialized when enabling ratecontrol 
    465     if(m_rate_counter < m_rate_counter_minimum) { 
    466         m_rate_counter = m_rate_counter_minimum; 
    467     } 
    468  
    469     return retval; 
    470 
    471  
    472 bool Port::useRateControl(bool use, unsigned int slot_interval, 
    473                                 unsigned int event_interval, bool average) { 
    474  
    475     if (use) { 
    476         debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling rate control for port %s...\n",m_Name.c_str()); 
    477         if(slot_interval>event_interval) { 
    478             debugWarning("Rate control not needed!\n",m_Name.c_str()); 
    479             m_do_ratecontrol=false; 
    480             return false; 
    481         } 
    482         if(slot_interval==0) { 
    483             debugFatal("Cannot have slot interval == 0!\n"); 
    484             m_do_ratecontrol=false; 
    485             return false; 
    486         } 
    487         if(event_interval==0) { 
    488             debugFatal("Cannot have event interval == 0!\n"); 
    489             m_do_ratecontrol=false; 
    490             return false; 
    491         } 
    492         m_do_ratecontrol=use; 
    493         m_event_interval=event_interval; 
    494         m_slot_interval=slot_interval; 
    495         m_rate_counter=0; 
    496  
    497         // NOTE: pretty arbitrary, but in average mode this limits the peak stream rate 
    498         m_rate_counter_minimum=-(2*event_interval); 
    499  
    500         m_average_ratecontrol=average; 
    501  
    502     } else { 
    503         debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling rate control for port %s...\n",m_Name.c_str()); 
    504         m_do_ratecontrol=use; 
    505     } 
    506     return true; 
    507 
    508  
    509 /// Enable the port. (this can be called anytime) 
    510 void 
    511 Port::enable()  { 
    512     debugOutput(DEBUG_LEVEL_VERBOSE, "Enabling port %s...\n",m_Name.c_str()); 
    513     m_disabled=false; 
    514 }; 
    515  
    516 /// Disable the port. (this can be called anytime) 
    517 void 
    518 Port::disable() { 
    519     debugOutput(DEBUG_LEVEL_VERBOSE, "Disabling port %s...\n",m_Name.c_str()); 
    520     m_disabled=false; 
    521 }; 
    522  
    523  
    524 /* Private functions */ 
    525  
    526 bool Port::allocateInternalBuffer() { 
    527     int event_size=getEventSize(); 
    528  
    529     debugOutput(DEBUG_LEVEL_VERBOSE, 
    530                 "Allocating internal buffer of %d events with size %d (%s)\n", 
    531                 m_buffersize, event_size, m_Name.c_str()); 
    532  
    533     if(m_buffer) { 
    534         debugWarning("already has an internal buffer attached, re-allocating\n"); 
    535         freeInternalBuffer(); 
    536     } 
    537  
    538     m_buffer=calloc(m_buffersize,event_size); 
    539     if (!m_buffer) { 
    540         debugFatal("could not allocate internal buffer\n"); 
    541         m_buffersize=0; 
    542         return false; 
    543     } 
    544  
    545     return true; 
    546 
    547  
    548 void Port::freeInternalBuffer() { 
    549     debugOutput(DEBUG_LEVEL_VERBOSE, 
    550                 "Freeing internal buffer (%s)\n",m_Name.c_str()); 
    551  
    552     if(m_buffer) { 
    553         free(m_buffer); 
    554         m_buffer=0; 
    555     } 
    556 
    557  
    558 bool Port::allocateInternalRingBuffer() { 
    559     int event_size=getEventSize(); 
    560  
    561     debugOutput(DEBUG_LEVEL_VERBOSE, 
    562                 "Allocating internal buffer of %d events with size %d (%s)\n", 
    563                 m_buffersize, event_size, m_Name.c_str()); 
    564  
    565     if(m_ringbuffer) { 
    566         debugWarning("already has an internal ringbuffer attached, re-allocating\n"); 
    567         freeInternalRingBuffer(); 
    568     } 
    569  
    570     m_ringbuffer=ffado_ringbuffer_create(m_buffersize * event_size); 
    571     if (!m_ringbuffer) { 
    572         debugFatal("could not allocate internal ringbuffer\n"); 
    573         m_buffersize=0; 
    574         return false; 
    575     } 
    576  
    577     return true; 
    578 
    579  
    580 void Port::freeInternalRingBuffer() { 
    581     debugOutput(DEBUG_LEVEL_VERBOSE, 
    582                 "Freeing internal ringbuffer (%s)\n",m_Name.c_str()); 
    583  
    584     if(m_ringbuffer) { 
    585         ffado_ringbuffer_free(m_ringbuffer); 
    586         m_ringbuffer=0; 
    587     } 
    588 
    589  
    590 
     577
  • branches/ppalmers-streaming/src/libstreaming/generic/PortManager.cpp

    r705 r719  
    251251        } 
    252252    } 
    253  
    254  
    255  
    256253    return true; 
    257254} 
  • branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp

    r715 r719  
    3939    , m_processor_type ( type ) 
    4040    , m_state( ePS_Created ) 
     41    , m_next_state( ePS_Invalid ) 
     42    , m_cycle_to_switch_state( 0 ) 
    4143    , m_xruns( 0 ) 
    42     , m_manager(NULL) 
    43     , m_running(false) 
    44     , m_disabled(true) 
    45     , m_is_disabled(true) 
    46     , m_cycle_to_enable_at(0) 
    47     , m_ticks_per_frame(0) 
    48     , m_last_cycle(0) 
    49     , m_sync_delay(0) 
     44    , m_manager( NULL ) 
     45    , m_ticks_per_frame( 0 ) 
     46    , m_last_cycle( 0 ) 
     47    , m_sync_delay( 0 ) 
     48    , m_last_timestamp(0) 
     49    , m_last_timestamp2(0) 
     50    , m_dropped(0) 
    5051{ 
    5152    // create the timestamped buffer and register ourselves as its client 
    5253    m_data_buffer=new Util::TimestampedBuffer(this); 
    53  
    5454} 
    5555 
     
    5858} 
    5959 
    60 void 
    61 StreamProcessor::setState(enum eProcessorState s) { 
    62     #ifdef DEBUG 
    63         // check the state transistion 
    64         debugOutput( DEBUG_LEVEL_VERBOSE, "State transition from %s to %s", 
    65             ePSToString(m_state), ePSToString(s) ); 
    66     #endif 
    67     m_state = s; 
    68 } 
    69  
    70 void StreamProcessor::dumpInfo() 
    71 { 
    72     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); 
    73     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n"); 
    74  
    75     IsoStream::dumpInfo(); 
    76     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n"); 
    77     if (m_handler) 
    78         debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
    79     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %d\n", m_xruns); 
    80     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Running               : %d\n", m_running); 
    81     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Enabled               : %s\n", m_disabled ? "No" : "Yes"); 
    82     debugOutputShort( DEBUG_LEVEL_NORMAL, "   enable status        : %s\n", m_is_disabled ? "No" : "Yes"); 
    83  
    84     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_manager->getNominalRate()); 
    85     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n", 
    86         24576000.0/getSyncSource().m_data_buffer->getRate(), 
    87         24576000.0/m_data_buffer->getRate() 
    88         ); 
    89  
    90     m_data_buffer->dumpInfo(); 
    91  
    92     m_PeriodStat.dumpInfo(); 
    93     m_PacketStat.dumpInfo(); 
    94 //     m_WakeupStat.dumpInfo(); 
    95 } 
    96  
    97 bool StreamProcessor::init() 
    98 { 
    99     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); 
    100     m_data_buffer->init(); 
    101     return IsoStream::init(); 
    102 } 
    103  
    104 /** 
    105  * Resets the frame counter, the xrun counter, the ports and the iso stream. 
    106  * @return true if reset succeeded 
    107  */ 
    108 bool StreamProcessor::reset() { 
    109  
    110     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n"); 
    111  
    112     // reset the event buffer, discard all content 
    113     if (!m_data_buffer->reset()) { 
    114         debugFatal("Could not reset data buffer\n"); 
    115         return false; 
    116     } 
    117  
    118     resetXrunCounter(); 
    119  
    120     // loop over the ports to reset them 
    121     if (!PortManager::resetPorts()) { 
    122         debugFatal("Could not reset ports\n"); 
    123         return false; 
    124     } 
    125  
    126     // reset the iso stream 
    127     if (!IsoStream::reset()) { 
    128         debugFatal("Could not reset isostream\n"); 
    129         return false; 
    130     } 
    131     return true; 
    132  
    133 } 
    134  
    135 bool StreamProcessor::prepareForEnable(uint64_t time_to_enable_at) { 
    136     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this); 
    137     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
    138     debugOutput(DEBUG_LEVEL_VERBOSE,"  Enable at             : %011u\n",time_to_enable_at); 
    139     m_data_buffer->dumpInfo(); 
    140     return true; 
    141 } 
    142  
    143 bool StreamProcessor::prepareForDisable() { 
    144     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this); 
    145     debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
    146     m_data_buffer->dumpInfo(); 
    147     return true; 
    148 } 
    149  
    150 bool StreamProcessor::prepare() { 
    151  
    152     debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n"); 
    153     if(!m_manager) { 
    154         debugFatal("Not attached to a manager!\n"); 
    155         return -1; 
    156     } 
    157  
    158     // init the ports 
    159     // loop over the ports to reset them 
    160     PortManager::preparePorts(); 
    161  
    162     // reset the iso stream 
    163     IsoStream::prepare(); 
    164  
    165     return true; 
    166  
    167 } 
    168  
    169 StreamProcessor& 
    170 StreamProcessor::getSyncSource() 
    171 { 
    172     return m_manager->getSyncSource(); 
    173 }; 
    174  
    175 int StreamProcessor::getBufferFill() { 
    176 //     return m_data_buffer->getFrameCounter(); 
    177     return m_data_buffer->getBufferFill(); 
    178 } 
    179  
    18060uint64_t StreamProcessor::getTimeNow() { 
    18161    return m_handler->getCycleTimerTicks(); 
    18262} 
    183  
    18463 
    18564int StreamProcessor::getMaxFrameLatency() { 
     
    19170} 
    19271 
    193 bool StreamProcessor::isRunning() { 
    194     return m_running; 
    195 
    196  
    197 bool StreamProcessor::enable(uint64_t time_to_enable_at)  { 
    198     // FIXME: time_to_enable_at will be in 'time' not cycles 
    199     m_cycle_to_enable_at=time_to_enable_at; 
    200  
    201     if(!m_running) { 
    202             debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n"); 
    203     } 
    204  
    205 #ifdef DEBUG 
    206     uint64_t now_cycles=CYCLE_TIMER_GET_CYCLES(m_handler->getCycleTimer()); 
    207     const int64_t max=(int64_t)(CYCLES_PER_SECOND/2); 
    208  
    209     int64_t diff=(int64_t)m_cycle_to_enable_at-(int64_t)now_cycles; 
    210  
    211     if (diff > max) { 
    212         diff-=TICKS_PER_SECOND; 
    213     } else if (diff < -max) { 
    214         diff+=TICKS_PER_SECOND; 
    215     } 
    216  
    217     if (diff<0) { 
    218         debugWarning("Request to enable streamprocessor %lld cycles ago (now=%llu, cy=%llu).\n", 
    219             diff,now_cycles,time_to_enable_at); 
    220     } 
    221 #endif 
    222     m_data_buffer->enable(); 
    223  
    224     m_disabled=false; 
    225     return true; 
    226 
    227  
    228 bool StreamProcessor::disable()  { 
    229     m_data_buffer->disable(); 
    230     m_disabled=true; 
    231     return true; 
    232 
    233  
    234 float 
    235 StreamProcessor::getTicksPerFrame() { 
    236     if (m_data_buffer) { 
    237         float rate=m_data_buffer->getRate(); 
    238         if (fabsf(m_ticks_per_frame - rate)>(m_ticks_per_frame*0.1)) { 
    239             debugWarning("TimestampedBuffer rate (%10.5f) more that 10%% off nominal (%10.5f)\n",rate,m_ticks_per_frame); 
    240             return m_ticks_per_frame; 
    241         } 
    242 //         return m_ticks_per_frame; 
    243         if (rate<0.0) debugError("rate < 0! (%f)\n",rate); 
    244          
    245         return rate; 
    246     } else { 
    247         return 0.0; 
    248     } 
    249 
    250  
    251 int64_t StreamProcessor::getTimeUntilNextPeriodSignalUsecs() { 
     72/*********************************************** 
     73 * Buffer management and manipulation          * 
     74 ***********************************************/ 
     75int StreamProcessor::getBufferFill() { 
     76    return m_data_buffer->getBufferFill(); 
     77
     78 
     79bool 
     80StreamProcessor::dropFrames(unsigned int nbframes) 
     81
     82    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes); 
     83    return m_data_buffer->dropFrames(nbframes); 
     84
     85 
     86int64_t 
     87StreamProcessor::getTimeUntilNextPeriodSignalUsecs() 
     88
    25289    uint64_t time_at_period=getTimeAtPeriod(); 
    25390 
     
    26097    // pass before these packets are processed. Adding this extra term makes that 
    26198    // the period boundary is signalled later 
    262     time_at_period = addTicks(time_at_period, getSyncSource().getSyncDelay()); 
     99    time_at_period = addTicks(time_at_period, m_manager->getSyncSource().getSyncDelay()); 
    263100 
    264101    uint64_t cycle_timer=m_handler->getCycleTimerTicks(); 
     
    278115} 
    279116 
    280 uint64_t StreamProcessor::getTimeAtPeriodUsecs() { 
     117uint64_t 
     118StreamProcessor::getTimeAtPeriodUsecs() 
     119
    281120    return (uint64_t)((float)getTimeAtPeriod() * TICKS_PER_USEC); 
    282121} 
    283122 
    284 bool StreamProcessor::dropFrames(unsigned int nbframes) { 
    285     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes); 
    286     return m_data_buffer->dropFrames(nbframes); 
    287 } 
    288  
    289 /** 
    290  * Resets the xrun counter, in a atomic way. This 
    291  * is thread safe. 
    292  */ 
    293 void StreamProcessor::resetXrunCounter() { 
    294     ZERO_ATOMIC((SInt32 *)&m_xruns); 
    295 } 
    296  
    297 void StreamProcessor::setVerboseLevel(int l) { 
    298     setDebugLevel(l); 
    299     IsoStream::setVerboseLevel(l); 
    300     PortManager::setVerboseLevel(l); 
    301     m_data_buffer->setVerboseLevel(l); 
    302 } 
    303  
    304123uint64_t 
    305 StreamProcessor::getTimeAtPeriod() { 
     124StreamProcessor::getTimeAtPeriod()  
     125
    306126    if (getType() == ePT_Receive) { 
    307127        ffado_timestamp_t next_period_boundary=m_data_buffer->getTimestampFromHead(m_manager->getPeriodSize()); 
     
    333153} 
    334154 
     155float 
     156StreamProcessor::getTicksPerFrame() 
     157{ 
     158    assert(m_data_buffer != NULL); 
     159    return m_data_buffer->getRate(); 
     160} 
     161 
    335162bool 
    336 StreamProcessor::canClientTransferFrames(unsigned int nbframes) { 
     163StreamProcessor::canClientTransferFrames(unsigned int nbframes) 
     164
     165    bool can_transfer; 
     166    unsigned int fc = m_data_buffer->getFrameCounter(); 
    337167    if (getType() == ePT_Receive) { 
    338         return m_data_buffer->getFrameCounter() >= (int) nbframes; 
     168        can_transfer = fc >= (int) nbframes; 
    339169    } else { 
    340         bool can_transfer; 
    341170        // there has to be enough space to put the frames in 
    342         can_transfer = m_data_buffer->getBufferSize() - m_data_buffer->getFrameCounter() > nbframes; 
     171        can_transfer = m_data_buffer->getBufferSize() - fc > nbframes; 
    343172        // or the buffer is transparent 
    344173        can_transfer |= m_data_buffer->isTransparent(); 
    345         return can_transfer; 
    346     } 
    347 
    348  
     174    } 
     175     
     176    #ifdef DEBUG 
     177    if (!can_transfer) { 
     178        debugWarning("(%p, %s) cannot transfer since fc == %u, nbframes == %u\n",  
     179            this, ePTToString(getType()), fc, nbframes); 
     180    } 
     181    #endif 
     182     
     183    return can_transfer; 
     184
     185 
     186/*********************************************** 
     187 * I/O API                                     * 
     188 ***********************************************/ 
     189 
     190// Packet transfer API 
     191enum raw1394_iso_disposition 
     192StreamProcessor::putPacket(unsigned char *data, unsigned int length, 
     193                           unsigned char channel, unsigned char tag, unsigned char sy, 
     194                           unsigned int cycle, unsigned int dropped) { 
     195 
     196    int dropped_cycles = diffCycles(cycle, m_last_cycle) - 1; 
     197    if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 
     198    else m_dropped += dropped_cycles; 
     199    if (dropped_cycles > 0) debugWarning("(%p) dropped %d packets on cycle %u\n", this, dropped_cycles, cycle); 
     200    m_last_cycle = cycle; 
     201 
     202    // bypass based upon state 
     203    if (m_state == ePS_Invalid) { 
     204        debugError("Should not have state %s\n", ePSToString(m_state) ); 
     205        return RAW1394_ISO_ERROR; 
     206    } 
     207    if (m_state == ePS_Created) { 
     208        return RAW1394_ISO_DEFER; 
     209    } 
     210 
     211    // normal processing 
     212    enum raw1394_iso_disposition retval = RAW1394_ISO_OK; 
     213 
     214    // store the previous timestamp 
     215    m_last_timestamp2 = m_last_timestamp; 
     216 
     217    // NOTE: synchronized switching is restricted to a 0.5 sec span (4000 cycles) 
     218    //       it happens on the first 'good' cycle for the wait condition 
     219    //       or on the first received cycle that is received afterwards (might be a problem) 
     220 
     221    // check whether we are waiting for a stream to be disabled 
     222    if(m_state == ePS_WaitingForStreamDisable) { 
     223        // we then check whether we have to switch on this cycle 
     224        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 
     225            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning\n"); 
     226            m_next_state = ePS_DryRunning; 
     227            if (!updateState()) { // we are allowed to change the state directly 
     228                debugError("Could not update state!\n"); 
     229                return RAW1394_ISO_ERROR; 
     230            } 
     231        } else { 
     232            // not time to disable yet 
     233        } 
     234        // the received data can be discarded while waiting for the stream 
     235        // to be disabled 
     236        return RAW1394_ISO_OK; 
     237    } 
     238 
     239    // check whether we are waiting for a stream to be enabled 
     240    else if(m_state == ePS_WaitingForStreamEnable) { 
     241        // we then check whether we have to switch on this cycle 
     242        if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 
     243            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to Running\n"); 
     244            m_next_state = ePS_Running; 
     245            if (!updateState()) { // we are allowed to change the state directly 
     246                debugError("Could not update state!\n"); 
     247                return RAW1394_ISO_ERROR; 
     248            } 
     249        } else { 
     250            // not time to enable yet 
     251        } 
     252        // we are dryRunning hence data should be processed in any case 
     253    } 
     254 
     255    // check the packet header 
     256    if (processPacketHeader(data, length, channel, tag, sy, cycle, dropped_cycles)) { 
     257        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "RECV: CY=%04u TS=%011llu\n", 
     258                cycle, m_last_timestamp); 
     259        // update some accounting 
     260        m_last_good_cycle = cycle; 
     261        m_last_dropped = dropped_cycles; 
     262 
     263        // check whether we are waiting for a stream to startup 
     264        // this requires that the packet is good 
     265        if(m_state == ePS_WaitingForStream) { 
     266            // since we have a packet with an OK header, 
     267            // we can indicate that the stream started up 
     268 
     269            // we then check whether we have to switch on this cycle 
     270            if (diffCycles(cycle, m_cycle_to_switch_state) >= 0) { 
     271                debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to good packet\n"); 
     272                // hence go to the dryRunning state 
     273                m_next_state = ePS_DryRunning; 
     274                if (!updateState()) { // we are allowed to change the state directly 
     275                    debugError("Could not update state!\n"); 
     276                    return RAW1394_ISO_ERROR; 
     277                } 
     278            } else { 
     279                // not time (yet) to switch state 
     280            } 
     281            // in both cases we don't want to process the data 
     282            return RAW1394_ISO_OK; 
     283        } 
     284 
     285        // check whether a state change has been requested 
     286        // note that only the wait state changes are synchronized with the cycles 
     287        else if(m_state != m_next_state) { 
     288            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state from %s to %s\n", 
     289                                             ePSToString(m_state), ePSToString(m_next_state)); 
     290            // execute the requested change 
     291            if (!updateState()) { // we are allowed to change the state directly 
     292                debugError("Could not update state!\n"); 
     293                return RAW1394_ISO_ERROR; 
     294            } 
     295        } 
     296 
     297        // handle dropped cycles 
     298        if(dropped_cycles) { 
     299            // they represent a discontinuity in the timestamps, and hence are 
     300            // to be dealt with 
     301            debugWarning("(%p) Correcting timestamp for dropped cycles, discarding packet...\n", this); 
     302            m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 
     303            // we don't want this sample to be written 
     304            return RAW1394_ISO_OK; 
     305        } 
     306 
     307        // for all states that reach this we are allowed to 
     308        // do protocol specific data reception 
     309        bool ok = processPacketData(data, length, channel, tag, sy, cycle, dropped_cycles); 
     310 
     311        // if an xrun occured, switch to the dryRunning state and 
     312        // allow for the xrun to be picked up 
     313        if (!ok) { 
     314            debugOutput(DEBUG_LEVEL_VERBOSE, "Should update state to DryRunning due to xrun\n"); 
     315            m_next_state = ePS_DryRunning; 
     316            // execute the requested change 
     317            if (!updateState()) { // we are allowed to change the state directly 
     318                debugError("Could not update state!\n"); 
     319                return RAW1394_ISO_ERROR; 
     320            } 
     321            return RAW1394_ISO_DEFER; 
     322        } 
     323    } else { 
     324        // apparently we don't have to do anything when the packets are not valid 
     325    } 
     326    return retval; 
     327
     328 
     329// Frame Transfer API 
     330bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 
     331    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); 
     332    assert( getType() == ePT_Receive ); 
     333    if(isDryRunning()) return getFramesDry(nbframes, ts); 
     334    else return getFramesWet(nbframes, ts); 
     335
     336 
     337bool StreamProcessor::getFramesWet(unsigned int nbframes, int64_t ts) { 
     338// FIXME: this should be done somewhere else 
     339#ifdef DEBUG 
     340    uint64_t ts_head; 
     341    signed int fc; 
     342    int32_t lag_ticks; 
     343    float lag_frames; 
     344 
     345    // in order to sync up multiple received streams, we should  
     346    // use the ts parameter. It specifies the time of the block's  
     347    // first sample. 
     348     
     349    ffado_timestamp_t ts_head_tmp; 
     350    m_data_buffer->getBufferHeadTimestamp(&ts_head_tmp, &fc); 
     351    ts_head=(uint64_t)ts_head_tmp; 
     352    lag_ticks=diffTicks(ts, ts_head); 
     353    float rate=m_data_buffer->getRate(); 
     354     
     355    assert(rate!=0.0); 
     356     
     357    lag_frames=(((float)lag_ticks)/rate); 
     358     
     359    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): drifts %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
     360                 this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
     361 
     362    if (lag_frames>=1.0) { 
     363        // the stream lags 
     364        debugWarning( "stream (%p): lags  with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
     365                      this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
     366    } else if (lag_frames<=-1.0) { 
     367        // the stream leads 
     368        debugWarning( "stream (%p): leads with %6d ticks = %10.5f frames (rate=%10.5f), %lld, %llu, %d\n", 
     369                      this, lag_ticks, lag_frames,rate, ts, ts_head, fc); 
     370    } 
     371#endif 
     372    // ask the buffer to process nbframes of frames 
     373    // using it's registered client's processReadBlock(), 
     374    // which should be ours 
     375    m_data_buffer->blockProcessReadFrames(nbframes); 
     376    return true; 
     377
     378 
     379bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { 
     380    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", 
     381                 this, nbframes, ts); 
     382 
     383    // dry run on this side means that we put silence in all enabled ports 
     384    // since there is do data put into the ringbuffer in the dry-running state 
     385    return provideSilenceBlock(nbframes, 0); 
     386
     387 
     388 
     389/*********************************************** 
     390 * State related API                           * 
     391 ***********************************************/ 
     392bool StreamProcessor::init() 
     393
     394    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); 
     395 
     396    // initialization can be done without requesting it 
     397    // from the packet loop 
     398    m_next_state = ePS_Created; 
     399    return true; 
     400
     401 
     402bool StreamProcessor::prepare() 
     403
     404    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "prepare...\n"); 
     405    if(!m_manager) { 
     406        debugFatal("Not attached to a manager!\n"); 
     407        return false; 
     408    } 
     409 
     410    if (!prepareChild()) { 
     411        debugFatal("Could not prepare child\n"); 
     412        return false; 
     413    } 
     414 
     415    // initialization can be done without requesting it 
     416    // from the packet loop 
     417    m_next_state = ePS_Stopped; 
     418    return updateState(); 
     419
     420 
     421bool StreamProcessor::stop() 
     422
     423    uint64_t time_to_stop_at = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 
     424    int cnt; 
     425    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stop...\n"); 
     426    switch (m_state) { 
     427        case ePS_Stopped: return true; 
     428        case ePS_DryRunning: 
     429            return stopDryRunning(-1); 
     430        case ePS_Running: 
     431            return stopRunning(-1) &&  
     432                   stopDryRunning(-1); 
     433        default: 
     434            debugError("Bad state: %s\n", ePSToString(m_state)); 
     435            return false; 
     436    } 
     437
     438 
     439bool StreamProcessor::scheduleStateTransition(enum eProcessorState state, uint64_t time_instant) 
     440
     441    // first set the time, since in the packet loop we first check m_state == m_next_state before 
     442    // using the time 
     443    m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant); 
     444    m_next_state = state; 
     445    return true; 
     446
     447 
     448bool StreamProcessor::scheduleAndWaitForStateTransition(enum eProcessorState state,  
     449                                                        uint64_t time_instant,  
     450                                                        enum eProcessorState wait_state) 
     451
     452    int cnt=200; // 2 seconds, i.e. 2 cycles 
     453    if(!scheduleStateTransition(state, time_instant)) { 
     454        debugError("Could not schedule state transistion to %s\n", ePSToString(state)); 
     455        return false; 
     456    } 
     457    while (m_state != wait_state && cnt) { 
     458        usleep(10000); 
     459        cnt++; 
     460    } 
     461    if(cnt==0) { 
     462        debugError("Timeout entering Stopped state\n"); 
     463        return false; 
     464    } 
     465    debugOutput(DEBUG_LEVEL_VERBOSE, " entered state %s\n", ePSToString(wait_state)); 
     466    return true; 
     467
     468 
     469bool StreamProcessor::startDryRunning(int64_t t) { 
     470    uint64_t tx; 
     471    if (t < 0) { 
     472        tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 
     473    } else { 
     474        tx = t; 
     475    } 
     476    debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startDryRunning for (%p)\n",this); 
     477    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
     478    debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
     479    if (m_state == ePS_Stopped) { 
     480        return scheduleAndWaitForStateTransition(ePS_WaitingForStream, tx, ePS_DryRunning); 
     481    } else if (m_state == ePS_Running) { 
     482        return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); 
     483    } else { 
     484        debugError("Cannot switch to ePS_DryRunning from %s\n", ePSToString(m_state)); 
     485        return false; 
     486    } 
     487
     488 
     489bool StreamProcessor::startRunning(int64_t t) { 
     490    uint64_t tx; 
     491    if (t < 0) { 
     492        tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 
     493    } else { 
     494        tx = t; 
     495    } 
     496    debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::startRunning for (%p)\n",this); 
     497    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
     498    debugOutput(DEBUG_LEVEL_VERBOSE,"  Start at              : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
     499    return scheduleAndWaitForStateTransition(ePS_WaitingForStreamEnable, tx, ePS_Running); 
     500
     501 
     502bool StreamProcessor::stopDryRunning(int64_t t) { 
     503    uint64_t tx; 
     504    if (t < 0) { 
     505        tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 
     506    } else { 
     507        tx = t; 
     508    } 
     509    debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopDryRunning for (%p)\n",this); 
     510    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
     511    debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
     512    return scheduleAndWaitForStateTransition(ePS_Stopped, tx, ePS_Stopped); 
     513
     514 
     515bool StreamProcessor::stopRunning(int64_t t) { 
     516    uint64_t tx; 
     517    if (t < 0) { 
     518        tx = addTicks(m_handler->getCycleTimerTicks(), 200 * TICKS_PER_CYCLE); 
     519    } else { 
     520        tx = t; 
     521    } 
     522    debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::stopRunning for (%p)\n",this); 
     523    debugOutput(DEBUG_LEVEL_VERBOSE,"  Now                   : %011lu\n", m_handler->getCycleTimerTicks()); 
     524    debugOutput(DEBUG_LEVEL_VERBOSE,"  Stop at               : %011llu (%u)\n", tx, TICKS_TO_CYCLES(tx)); 
     525    return scheduleAndWaitForStateTransition(ePS_WaitingForStreamDisable, tx, ePS_DryRunning); 
     526
     527 
     528// internal state API 
     529 
     530/** 
     531 * @brief Enter the ePS_Stopped state 
     532 * @return true if successful, false if not 
     533 * 
     534 * @pre none 
     535 * 
     536 * @post the buffer and the isostream are ready for use. 
     537 * @post all dynamic structures have been allocated successfully 
     538 * @post the buffer is transparent and empty, and all parameters are set 
     539 *       to the correct initial/nominal values. 
     540 * 
     541 */ 
     542bool 
     543StreamProcessor::doStop() 
     544
     545    float ticks_per_frame; 
     546    unsigned int ringbuffer_size_frames; 
     547 
     548    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     549    bool result = true; 
     550 
     551    switch(m_state) { 
     552        case ePS_Created: 
     553            assert(m_data_buffer); 
     554            // object just created 
     555            result = m_data_buffer->init(); 
     556             
     557            // prepare the framerate estimate 
     558            ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_manager->getNominalRate()); 
     559            m_ticks_per_frame = ticks_per_frame; 
     560            debugOutput(DEBUG_LEVEL_VERBOSE,"Initializing remote ticks/frame to %f\n", ticks_per_frame); 
     561         
     562            // initialize internal buffer 
     563            ringbuffer_size_frames = m_manager->getNbBuffers() * m_manager->getPeriodSize(); 
     564            result &= m_data_buffer->setBufferSize(ringbuffer_size_frames * 2); 
     565 
     566            result &= m_data_buffer->setEventSize( getEventSize() ); 
     567            result &= m_data_buffer->setEventsPerFrame( getEventsPerFrame() ); 
     568            result &= m_data_buffer->setUpdatePeriod( getUpdatePeriod() ); 
     569 
     570            result &= m_data_buffer->setNominalRate(ticks_per_frame); 
     571            result &= m_data_buffer->setWrapValue(128L*TICKS_PER_SECOND); 
     572            result &= m_data_buffer->prepare(); // FIXME: the name 
     573 
     574            // set the parameters of ports we can: 
     575            // we want the audio ports to be period buffered, 
     576            // and the midi ports to be packet buffered 
     577            for ( PortVectorIterator it = m_Ports.begin(); 
     578                it != m_Ports.end(); 
     579                ++it ) 
     580            { 
     581                debugOutput(DEBUG_LEVEL_VERBOSE, "Setting up port %s\n",(*it)->getName().c_str()); 
     582                if(!(*it)->setBufferSize(m_manager->getPeriodSize())) { 
     583                    debugFatal("Could not set buffer size to %d\n",m_manager->getPeriodSize()); 
     584                    return false; 
     585                } 
     586                switch ((*it)->getPortType()) { 
     587                    case Port::E_Audio: 
     588                        if(!(*it)->setSignalType(Port::E_PeriodSignalled)) { 
     589                            debugFatal("Could not set signal type to PeriodSignalling"); 
     590                            return false; 
     591                        } 
     592                        // buffertype and datatype are dependant on the API 
     593                        debugWarning("---------------- ! Doing hardcoded dummy setup ! --------------\n"); 
     594                        // buffertype and datatype are dependant on the API 
     595                        if(!(*it)->setBufferType(Port::E_PointerBuffer)) { 
     596                            debugFatal("Could not set buffer type"); 
     597                            return false; 
     598                        } 
     599                        if(!(*it)->useExternalBuffer(true)) { 
     600                            debugFatal("Could not set external buffer usage"); 
     601                            return false; 
     602                        } 
     603                        if(!(*it)->setDataType(Port::E_Float)) { 
     604                            debugFatal("Could not set data type"); 
     605                            return false; 
     606                        } 
     607                        break; 
     608                    case Port::E_Midi: 
     609                        if(!(*it)->setSignalType(Port::E_PacketSignalled)) { 
     610                            debugFatal("Could not set signal type to PacketSignalling"); 
     611                            return false; 
     612                        } 
     613                        // buffertype and datatype are dependant on the API 
     614                        // buffertype and datatype are dependant on the API 
     615                        debugWarning("---------------- ! Doing hardcoded test setup ! --------------\n"); 
     616                        // buffertype and datatype are dependant on the API 
     617                        if(!(*it)->setBufferType(Port::E_RingBuffer)) { 
     618                            debugFatal("Could not set buffer type"); 
     619                            return false; 
     620                        } 
     621                        if(!(*it)->setDataType(Port::E_MidiEvent)) { 
     622                            debugFatal("Could not set data type"); 
     623                            return false; 
     624                        } 
     625                        break; 
     626                    default: 
     627                        debugWarning("Unsupported port type specified\n"); 
     628                        break; 
     629                } 
     630            } 
     631            // the API specific settings of the ports should already be set, 
     632            // as this is called from the processorManager->prepare() 
     633            // so we can init the ports 
     634            result &= PortManager::initPorts(); 
     635 
     636            break; 
     637        case ePS_DryRunning: 
     638            // what to do here? 
     639            break; 
     640        default: 
     641            debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 
     642            return false; 
     643    } 
     644 
     645    result &= m_data_buffer->reset(); // FIXME: don't like the reset() name 
     646 
     647    // make the buffer transparent 
     648    m_data_buffer->setTransparent(true); 
     649 
     650    // reset all ports 
     651    result &= PortManager::preparePorts(); 
     652 
     653    m_state = ePS_Stopped; 
     654    return result; 
     655
     656 
     657/** 
     658 * @brief Enter the ePS_WaitingForStream state 
     659 * @return true if successful, false if not 
     660 * 
     661 * @pre all dynamic data structures are allocated successfully 
     662 * 
     663 * @post 
     664 * 
     665 */ 
     666bool 
     667StreamProcessor::doWaitForRunningStream() 
     668
     669    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     670    switch(m_state) { 
     671        case ePS_Stopped: 
     672            // we have to start waiting for an incoming stream 
     673            // this basically means nothing, the state change will 
     674            // be picked up by the packet iterator 
     675            break; 
     676        default: 
     677            debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 
     678            return false; 
     679    } 
     680    m_state = ePS_WaitingForStream; 
     681    return true; 
     682
     683 
     684/** 
     685 * @brief Enter the ePS_DryRunning state 
     686 * @return true if successful, false if not 
     687 * 
     688 * @pre 
     689 * 
     690 * @post 
     691 * 
     692 */ 
     693bool 
     694StreamProcessor::doDryRunning() 
     695
     696    bool result = true; 
     697    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     698    switch(m_state) { 
     699        case ePS_WaitingForStream: 
     700            // a running stream has been detected 
     701            debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started dry-running at cycle %d\n", this, m_last_cycle); 
     702            if (getType() == ePT_Receive) { 
     703                m_data_buffer->setBufferTailTimestamp(m_last_timestamp); 
     704            } else { 
     705                // FIXME 
     706                debugError("Implement\n"); 
     707            } 
     708            break; 
     709        case ePS_WaitingForStreamDisable: 
     710            result &= m_data_buffer->reset(); // FIXME: don't like the reset() name 
     711            m_data_buffer->setTransparent(true); 
     712            break; 
     713        default: 
     714            debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 
     715            return false; 
     716    } 
     717    m_state = ePS_DryRunning; 
     718    return result; 
     719
     720 
     721/** 
     722 * @brief Enter the ePS_WaitingForStreamEnable state 
     723 * @return true if successful, false if not 
     724 * 
     725 * @pre 
     726 * 
     727 * @post 
     728 * 
     729 */ 
     730bool 
     731StreamProcessor::doWaitForStreamEnable() 
     732
     733    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     734    switch(m_state) { 
     735        case ePS_DryRunning: 
     736            // we have to start waiting for an incoming stream 
     737            // this basically means nothing, the state change will 
     738            // be picked up by the packet iterator 
     739            break; 
     740        default: 
     741            debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 
     742            return false; 
     743    } 
     744    m_state = ePS_WaitingForStreamEnable; 
     745    return true; 
     746
     747 
     748/** 
     749 * @brief Enter the ePS_Running state 
     750 * @return true if successful, false if not 
     751 * 
     752 * @pre 
     753 * 
     754 * @post 
     755 * 
     756 */ 
     757bool 
     758StreamProcessor::doRunning() 
     759
     760    bool result = true; 
     761    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     762    switch(m_state) { 
     763        case ePS_WaitingForStreamEnable: 
     764            // a running stream has been detected 
     765            debugOutput(DEBUG_LEVEL_VERBOSE, "StreamProcessor %p started running at cycle %d\n",  
     766                                             this, m_last_cycle); 
     767            if (getType() == ePT_Receive) { 
     768                m_data_buffer->setTransparent(false); 
     769            } else { 
     770                // FIXME 
     771                debugError("Implement\n"); 
     772            } 
     773            break; 
     774        default: 
     775            debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 
     776            return false; 
     777    } 
     778    m_state = ePS_Running; 
     779    return result; 
     780
     781 
     782/** 
     783 * @brief Enter the ePS_WaitingForStreamDisable state 
     784 * @return true if successful, false if not 
     785 * 
     786 * @pre 
     787 * 
     788 * @post 
     789 * 
     790 */ 
     791bool 
     792StreamProcessor::doWaitForStreamDisable() 
     793
     794    debugOutput(DEBUG_LEVEL_VERBOSE, "Enter from state: %s\n", ePSToString(m_state)); 
     795    switch(m_state) { 
     796        case ePS_Running: 
     797            // the thread will do the transition 
     798            break; 
     799        default: 
     800            debugError("Entry from invalid state: %s\n", ePSToString(m_state)); 
     801            return false; 
     802    } 
     803    m_state = ePS_WaitingForStreamDisable; 
     804    return true; 
     805
     806 
     807/** 
     808 * @brief Updates the state machine and calls the necessary transition functions 
     809 * @return true if successful, false if not 
     810 */ 
     811bool StreamProcessor::updateState() { 
     812    bool result = false; 
     813    // copy the current state locally since it could change value, 
     814    // and that's something we don't want to happen inbetween tests 
     815    // if m_next_state changes during this routine, we know for sure 
     816    // that the previous state change was at least attempted correctly. 
     817    enum eProcessorState next_state = m_next_state; 
     818 
     819    debugOutput(DEBUG_LEVEL_VERBOSE, "Do state transition: %s => %s\n", 
     820        ePSToString(m_state), ePSToString(next_state)); 
     821 
     822    if (m_state == next_state) { 
     823        debugWarning("ignoring identity state update from/to %s\n", ePSToString(m_state) ); 
     824        return true; 
     825    } 
     826 
     827    // after creation, only initialization is allowed 
     828    if (m_state == ePS_Created) { 
     829        if(next_state != ePS_Stopped) { 
     830            goto updateState_exit_with_error; 
     831        } 
     832        // do init here  
     833        result = doStop(); 
     834        if (result) return true; 
     835        else goto updateState_exit_change_failed; 
     836    } 
     837 
     838    // after initialization, only WaitingForRunningStream is allowed 
     839    if (m_state == ePS_Stopped) { 
     840        if(next_state != ePS_WaitingForStream) { 
     841            goto updateState_exit_with_error; 
     842        } 
     843        result = doWaitForRunningStream(); 
     844        if (result) return true; 
     845        else goto updateState_exit_change_failed; 
     846    } 
     847 
     848    // after WaitingForStream, only ePS_DryRunning is allowed 
     849    // this means that the stream started running 
     850    if (m_state == ePS_WaitingForStream) { 
     851        if(next_state != ePS_DryRunning) { 
     852            goto updateState_exit_with_error; 
     853        } 
     854        result = doDryRunning(); 
     855        if (result) return true; 
     856        else goto updateState_exit_change_failed; 
     857    } 
     858 
     859    // from ePS_DryRunning we can go to: 
     860    //   - ePS_Stopped if something went wrong during DryRunning 
     861    //   - ePS_WaitingForStreamEnable if there is a requested to enable 
     862    if (m_state == ePS_DryRunning) { 
     863        if((next_state != ePS_Stopped) && 
     864           (next_state != ePS_WaitingForStreamEnable)) { 
     865            goto updateState_exit_with_error; 
     866        } 
     867        if (next_state == ePS_Stopped) { 
     868            result = doStop(); 
     869        } else { 
     870            result = doWaitForStreamEnable(); 
     871        } 
     872        if (result) return true; 
     873        else goto updateState_exit_change_failed; 
     874    } 
     875 
     876    // from ePS_WaitingForStreamEnable we can go to: 
     877    //   - ePS_DryRunning if something went wrong while waiting 
     878    //   - ePS_Running if the stream enabled correctly 
     879    if (m_state == ePS_WaitingForStreamEnable) { 
     880        if((next_state != ePS_DryRunning) && 
     881           (next_state != ePS_Running)) { 
     882            goto updateState_exit_with_error; 
     883        } 
     884        if (next_state == ePS_Stopped) { 
     885            result = doDryRunning(); 
     886        } else { 
     887            result = doRunning(); 
     888        } 
     889        if (result) return true; 
     890        else goto updateState_exit_change_failed; 
     891    } 
     892 
     893    // from ePS_Running we can only start waiting for a disabled stream 
     894    if (m_state == ePS_Running) { 
     895        if(next_state != ePS_WaitingForStreamDisable) { 
     896            goto updateState_exit_with_error; 
     897        } 
     898        result = doWaitForStreamDisable(); 
     899        if (result) return true; 
     900        else goto updateState_exit_change_failed; 
     901    } 
     902 
     903    // from ePS_WaitingForStreamDisable we can go to DryRunning 
     904    if (m_state == ePS_WaitingForStreamDisable) { 
     905        if(next_state != ePS_DryRunning) { 
     906            goto updateState_exit_with_error; 
     907        } 
     908        result = doDryRunning(); 
     909        if (result) return true; 
     910        else goto updateState_exit_change_failed; 
     911    } 
     912 
     913    // if we arrive here there is an error 
     914updateState_exit_with_error: 
     915    debugError("Invalid state transition: %s => %s\n", 
     916        ePSToString(m_state), ePSToString(next_state)); 
     917    return false; 
     918updateState_exit_change_failed: 
     919    debugError("State transition failed: %s => %s\n", 
     920        ePSToString(m_state), ePSToString(next_state)); 
     921    return false; 
     922
     923 
     924 
     925/** 
     926 * @brief convert a eProcessorState to a string 
     927 * @param s the state 
     928 * @return a char * describing the state 
     929 */ 
    349930const char * 
    350931StreamProcessor::ePSToString(enum eProcessorState s) { 
    351932    switch (s) { 
     933        case ePS_Invalid: return "ePS_Invalid"; 
    352934        case ePS_Created: return "ePS_Created"; 
    353         case ePS_Initialized: return "ePS_Initialized"; 
    354         case ePS_WaitingForRunningStream: return "ePS_WaitingForRunningStream"; 
     935        case ePS_Stopped: return "ePS_Stopped"; 
     936        case ePS_WaitingForStream: return "ePS_WaitingForStream"; 
    355937        case ePS_DryRunning: return "ePS_DryRunning"; 
    356         case ePS_WaitingForEnabledStream: return "ePS_WaitingForEnabledStream"; 
    357         case ePS_StreamEnabled: return "ePS_StreamEnabled"; 
    358         case ePS_WaitingForDisabledStream: return "ePS_WaitingForDisabledStream"; 
    359     } 
     938        case ePS_WaitingForStreamEnable: return "ePS_WaitingForStreamEnable"; 
     939        case ePS_Running: return "ePS_Running"; 
     940        case ePS_WaitingForStreamDisable: return "ePS_WaitingForStreamDisable"; 
     941        default: return "error: unknown state"; 
     942    } 
     943
     944 
     945/** 
     946 * @brief convert a eProcessorType to a string 
     947 * @param t the type 
     948 * @return a char * describing the state 
     949 */ 
     950const char * 
     951StreamProcessor::ePTToString(enum eProcessorType t) { 
     952    switch (t) { 
     953        case ePT_Receive: return "Receive"; 
     954        case ePT_Transmit: return "Transmit"; 
     955        default: return "error: unknown type"; 
     956    } 
     957
     958 
     959/*********************************************** 
     960 * Debug                                       * 
     961 ***********************************************/ 
     962void 
     963StreamProcessor::dumpInfo() 
     964
     965    debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n"); 
     966    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n"); 
     967 
     968    IsoStream::dumpInfo(); 
     969    debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n"); 
     970    if (m_handler) 
     971        debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011u\n",m_handler->getCycleTimerTicks()); 
     972    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %d\n", m_xruns); 
     973    debugOutputShort( DEBUG_LEVEL_NORMAL, "  State                 : %s\n", ePSToString(m_state)); 
     974    debugOutputShort( DEBUG_LEVEL_NORMAL, "   Next state           : %s\n", ePSToString(m_next_state)); 
     975    debugOutputShort( DEBUG_LEVEL_NORMAL, "    transition at       : %u\n", m_cycle_to_switch_state); 
     976     
     977 
     978    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Nominal framerate     : %u\n", m_manager->getNominalRate()); 
     979    debugOutputShort( DEBUG_LEVEL_NORMAL, "  Device framerate      : Sync: %f, Buffer %f\n", 
     980        24576000.0/m_manager->getSyncSource().m_data_buffer->getRate(), 
     981        24576000.0/m_data_buffer->getRate() 
     982        ); 
     983 
     984    m_data_buffer->dumpInfo(); 
     985 
     986    m_PeriodStat.dumpInfo(); 
     987    m_PacketStat.dumpInfo(); 
     988//     m_WakeupStat.dumpInfo(); 
     989
     990 
     991void 
     992StreamProcessor::setVerboseLevel(int l) { 
     993    setDebugLevel(l); 
     994    IsoStream::setVerboseLevel(l); 
     995    PortManager::setVerboseLevel(l); 
     996    m_data_buffer->setVerboseLevel(l); 
    360997} 
    361998 
  • branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h

    r715 r719  
    5050                        public PortManager, 
    5151                        public Util::TimestampedBufferClient, 
    52                         public Util::OptionContainer { 
     52                        public Util::OptionContainer 
     53
    5354 
    5455    friend class StreamProcessorManager; // FIXME: get rid of this 
     
    6566    // this can only be set by the constructor 
    6667    enum eProcessorType m_processor_type; 
    67  
     68    // pretty printing 
     69    const char *ePTToString(enum eProcessorType); 
    6870protected: 
    6971    ///> the state the streamprocessor is in 
    7072    enum eProcessorState { 
     73        ePS_Invalid, 
    7174        ePS_Created, 
    72         ePS_Initialized, 
    73         ePS_WaitingForRunningStream, 
     75        // ePS_WaitingToStop, FIXME: this will be needed for the MOTU's 
     76        ePS_Stopped, 
     77        ePS_WaitingForStream, 
    7478        ePS_DryRunning, 
    75         ePS_WaitingForEnabledStream
    76         ePS_StreamEnabled
    77         ePS_WaitingForDisabledStream
     79        ePS_WaitingForStreamEnable
     80        ePS_Running
     81        ePS_WaitingForStreamDisable
    7882    }; 
    7983     
     
    8488private: 
    8589    enum eProcessorState m_state; 
     90    // state switching 
     91    enum eProcessorState m_next_state; 
     92    unsigned int m_cycle_to_switch_state; 
     93    bool updateState(); 
     94    // pretty printing 
    8695    const char *ePSToString(enum eProcessorState); 
    8796 
     97    bool doStop(); 
     98    bool doWaitForRunningStream(); 
     99    bool doDryRunning(); 
     100    bool doWaitForStreamEnable(); 
     101    bool doRunning(); 
     102    bool doWaitForStreamDisable(); 
     103 
     104    bool scheduleStateTransition(enum eProcessorState state, uint64_t time_instant); 
     105    bool scheduleAndWaitForStateTransition(enum eProcessorState state,  
     106                                           uint64_t time_instant,  
     107                                           enum eProcessorState wait_state); 
     108public: 
     109    bool isRunning() 
     110            {return m_state == ePS_Running;}; 
     111    bool isDryRunning() 
     112            {return m_state == ePS_DryRunning;}; 
     113 
     114//--- state stuff (TODO: cleanup) 
     115    bool startDryRunning(int64_t time_to_start_at); 
     116    bool startRunning(int64_t time_to_start_at); 
     117    bool stopDryRunning(int64_t time_to_stop_at); 
     118    bool stopRunning(int64_t time_to_stop_at); 
     119 
     120    // the main difference between init and prepare is that when prepare is called, 
     121    // the SP is registered to a manager (FIXME: can't it be called by the manager?) 
     122    bool init(); 
     123    bool prepare(); 
     124    ///> stop the SP from running or dryrunning 
     125    bool stop(); 
    88126// constructor/destructor 
    89127public: 
     
    109147 
    110148    // the receive interface accepts packets and provides frames 
    111     // implement these for a receive SP 
    112     // leave default for a transmit SP 
    113     virtual enum raw1394_iso_disposition 
     149     
     150    // the following two methods are to be implemented by subclasses 
     151    virtual bool processPacketHeader(unsigned char *data, unsigned int length, 
     152                  unsigned char channel, unsigned char tag, unsigned char sy, 
     153                  unsigned int cycle, unsigned int dropped) 
     154        {debugWarning("call not allowed\n"); return false;}; 
     155    virtual bool processPacketData(unsigned char *data, unsigned int length, 
     156                  unsigned char channel, unsigned char tag, unsigned char sy, 
     157                  unsigned int cycle, unsigned int dropped) 
     158        {debugWarning("call not allowed\n"); return false;}; 
     159 
     160    // this one is implemented by us 
     161    enum raw1394_iso_disposition 
    114162        putPacket(unsigned char *data, unsigned int length, 
    115163                  unsigned char channel, unsigned char tag, unsigned char sy, 
    116                   unsigned int cycle, unsigned int dropped) 
    117         {debugWarning("call not allowed\n"); return RAW1394_ISO_STOP;}; 
    118     virtual bool getFrames(unsigned int nbframes, int64_t ts) 
    119         {debugWarning("call not allowed\n"); return false;}; 
    120     virtual bool getFramesDry(unsigned int nbframes, int64_t ts) 
    121         {debugWarning("call not allowed\n"); return false;}; 
     164                  unsigned int cycle, unsigned int dropped); 
     165 
     166    bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client 
     167protected: 
     168    // to be implemented by the children 
    122169    virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset) 
    123170        {debugWarning("call not allowed\n"); return false;}; 
    124  
    125  
    126 //--- state stuff (TODO: cleanup) 
    127     bool xrunOccurred() { return (m_xruns>0); }; 
    128     bool isRunning(); ///< returns true if there is some stream data processed 
    129     virtual bool prepareForEnable(uint64_t time_to_enable_at); 
    130     virtual bool prepareForDisable(); 
    131  
    132     bool enable(uint64_t time_to_enable_at); ///< enable the stream processing 
    133     bool disable(); ///< disable the stream processing 
    134     bool isEnabled() {return !m_is_disabled;}; 
    135  
    136     virtual bool reset(); ///< reset the streams & buffers (e.g. after xrun) 
    137  
    138     virtual bool prepare(); ///< prepare the streams & buffers (e.g. prefill) 
    139     virtual bool init(); 
    140     virtual bool prepareForStop() {return true;}; 
    141     virtual bool prepareForStart() {return true;}; 
     171    virtual bool provideSilenceBlock(unsigned int nevents, unsigned int offset) 
     172        {debugWarning("call not allowed\n"); return false;}; 
     173 
     174private: 
     175    bool getFramesDry(unsigned int nbframes, int64_t ts); 
     176    bool getFramesWet(unsigned int nbframes, int64_t ts); 
    142177 
    143178    // move to private? 
    144     void resetXrunCounter(); 
    145 protected: 
    146     bool m_running; 
    147     bool m_disabled; 
    148     bool m_is_disabled; 
    149     unsigned int m_cycle_to_enable_at; 
     179    bool xrunOccurred() { return (m_xruns>0); }; // FIXME: m_xruns not updated 
     180 
     181protected: // FIXME: move to private 
     182    uint64_t m_dropped; /// FIXME:debug 
     183    uint64_t m_last_dropped; /// FIXME:debug 
     184    int m_last_good_cycle; /// FIXME:debug 
     185    uint64_t m_last_timestamp; /// last timestamp (in ticks) 
     186    uint64_t m_last_timestamp2; /// last timestamp (in ticks) 
     187    uint64_t m_last_timestamp_at_period_ticks; 
    150188 
    151189//--- data buffering and accounting 
     
    170208         *         false if it can't 
    171209         */ 
    172         virtual bool canClientTransferFrames(unsigned int nframes); 
     210        bool canClientTransferFrames(unsigned int nframes); 
    173211 
    174212        /** 
     
    181219         * @return true if the operation was successful 
    182220         */ 
    183         virtual bool dropFrames(unsigned int nframes); 
     221        bool dropFrames(unsigned int nframes); 
    184222 
    185223        /** 
     
    215253         * @return the time in internal units 
    216254         */ 
    217         virtual uint64_t getTimeAtPeriod(); 
     255        uint64_t getTimeAtPeriod(); 
    218256 
    219257        uint64_t getTimeNow(); 
     
    230268         * @param d sync delay 
    231269         */ 
    232         void setSyncDelay(int d) {m_sync_delay=d;}; 
     270        void setSyncDelay(int d) {m_sync_delay = d;}; 
    233271 
    234272        /** 
     
    247285         * @return maximal frame latency 
    248286         */ 
    249         virtual int getMaxFrameLatency(); 
    250  
    251         StreamProcessor& getSyncSource(); 
     287        int getMaxFrameLatency(); 
    252288 
    253289        float getTicksPerFrame(); 
     
    256292 
    257293        int getBufferFill(); 
     294 
     295        // Child implementation interface 
     296        /** 
     297        * @brief prepare the child SP 
     298        * @return true if successful, false otherwise 
     299        * @pre the m_manager pointer points to a valid manager 
     300        * @post getEventsPerFrame() returns the correct value 
     301        * @post getEventSize() returns the correct value 
     302        * @post getUpdatePeriod() returns the correct value 
     303        * @post processPacketHeader(...) can be called 
     304        * @post processPacketData(...) can be called 
     305        */ 
     306        virtual bool prepareChild() = 0; 
     307        /** 
     308         * @brief get the number of events contained in one frame 
     309         * @return the number of events contained in one frame 
     310         */ 
     311        virtual unsigned int getEventsPerFrame() = 0; 
     312 
     313        /** 
     314         * @brief get the size of one frame in bytes 
     315         * @return the size of one frame in bytes 
     316         */ 
     317        virtual unsigned int getEventSize() = 0; 
     318 
     319        /** 
     320         * @brief get the nominal number of frames between buffer updates 
     321         * @return the nominal number of frames between buffer updates 
     322         */ 
     323        virtual unsigned int getUpdatePeriod() = 0; 
    258324 
    259325    protected: 
  • branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp

    r715 r719  
    3333#define PREPARE_TIMEOUT_MSEC 4000 
    3434#define ENABLE_TIMEOUT_MSEC 4000 
     35 
     36// allows to add some processing margin. This shifts the time 
     37// at which the buffer is transfer()'ed, making things somewhat 
     38// more robust. It should be noted though that shifting the transfer 
     39// time to a later time instant also causes the xmit buffer fill to be 
     40// lower on average. 
     41#define FFADO_SIGNAL_DELAY_TICKS 3072 
    3542 
    3643namespace Streaming { 
     
    5360StreamProcessorManager::~StreamProcessorManager() { 
    5461    if (m_isoManager) delete m_isoManager; 
    55  
    5662} 
    5763 
     
    9096 
    9197    debugFatal("Unsupported processor type!\n"); 
    92  
    9398    return false; 
    9499} 
     
    102107 
    103108        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    104             it != m_ReceiveProcessors.end(); 
    105             ++it ) { 
    106  
     109              it != m_ReceiveProcessors.end(); 
     110              ++it ) 
     111        { 
    107112            if ( *it == processor ) { 
    108                     m_ReceiveProcessors.erase(it); 
    109  
    110                     processor->clearManager(); 
    111  
    112                     if(!m_isoManager->unregisterStream(processor)) { 
    113                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 
    114  
    115                         return false; 
    116  
    117                     } 
    118  
    119                     return true; 
     113                m_ReceiveProcessors.erase(it); 
     114                processor->clearManager(); 
     115                if(!m_isoManager->unregisterStream(processor)) { 
     116                    debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 
     117                    return false; 
    120118                } 
     119                return true; 
     120            } 
    121121        } 
    122122    } 
     
    124124    if (processor->getType()==StreamProcessor::ePT_Transmit) { 
    125125        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    126             it != m_TransmitProcessors.end(); 
    127             ++it ) { 
    128  
     126              it != m_TransmitProcessors.end(); 
     127              ++it ) 
     128        { 
    129129            if ( *it == processor ) { 
    130                     m_TransmitProcessors.erase(it); 
    131  
    132                     processor->clearManager(); 
    133  
    134                     if(!m_isoManager->unregisterStream(processor)) { 
    135                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 
    136  
    137                         return false; 
    138  
    139                     } 
    140  
    141                     return true; 
     130                m_TransmitProcessors.erase(it); 
     131                processor->clearManager(); 
     132                if(!m_isoManager->unregisterStream(processor)) { 
     133                    debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 
     134                    return false; 
    142135                } 
     136                return true; 
     137            } 
    143138        } 
    144139    } 
    145140 
    146141    debugFatal("Processor (%p) not found!\n",processor); 
    147  
    148142    return false; //not found 
    149  
    150143} 
    151144 
    152145bool StreamProcessorManager::setSyncSource(StreamProcessor *s) { 
    153146    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s); 
    154  
    155147    m_SyncSource=s; 
    156148    return true; 
     
    160152{ 
    161153    debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
    162  
    163154    m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1); 
    164  
    165155    if(!m_isoManager) { 
    166156        debugFatal("Could not create IsoHandlerManager\n"); 
    167157        return false; 
    168158    } 
    169  
    170     // propagate the debug level 
    171159    m_isoManager->setVerboseLevel(getDebugLevel()); 
    172  
    173160    if(!m_isoManager->init()) { 
    174161        debugFatal("Could not initialize IsoHandlerManager\n"); 
     
    177164 
    178165    m_xrun_happened=false; 
    179  
    180166    return true; 
    181167} 
     
    195181    } 
    196182 
    197     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    198         it != m_ReceiveProcessors.end(); 
    199         ++it ) { 
    200             if(m_SyncSource == NULL) { 
    201                 debugWarning(" => Sync Source is %p.\n", *it); 
    202                 m_SyncSource = *it; 
    203             } 
    204     } 
    205  
    206     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    207         it != m_TransmitProcessors.end(); 
    208         ++it ) { 
    209             if(m_SyncSource == NULL) { 
    210                 debugWarning(" => Sync Source is %p.\n", *it); 
    211                 m_SyncSource = *it; 
    212             } 
    213     } 
    214  
    215     // now do the actual preparation 
     183    // FIXME: put into separate method 
     184    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     185          it != m_ReceiveProcessors.end(); 
     186          ++it ) 
     187    { 
     188        if(m_SyncSource == NULL) { 
     189            debugWarning(" => Sync Source is %p.\n", *it); 
     190            m_SyncSource = *it; 
     191        } 
     192    } 
     193    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     194          it != m_TransmitProcessors.end(); 
     195          ++it ) 
     196    { 
     197        if(m_SyncSource == NULL) { 
     198            debugWarning(" => Sync Source is %p.\n", *it); 
     199            m_SyncSource = *it; 
     200        } 
     201    } 
     202 
     203    // now do the actual preparation of the SP's 
    216204    debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n"); 
    217205    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     
    228216        } 
    229217    } 
    230  
    231218    debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n"); 
    232219    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     
    248235        return false; 
    249236    } 
    250  
    251237    return true; 
    252238} 
    253239 
     240bool StreamProcessorManager::startDryRunning() { 
     241    debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start dry-running...\n"); 
     242    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     243            it != m_ReceiveProcessors.end(); 
     244            ++it ) { 
     245        if(!(*it)->startDryRunning(-1)) { 
     246            debugError("Could not put SP %p into the dry-running state\n", *it); 
     247            return false; 
     248        } 
     249    } 
     250    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     251            it != m_TransmitProcessors.end(); 
     252            ++it ) { 
     253        if(!(*it)->startDryRunning(-1)) { 
     254            debugError("Could not put SP %p into the dry-running state\n", *it); 
     255            return false; 
     256        } 
     257    } 
     258    debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n"); 
     259    return true; 
     260} 
     261 
    254262bool StreamProcessorManager::syncStartAll() { 
    255  
    256     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n"); 
    257     // we have to wait until all streamprocessors indicate that they are running 
    258     // i.e. that there is actually some data stream flowing 
    259     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds 
    260     bool notRunning=true; 
    261     while (notRunning && wait_cycles) { 
    262         wait_cycles--; 
    263         notRunning=false; 
    264  
    265         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    266                 it != m_ReceiveProcessors.end(); 
    267                 ++it ) { 
    268             if(!(*it)->isRunning()) notRunning=true; 
    269         } 
    270  
    271         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    272                 it != m_TransmitProcessors.end(); 
    273                 ++it ) { 
    274             if(!(*it)->isRunning()) notRunning=true; 
    275         } 
    276  
    277         usleep(1000); 
    278         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n", notRunning); 
    279     } 
    280  
    281     if(!wait_cycles) { // timout has occurred 
    282         debugFatal("One or more streams are not starting up (timeout):\n"); 
    283  
    284         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    285                 it != m_ReceiveProcessors.end(); 
    286                 ++it ) { 
    287             if(!(*it)->isRunning()) { 
    288                 debugFatal(" receive stream %p not running\n",*it); 
    289             } else { 
    290                 debugFatal(" receive stream %p running\n",*it); 
    291             } 
    292         } 
    293  
    294         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    295                 it != m_TransmitProcessors.end(); 
    296                 ++it ) { 
    297             if(!(*it)->isRunning()) { 
    298                 debugFatal(" transmit stream %p not running\n",*it); 
    299             } else { 
    300                 debugFatal(" transmit stream %p running\n",*it); 
    301             } 
    302         } 
    303         return false; 
    304     } 
    305  
    306     debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 
     263    // figure out when to get the SP's running. 
     264    // the xmit SP's should also know the base timestamp 
     265    // streams should be aligned here 
    307266 
    308267    // now find out how long we have to delay the wait operation such that 
     
    318277    } 
    319278 
     279    // add some processing margin. This only shifts the time 
     280    // at which the buffer is transfer()'ed. This makes things somewhat 
     281    // more robust. It should be noted though that shifting the transfer 
     282    // time to a later time instant also causes the xmit buffer fill to be 
     283    // lower on average. 
     284    max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS; 
    320285    debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks (%03us %04uc %04ut)...\n",  
    321286        max_of_min_delay, 
     
    325290    m_SyncSource->setSyncDelay(max_of_min_delay); 
    326291 
    327     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device to indicate clock sync lock...\n"); 
     292    //STEP X: when we implement such a function, we can wait for a signal from the devices that they 
     293    //        have aquired lock 
     294    //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n"); 
    328295    //sleep(2); // FIXME: be smarter here 
    329      
    330     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); 
    331     // now we reset the frame counters 
    332     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    333             it != m_ReceiveProcessors.end(); 
    334             ++it ) { 
    335         // get the receive SP's going at receiving data 
    336         (*it)->m_data_buffer->setTransparent(false); 
    337         (*it)->reset(); 
    338     } 
    339  
    340     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    341             it != m_TransmitProcessors.end(); 
    342             ++it ) { 
    343         // make sure the SP retains it's prefilled data 
    344         (*it)->m_data_buffer->setTransparent(false); 
    345         (*it)->reset(); 
    346     } 
    347      
    348     dumpInfo(); 
    349     // All buffers are prefilled and set-up, the only thing  
    350     // that remains a mistery is the timestamp information. 
    351 //     m_SyncSource->m_data_buffer->setTransparent(false); 
    352 //     debugShowBackLog(); 
    353  
    354 //     m_SyncSource->setVerboseLevel(DEBUG_LEVEL_ULTRA_VERBOSE); 
    355      
     296 
     297    // wait for some sort of sync 
    356298    debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n"); 
    357     // in order to obtain that, we wait for the first periods to be 
    358     // received. 
     299    // in order to obtain that, we wait for the first periods to be received. 
    359300    int nb_sync_runs=20; 
     301    int64_t time_till_next_period; 
    360302    while(nb_sync_runs--) { // or while not sync-ed? 
    361         waitForPeriod(); 
    362         // drop the frames for all receive SP's 
    363         dryRun(StreamProcessor::ePT_Receive); 
    364          
    365         // we don't have to dryrun for the xmit SP's since they 
    366         // are not sending data yet. 
    367          
    368         // sync the xmit SP's buffer head timestamps 
    369         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    370                 it != m_TransmitProcessors.end(); 
    371                 ++it ) { 
    372             // FIXME: encapsulate 
    373             (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 
    374         } 
    375     } 
    376 //     m_SyncSource->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 
    377  
     303        time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
     304        debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 
     305        if(time_till_next_period > 0) { 
     306            // wait for the period 
     307            usleep(time_till_next_period); 
     308        } 
     309    } 
     310 
     311    // figure out where we are now 
     312    uint64_t time_of_transfer = m_SyncSource->getTimeAtPeriod(); 
    378313    debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",  
    379         m_time_of_transfer, 
    380         (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 
    381         (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 
    382         (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 
    383     // FIXME: xruns can screw up the framecounter accounting. do something more sane here 
    384     resetXrunCounters(); 
    385     // lock the isohandlermanager such that things freeze 
    386 //     debugShowBackLog(); 
    387  
     314        time_of_transfer, 
     315        (unsigned int)TICKS_TO_SECS(time_of_transfer), 
     316        (unsigned int)TICKS_TO_CYCLES(time_of_transfer), 
     317        (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); 
     318 
     319    // start wet-running in 200 cycles 
     320    // this is the timeframe in which the remaining code should be ready 
     321    time_of_transfer = addTicks(time_of_transfer, 200*TICKS_PER_CYCLE); 
     322 
     323    debugOutput( DEBUG_LEVEL_VERBOSE, "  => start at TS=%011llu (%03us %04uc %04ut)...\n",  
     324        time_of_transfer, 
     325        (unsigned int)TICKS_TO_SECS(time_of_transfer), 
     326        (unsigned int)TICKS_TO_CYCLES(time_of_transfer), 
     327        (unsigned int)TICKS_TO_OFFSET(time_of_transfer)); 
    388328    // we now should have decent sync info 
    389329    // the buffers of the receive streams should be (approx) empty 
    390330    // the buffers of the xmit streams should be full 
    391      
    392     // note what should the timestamp of the first sample be? 
    393331     
    394332    // at this point the buffer head timestamp of the transmit buffers can be 
     
    405343//     int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
    406344 
    407     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    408             it != m_TransmitProcessors.end(); 
    409             ++it ) { 
    410         // FIXME: encapsulate 
    411         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 
    412         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!! 
    413     } 
     345//     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     346//             it != m_TransmitProcessors.end(); 
     347//             ++it ) { 
     348//         // FIXME: encapsulate 
     349//         (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 
     350//         //(*it)->m_data_buffer->setNominalRate(rate); //CHECK!!! 
     351//     } 
    414352     
    415353    dumpInfo(); 
    416      
    417     // this is the place were we should be syncing the received streams too 
    418     // check how much they differ 
    419      
    420      
    421     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 
    422  
    423     // FIXME: this should not be in cycles, but in 'time' 
    424     // FIXME: remove the timestamp 
    425     if (!enableStreamProcessors(0)) { 
    426         debugFatal("Could not enable StreamProcessors...\n"); 
    427         return false; 
    428     } 
    429  
    430     debugOutput( DEBUG_LEVEL_VERBOSE, "Running dry for a while...\n"); 
    431     #define MAX_DRYRUN_CYCLES               40 
    432     #define MIN_SUCCESSFUL_DRYRUN_CYCLES    4 
    433     // run some cycles 'dry' such that everything can stabilize 
    434     int nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES; 
    435     int nb_succesful_cycles = 0; 
    436     while(nb_dryrun_cycles_left > 0 && 
    437           nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) { 
    438  
    439         waitForPeriod(); 
    440  
    441         if (dryRun()) { 
    442             nb_succesful_cycles++; 
    443         } else { 
    444             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" ); 
    445             resetXrunCounters(); 
    446             // reset the transmit SP's such that there is no issue with accumulating buffers 
    447             // FIXME: what about receive SP's 
    448             for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    449                     it != m_TransmitProcessors.end(); 
    450                     ++it ) { 
    451                 // FIXME: encapsulate 
    452                 (*it)->reset(); //CHECK!!! 
    453                 (*it)->m_data_buffer->setBufferHeadTimestamp(m_time_of_transfer); 
    454             } 
    455              
    456             nb_succesful_cycles = 0; 
    457             // FIXME: xruns can screw up the framecounter accounting. do something more sane here 
    458         } 
    459         nb_dryrun_cycles_left--; 
    460     } 
    461  
    462     if(nb_dryrun_cycles_left == 0) { 
    463         debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without steady-state...\n" ); 
    464         return false; 
    465     } 
    466     debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in steady-state...\n" ); 
    467  
    468     // now we should clear the xrun flags 
    469     resetXrunCounters(); 
    470  
    471 /*    debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning streams...\n"); 
    472     // run some cycles 'dry' such that everything can stabilize 
    473     nb_dryrun_cycles_left = MAX_DRYRUN_CYCLES; 
    474     nb_succesful_cycles = 0; 
    475     while(nb_dryrun_cycles_left > 0 && 
    476           nb_succesful_cycles < MIN_SUCCESSFUL_DRYRUN_CYCLES ) { 
    477  
    478         waitForPeriod(); 
    479  
    480         // align the received streams 
    481         int64_t sp_lag; 
    482         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    483                 it != m_ReceiveProcessors.end(); 
    484                 ++it ) { 
    485             uint64_t ts_sp=(*it)->getTimeAtPeriod(); 
    486             uint64_t ts_sync=m_SyncSource->getTimeAtPeriod(); 
    487  
    488             sp_lag = diffTicks(ts_sp, ts_sync); 
    489             debugOutput( DEBUG_LEVEL_VERBOSE, "  SP(%p) TS=%011llu - TS=%011llu = %04lld\n",  
    490                 (*it), ts_sp, ts_sync, sp_lag); 
    491             // sync the other receive SP's to the sync source 
    492 //             if((*it) != m_SyncSource) { 
    493 //                 if(!(*it)->m_data_buffer->syncCorrectLag(sp_lag)) { 
    494 //                         debugOutput(DEBUG_LEVEL_VERBOSE,"could not syncCorrectLag(%11lld) for stream processor (%p)\n", 
    495 //                                 sp_lag, *it); 
    496 //                 } 
    497 //             } 
    498         } 
    499  
    500  
    501         if (dryRun()) { 
    502             nb_succesful_cycles++; 
    503         } else { 
    504             debugOutput( DEBUG_LEVEL_VERBOSE, " This dry-run was not xrun free...\n" ); 
    505             resetXrunCounters(); 
    506             nb_succesful_cycles = 0; 
    507             // FIXME: xruns can screw up the framecounter accounting. do something more sane here 
    508         } 
    509         nb_dryrun_cycles_left--; 
    510     } 
    511  
    512     if(nb_dryrun_cycles_left == 0) { 
    513         debugOutput( DEBUG_LEVEL_VERBOSE, " max # dry-run cycles achieved without aligned steady-state...\n" ); 
    514         return false; 
    515     } 
    516     debugOutput( DEBUG_LEVEL_VERBOSE, " dry-run resulted in aligned steady-state...\n" );*/ 
    517      
    518     // now we should clear the xrun flags 
    519     resetXrunCounters(); 
    520     // and off we go 
     354 
     355    // STEP X: switch SP's over to the running state 
     356    uint64_t time_to_start = addTicks(time_of_transfer, 
     357                                      m_SyncSource->getTicksPerFrame() * getPeriodSize()); 
     358    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     359            it != m_ReceiveProcessors.end(); 
     360            ++it ) { 
     361        if(!(*it)->startRunning(time_to_start)) { 
     362            debugError("Could not put SP %p into the running state\n", *it); 
     363            return false; 
     364        } 
     365    } 
     366    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     367            it != m_TransmitProcessors.end(); 
     368            ++it ) { 
     369        if(!(*it)->startRunning(time_to_start)) { 
     370            debugError("Could not put SP %p into the running state\n", *it); 
     371            return false; 
     372        } 
     373    } 
     374    debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 
    521375    return true; 
    522 } 
    523  
    524 void StreamProcessorManager::resetXrunCounters(){ 
    525     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting xrun flags...\n"); 
    526     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    527         it != m_ReceiveProcessors.end(); 
    528         ++it ) 
    529     { 
    530         (*it)->resetXrunCounter(); 
    531     } 
    532  
    533     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    534         it != m_TransmitProcessors.end(); 
    535         ++it )  
    536     { 
    537         (*it)->resetXrunCounter(); 
    538     } 
    539376} 
    540377 
     
    546383    debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 
    547384    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    548         it != m_ReceiveProcessors.end(); 
    549         ++it ) { 
    550             if (!(*it)->prepareForStart()) { 
    551                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it); 
    552                 return false; 
    553             } 
    554             if (!m_isoManager->registerStream(*it)) { 
    555                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 
    556                 return false; 
    557             } 
    558         } 
    559  
     385          it != m_ReceiveProcessors.end(); 
     386          ++it ) 
     387    { 
     388        if (!m_isoManager->registerStream(*it)) { 
     389            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 
     390            return false; 
     391        } 
     392    } 
    560393    debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 
    561394    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    562         it != m_TransmitProcessors.end(); 
    563         ++it ) { 
    564             if (!(*it)->prepareForStart()) { 
    565                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it); 
    566                 return false; 
    567             } 
    568             if (!m_isoManager->registerStream(*it)) { 
    569                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 
    570                 return false; 
    571             } 
    572         } 
     395          it != m_TransmitProcessors.end(); 
     396          ++it ) 
     397    { 
     398        if (!m_isoManager->registerStream(*it)) { 
     399            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 
     400            return false; 
     401        } 
     402    } 
    573403 
    574404    debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n"); 
     
    578408    } 
    579409 
    580     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 
    581         if (!disableStreamProcessors()) { 
    582         debugFatal("Could not disable StreamProcessors...\n"); 
    583         return false; 
    584     } 
    585  
    586410    debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n"); 
    587411    if (!m_isoManager->startHandlers(-1)) { 
     
    590414    } 
    591415 
     416    // put all SP's into dry-running state 
     417    if (!startDryRunning()) { 
     418        debugFatal("Could not put SP's in dry-running state\n"); 
     419        return false; 
     420    } 
     421 
    592422    // start all SP's synchonized 
    593423    if (!syncStartAll()) { 
     
    602432 
    603433    return true; 
    604  
    605434} 
    606435 
     
    613442    // (like the MOTU) need to do a few things before it's safe to turn off the iso 
    614443    // handling. 
    615     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient 
    616     bool allReady = false; 
    617     while (!allReady && wait_cycles) { 
    618         wait_cycles--; 
    619         allReady = true; 
    620  
    621         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    622             it != m_ReceiveProcessors.end(); 
    623             ++it ) { 
    624             if(!(*it)->prepareForStop()) allReady = false; 
    625         } 
    626  
    627         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    628             it != m_TransmitProcessors.end(); 
    629             ++it ) { 
    630             if(!(*it)->prepareForStop()) allReady = false; 
    631         } 
    632         usleep(1000); 
    633     } 
    634  
    635     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 
    636         if (!disableStreamProcessors()) { 
    637         debugFatal("Could not disable StreamProcessors...\n"); 
    638         return false; 
     444    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     445          it != m_ReceiveProcessors.end(); 
     446          ++it ) { 
     447        if(!(*it)->stop()) { 
     448            debugError("Could not stop SP %p", (*it)); 
     449        } 
     450    } 
     451    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     452          it != m_TransmitProcessors.end(); 
     453          ++it ) { 
     454        if(!(*it)->stop()) { 
     455            debugError("Could not stop SP %p", (*it)); 
     456        } 
    639457    } 
    640458 
     
    649467    debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 
    650468    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    651         it != m_ReceiveProcessors.end(); 
    652         ++it ) { 
    653             if (!m_isoManager->unregisterStream(*it)) { 
    654                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 
    655                 return false; 
    656             } 
    657  
    658         } 
    659  
     469          it != m_ReceiveProcessors.end(); 
     470          ++it ) { 
     471        if (!m_isoManager->unregisterStream(*it)) { 
     472            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 
     473            return false; 
     474        } 
     475    } 
    660476    debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 
    661477    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    662         it != m_TransmitProcessors.end(); 
    663         ++it ) { 
    664             if (!m_isoManager->unregisterStream(*it)) { 
    665                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 
    666                 return false; 
    667             } 
    668  
    669         } 
    670  
    671     return true; 
    672  
    673 
    674  
    675 /** 
    676  * Enables the registered StreamProcessors 
    677  * @return true if successful, false otherwise 
    678  */ 
    679 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) { 
    680     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at); 
    681  
    682     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource); 
    683     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n"); 
    684     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) { 
    685             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 
    686         return false; 
    687     } 
    688  
    689     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n"); 
    690     m_SyncSource->enable(time_to_enable_at); 
    691  
    692     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n"); 
    693  
    694     // we prepare the streamprocessors for enable 
    695     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    696             it != m_ReceiveProcessors.end(); 
    697             ++it ) { 
    698         if(*it != m_SyncSource) { 
    699             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it); 
    700             (*it)->prepareForEnable(time_to_enable_at); 
    701         } 
    702     } 
    703  
    704     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    705             it != m_TransmitProcessors.end(); 
    706             ++it ) { 
    707         if(*it != m_SyncSource) { 
    708             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it); 
    709             (*it)->prepareForEnable(time_to_enable_at); 
    710         } 
    711     } 
    712  
    713     // then we enable the streamprocessors 
    714     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    715             it != m_ReceiveProcessors.end(); 
    716             ++it ) { 
    717         if(*it != m_SyncSource) { 
    718             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it); 
    719             (*it)->enable(time_to_enable_at); 
    720         } 
    721     } 
    722  
    723     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    724             it != m_TransmitProcessors.end(); 
    725             ++it ) { 
    726         if(*it != m_SyncSource) { 
    727             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it); 
    728             (*it)->enable(time_to_enable_at); 
    729         } 
    730     } 
    731  
    732     // now we wait for the SP's to get enabled 
    733 //     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n"); 
    734 //     // we have to wait until all streamprocessors indicate that they are running 
    735 //     // i.e. that there is actually some data stream flowing 
    736 //     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 
    737 //     bool notEnabled=true; 
    738 //     while (notEnabled && wait_cycles) { 
    739 //         wait_cycles--; 
    740 //         notEnabled=false; 
    741 //  
    742 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    743 //                 it != m_ReceiveProcessors.end(); 
    744 //                 ++it ) { 
    745 //             if(!(*it)->isEnabled()) notEnabled=true; 
    746 //         } 
    747 //  
    748 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    749 //                 it != m_TransmitProcessors.end(); 
    750 //                 ++it ) { 
    751 //             if(!(*it)->isEnabled()) notEnabled=true; 
    752 //         } 
    753 //         usleep(1000); // one cycle 
    754 //     } 
    755 //  
    756 //     if(!wait_cycles) { // timout has occurred 
    757 //         debugFatal("One or more streams couldn't be enabled (timeout):\n"); 
    758 //  
    759 //         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    760 //                 it != m_ReceiveProcessors.end(); 
    761 //                 ++it ) { 
    762 //             if(!(*it)->isEnabled()) { 
    763 //                     debugFatal(" receive stream %p not enabled\n",*it); 
    764 //             } else { 
    765 //                     debugFatal(" receive stream %p enabled\n",*it); 
    766 //             } 
    767 //         } 
    768 //  
    769 //         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    770 //                 it != m_TransmitProcessors.end(); 
    771 //                 ++it ) { 
    772 //             if(!(*it)->isEnabled()) { 
    773 //                     debugFatal(" transmit stream %p not enabled\n",*it); 
    774 //             } else { 
    775 //                     debugFatal(" transmit stream %p enabled\n",*it); 
    776 //             } 
    777 //         } 
    778 //         return false; 
    779 //     } 
    780  
    781     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n"); 
    782  
    783     return true; 
    784 
    785  
    786 /** 
    787  * Disables the registered StreamProcessors 
    788  * @return true if successful, false otherwise 
    789  */ 
    790 bool StreamProcessorManager::disableStreamProcessors() { 
    791     // we prepare the streamprocessors for disable 
    792     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    793             it != m_ReceiveProcessors.end(); 
    794             ++it ) { 
    795         (*it)->prepareForDisable(); 
    796     } 
    797  
    798     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    799             it != m_TransmitProcessors.end(); 
    800             ++it ) { 
    801         (*it)->prepareForDisable(); 
    802     } 
    803  
    804     // then we disable the streamprocessors 
    805     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    806             it != m_ReceiveProcessors.end(); 
    807             ++it ) { 
    808         (*it)->disable(); 
    809         (*it)->m_data_buffer->setTransparent(true); 
    810     } 
    811  
    812     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    813             it != m_TransmitProcessors.end(); 
    814             ++it ) { 
    815         (*it)->disable(); 
    816         (*it)->m_data_buffer->setTransparent(true); 
    817     } 
    818  
    819     // now we wait for the SP's to get disabled 
    820     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n"); 
    821     // we have to wait until all streamprocessors indicate that they are running 
    822     // i.e. that there is actually some data stream flowing 
    823     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 
    824     bool enabled=true; 
    825     while (enabled && wait_cycles) { 
    826         wait_cycles--; 
    827         enabled=false; 
    828  
    829         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    830                 it != m_ReceiveProcessors.end(); 
    831                 ++it ) { 
    832             if((*it)->isEnabled()) enabled=true; 
    833         } 
    834  
    835         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    836                 it != m_TransmitProcessors.end(); 
    837                 ++it ) { 
    838             if((*it)->isEnabled()) enabled=true; 
    839         } 
    840         usleep(1000); // one cycle 
    841     } 
    842  
    843     if(!wait_cycles) { // timout has occurred 
    844         debugFatal("One or more streams couldn't be disabled (timeout):\n"); 
    845  
    846         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    847                 it != m_ReceiveProcessors.end(); 
    848                 ++it ) { 
    849             if(!(*it)->isEnabled()) { 
    850                     debugFatal(" receive stream %p not enabled\n",*it); 
    851             } else { 
    852                     debugFatal(" receive stream %p enabled\n",*it); 
    853             } 
    854         } 
    855  
    856         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    857                 it != m_TransmitProcessors.end(); 
    858                 ++it ) { 
    859             if(!(*it)->isEnabled()) { 
    860                     debugFatal(" transmit stream %p not enabled\n",*it); 
    861             } else { 
    862                     debugFatal(" transmit stream %p enabled\n",*it); 
    863             } 
    864         } 
    865         return false; 
    866     } 
    867  
    868     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n"); 
     478          it != m_TransmitProcessors.end(); 
     479          ++it ) { 
     480        if (!m_isoManager->unregisterStream(*it)) { 
     481            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 
     482            return false; 
     483        } 
     484    } 
    869485 
    870486    return true; 
     
    893509     * 3) Re-enable the SP's 
    894510     */ 
    895     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 
    896         if (!disableStreamProcessors()) { 
    897         debugFatal("Could not disable StreamProcessors...\n"); 
     511 
     512    // put all SP's back into dry-running state 
     513    if (!startDryRunning()) { 
     514        debugFatal("Could not put SP's in dry-running state\n"); 
    898515        return false; 
    899516    } 
     
    960577    // this is to notify the client of the delay 
    961578    // that we introduced 
    962     m_delayed_usecs=time_till_next_period; 
     579    m_delayed_usecs = time_till_next_period; 
    963580 
    964581    // we save the 'ideal' time of the transfer at this point, 
     
    1001618 
    1002619        // if this is true, a xrun will occur 
    1003         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()
     620        xrun_occurred |= !((*it)->canClientTransferFrames(m_period))
    1004621 
    1005622#ifdef DEBUG 
     
    1008625            (*it)->dumpInfo(); 
    1009626        } 
    1010         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) { 
     627        if (!((*it)->canClientTransferFrames(m_period))) { 
    1011628            debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); 
    1012629            (*it)->dumpInfo(); 
     
    1022639 
    1023640        // if this is true, a xrun will occur 
    1024         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()
     641        xrun_occurred |= !((*it)->canClientTransferFrames(m_period))
    1025642 
    1026643#ifdef DEBUG 
     
    1028645            debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it); 
    1029646        } 
    1030         if (!((*it)->canClientTransferFrames(m_period)) && (*it)->isEnabled()) { 
     647        if (!((*it)->canClientTransferFrames(m_period))) { 
    1031648            debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it); 
    1032649        } 
     
    1049666bool StreamProcessorManager::transfer() { 
    1050667 
    1051     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n"); 
     668    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 
    1052669    bool retval=true; 
    1053     retval &= dryRun(StreamProcessor::ePT_Receive); 
    1054     retval &= dryRun(StreamProcessor::ePT_Transmit); 
     670    retval &= transfer(StreamProcessor::ePT_Receive); 
     671    retval &= transfer(StreamProcessor::ePT_Transmit); 
    1055672    return retval; 
    1056673} 
     
    1066683 
    1067684bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { 
    1068     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 
     685    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period for type (%d)...\n", t); 
    1069686    bool retval = true; 
    1070687    // a static cast could make sure that there is no performance 
     
    1089706                it != m_ReceiveProcessors.end(); 
    1090707                ++it ) { 
    1091  
    1092708            if(!(*it)->getFrames(m_period, receive_timestamp)) { 
    1093                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n", 
     709                    debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n", 
    1094710                            m_period, m_time_of_transfer,*it); 
    1095711                retval &= false; // buffer underrun 
     
    1110726 
    1111727            if(!(*it)->putFrames(m_period, transmit_timestamp)) { 
    1112                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n", 
     728                debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n", 
    1113729                        m_period, transmit_timestamp, *it); 
    1114730                retval &= false; // buffer underrun 
  • branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h

    r715 r719  
    6262    bool stop(); 
    6363 
     64    bool startDryRunning(); 
    6465    bool syncStartAll(); 
    6566 
     
    6768    bool registerProcessor(StreamProcessor *processor); ///< start managing a streamprocessor 
    6869    bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor 
    69  
    70     bool enableStreamProcessors(uint64_t time_to_enable_at); /// enable registered StreamProcessors 
    71     bool disableStreamProcessors(); /// disable registered StreamProcessors 
    7270 
    7371    void setPeriodSize(unsigned int period); 
     
    9997 
    10098private: 
    101     void resetXrunCounters(); 
    10299 
    103100    int m_delayed_usecs; 
  • branches/ppalmers-streaming/src/libstreaming/util/IsoHandler.cpp

    r705 r719  
    437437 
    438438    debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing iso receive handler (%p)\n",this); 
    439     debugOutput( DEBUG_LEVEL_VERBOSE, " Buffers         : %d \n",m_buf_packets); 
    440     debugOutput( DEBUG_LEVEL_VERBOSE, " Max Packet size : %d \n",m_max_packet_size); 
    441     debugOutput( DEBUG_LEVEL_VERBOSE, " Channel         : %d \n",m_Client->getChannel()); 
    442     debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval    : %d \n",m_irq_interval); 
     439    debugOutput( DEBUG_LEVEL_VERBOSE, " Buffers         : %d \n", m_buf_packets); 
     440    debugOutput( DEBUG_LEVEL_VERBOSE, " Max Packet size : %d \n", m_max_packet_size); 
     441    debugOutput( DEBUG_LEVEL_VERBOSE, " Channel         : %d \n", m_Client->getChannel()); 
     442    debugOutput( DEBUG_LEVEL_VERBOSE, " Irq interval    : %d \n", m_irq_interval); 
     443    debugOutput( DEBUG_LEVEL_VERBOSE, " Mode            : %s \n",  
     444                               (m_irq_interval > 1)?"DMA_BUFFERFILL":"PACKET_PER_BUFFER"); 
    443445 
    444446    if(m_irq_interval > 1) { 
  • branches/ppalmers-streaming/src/libstreaming/util/IsoHandlerManager.cpp

    r714 r719  
    327327        unsigned int irq_interval=packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD; 
    328328        if(irq_interval <= 0) irq_interval=1; 
     329        // FIXME: test 
     330        irq_interval=1; 
     331        #warning Using fixed irq_interval 
     332 
    329333#else 
    330334        // hardware interrupts occur when one DMA block is full, and the size of one DMA 
  • branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp

    r712 r719  
    147147        diff += m_wrap_at; 
    148148    } 
    149      
     149 
    150150    float rate=((float)diff)/((float) m_update_period); 
     151    if (rate<0.0) debugError("rate < 0! (%f)\n",rate); 
    151152    if (fabsf(m_nominal_rate - rate)>(m_nominal_rate*0.1)) { 
    152153        debugWarning("(%p) rate (%10.5f) more that 10%% off nominal (rate=%10.5f, diff="TIMESTAMP_FORMAT_SPEC", update_period=%d)\n", 
    153154                     this, rate,m_nominal_rate,diff, m_update_period); 
    154         //dumpInfo(); 
     155 
    155156        return m_nominal_rate; 
    156157    } else { 
     
    379380 
    380381    if (m_transparent) { 
    381 //         // if the buffer is disabled, it's in a 'transparent' state, meaning 
    382 //         // that if too much is put into the buffer, the oldest data is discarded 
    383 //         signed int fc; 
    384 //         ENTER_CRITICAL_SECTION; 
    385 //         fc=m_framecounter; 
    386 //         EXIT_CRITICAL_SECTION; 
    387 //          
    388 //         signed int frames_to_ditch= nframes - (m_buffer_size - m_framecounter) + 1; 
    389 //         if ( frames_to_ditch > 0 ) { 
    390 //             debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "dropping %d frames\n", frames_to_ditch); 
    391 //             dropFrames( frames_to_ditch ); 
    392 //         } 
    393 //         // add the data payload to the ringbuffer 
    394 //         if (ffado_ringbuffer_write(m_event_buffer,data,write_size) < write_size) 
    395 //         { 
    396 //             debugError("we should have freed up enough space for this\n"); 
    397 //             return false; 
    398 //         } 
    399 //          
    400         // while disabled, we don't update the DLL, we just set the correct 
    401         // timestamp for the frames 
     382        // while disabled, we don't update the DLL, nor do we write frames 
     383        // we just set the correct timestamp for the frames 
    402384        setBufferTailTimestamp(ts); 
    403385    } else { 
     
    446428    unsigned int read_size=nframes*m_event_size*m_events_per_frame; 
    447429 
    448     // get the data payload to the ringbuffer 
    449     if ((ffado_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) 
    450     { 
    451 //         debugWarning("readFrames buffer underrun\n"); 
    452         return false; 
    453     } 
    454  
    455     decrementFrameCounter(nframes); 
    456  
    457     return true; 
    458  
     430    if (m_transparent) { 
     431        return true; // FIXME: the data still doesn't make sense! 
     432    } else { 
     433        // get the data payload to the ringbuffer 
     434        if ((ffado_ringbuffer_read(m_event_buffer,data,read_size)) < read_size) 
     435        { 
     436            debugWarning("readFrames buffer underrun\n"); 
     437            return false; 
     438        } 
     439        decrementFrameCounter(nframes); 
     440    } 
     441    return true; 
    459442} 
    460443 
     
    712695    EXIT_CRITICAL_SECTION; 
    713696 
    714     debugOutput(DEBUG_LEVEL_VERBOSE, "for (%p) to " 
     697    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "for (%p) to " 
    715698                                          TIMESTAMP_FORMAT_SPEC" => "TIMESTAMP_FORMAT_SPEC", NTS=" 
    716699                                          TIMESTAMP_FORMAT_SPEC", DLL2=%f, RATE=%f\n", 
  • branches/ppalmers-streaming/src/libutil/TimestampedBuffer.h

    r712 r719  
    150150    // dll stuff 
    151151    bool setNominalRate(float r); 
     152    float getNominalRate() {return m_nominal_rate;}; 
    152153    float getRate(); 
    153154 
  • branches/ppalmers-streaming/tests/test-cycletimer.cpp

    r714 r719  
    296296 
    297297    s->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 
    298  
    299     if (!s->init()) { 
    300         debugOutput(DEBUG_LEVEL_NORMAL, "Could not init IsoStream\n"); 
    301         goto finish; 
    302     } 
    303  
    304298    s->setChannel(0); 
    305299 
  • branches/ppalmers-streaming/tests/test-sytmonitor.cpp

    r705 r719  
    233233 
    234234            monitors[i]->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 
    235  
    236             if (!monitors[i]->init()) { 
    237                 debugOutput(DEBUG_LEVEL_NORMAL, "Could not init SytMonitor %d\n", i); 
    238                 goto finish; 
    239             } 
    240  
    241235            monitors[i]->setChannel(arguments.args[i].channel); 
    242236