Show
Ignore:
Timestamp:
02/06/07 23:59:56 (16 years ago)
Author:
pieterpalmers
Message:

* working version of SYT based AMDTP receive and transmit.

Still has to be tuned to work with low buffer sizes.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/streaming-rework/src/debugmodule/debugmodule.h

    r384 r390  
    3131 
    3232/* MB_NEXT() relies on the fact that MB_BUFFERS is a power of two */ 
    33 #define MB_BUFFERS      4096 
     33#define MB_BUFFERS      8192 
    3434#define MB_NEXT(index) ((index+1) & (MB_BUFFERS-1)) 
    3535#define MB_BUFFERSIZE   256             /* message length limit */ 
  • branches/streaming-rework/src/libstreaming/AmdtpStreamProcessor.cpp

    r386 r390  
    3838#define RECEIVE_DLL_INTEGRATION_COEFFICIENT 0.015 
    3939 
    40 #define RECEIVE_PROCESSING_DELAY (10000
     40#define RECEIVE_PROCESSING_DELAY (10000U
    4141 
    4242// in ticks 
     
    9595    unsigned int nevents=0; 
    9696     
    97     debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d\n",cycle); 
     97    debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"Xmit handler for cycle %d, (running=%d, enabled=%d,%d)\n", 
     98        cycle, m_running, m_disabled, m_is_disabled); 
    9899     
    99100#ifdef DEBUG 
     
    174175    // the current time, the stream is considered not 
    175176    // to be 'running' 
    176     if(cycle_diff < 0 || cycle == -1) { 
    177         m_running=false; 
    178     } else { 
    179         m_running=true; 
     177    if (!m_running && cycle_diff >= 0 && cycle != -1) { 
     178            debugOutput(DEBUG_LEVEL_VERBOSE, "Xmit StreamProcessor %p started running at cycle %d\n",this, cycle); 
     179            m_running=true; 
    180180    } 
    181181     
     
    196196    int64_t until_next=timestamp-(int64_t)cycle_timer; 
    197197     
     198#ifdef DEBUG 
    198199    int64_t utn2=until_next; // debug!! 
    199      
     200#endif 
     201 
    200202    // we send a packet some cycles in advance, to avoid the 
    201203    // following situation: 
     
    216218 
    217219#ifdef DEBUG 
    218     if(!m_disabled) { 
     220    if(!m_is_disabled) { 
    219221        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> TS=%11llu, CTR=%11llu, FC=%5d\n", 
    220222            timestamp, cycle_timer, fc 
     
    223225            until_next, utn2 
    224226            ); 
    225         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04lld\n", 
     227        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    CY_NOW=%04d, CY_TARGET=%04d, CY_DIFF=%04d\n", 
    226228            now_cycles, cycle, cycle_diff 
    227229            ); 
     
    244246 
    245247#ifdef DEBUG 
    246     if(!m_disabled) { 
     248    if(!m_is_disabled) { 
    247249        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " > TS=%11llu, CTR=%11llu, FC=%5d\n", 
    248250            timestamp, cycle_timer, fc 
     
    254256#endif 
    255257 
     258    if (!m_disabled && m_is_disabled) { 
     259        // this means that we are trying to enable 
     260        if ((unsigned int)cycle == m_cycle_to_enable_at) { 
     261            m_is_disabled=false; 
     262            debugOutput(DEBUG_LEVEL_VERBOSE,"Enabling StreamProcessor %p at %u\n", this, cycle); 
     263             
     264            // initialize the buffer head & tail 
     265            uint64_t ts; 
     266            uint64_t fc; 
     267             
     268            debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); 
     269             
     270            m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe 
     271             
     272            // recalculate the buffer head timestamp 
     273            float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 
     274 
     275            // set buffer head timestamp 
     276            // this makes that the next sample to be sent out 
     277            // has the same timestamp as the last one received 
     278            // plus one frame 
     279            ts += (uint64_t)ticks_per_frame; 
     280            if (ts >= TICKS_PER_SECOND * 128L) { 
     281                ts -= TICKS_PER_SECOND * 128L; 
     282            } 
     283             
     284            setBufferHeadTimestamp(ts); 
     285            int64_t timestamp = ts; 
     286         
     287            // since we have frames_in_buffer frames in the buffer,  
     288            // we know that the buffer tail lies 
     289            // frames_in_buffer * rate  
     290            // later 
     291            int frames_in_buffer=getFrameCounter(); 
     292            timestamp += (int64_t)((float)frames_in_buffer * ticks_per_frame); 
     293             
     294            // this happens when the last timestamp is near wrapping, and  
     295            // m_framecounter is low. 
     296            // this means: m_last_timestamp is near wrapping and have just had 
     297            // a getPackets() from the client side. the projected next_period 
     298            // boundary lies beyond the wrap value. 
     299            // the action is to wrap the value. 
     300            if (timestamp >= TICKS_PER_SECOND * 128L) { 
     301                timestamp -= TICKS_PER_SECOND * 128L; 
     302            } 
     303         
     304            StreamProcessor::setBufferTailTimestamp(timestamp); 
     305             
     306            debugOutput(DEBUG_LEVEL_VERBOSE,"XMIT TS SET: TS=%10lld, TSTMP=%10llu, FC=%4d, %f\n", 
     307                            ts, timestamp, frames_in_buffer, ticks_per_frame); 
     308 
     309        } else { 
     310            debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); 
     311        } 
     312    } else if (m_disabled && !m_is_disabled) { 
     313        // trying to disable 
     314        debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle); 
     315        m_is_disabled=true; 
     316    } 
     317 
    256318    // don't process the stream when it is not enabled, not running 
    257319    // or when the next sample is not due yet. 
     
    260322    // that means that we'll send NODATA packets. 
    261323    // we don't add payload because DICE devices don't like that. 
    262     if((until_next>0) || m_disabled || !m_running) { 
     324    if((until_next>0) || m_is_disabled || !m_running) { 
    263325        // no-data packets have syt=0xFFFF 
    264326        // and have the usual amount of events as dummy data (?) 
     
    278340        *sy = 0; 
    279341         
    280         if(m_disabled) { 
    281             // indicate that we are now in a disabled state. 
    282             m_is_disabled=true; 
    283         } else { 
    284             // indicate that we are now in an enabled state. 
    285             m_is_disabled=false; 
    286         } 
    287          
    288342        return RAW1394_ISO_DEFER; 
    289343    } 
    290344     
    291     // indicate that we are now in an enabled state. 
    292     m_is_disabled=false; 
    293              
    294345    // construct the packet 
    295346    nevents = m_syt_interval; 
     
    299350    *sy = 0; 
    300351      
    301     enum raw1394_iso_disposition retval; 
    302  
    303352    unsigned int read_size=nevents*sizeof(quadlet_t)*m_dimension; 
    304353 
     
    307356    { 
    308357        /* there is no more data in the ringbuffer */ 
    309  
    310         debugWarning("Transmit buffer underrun (cycle %d, FC=%d, PC=%d)\n",  
    311                  cycle, getFrameCounter(), m_handler->getPacketCount()); 
     358        // convert the timestamp to SYT format 
     359        uint64_t ts=timestamp + TRANSMIT_TRANSFER_DELAY; 
     360         
     361        // check if it wrapped 
     362        if (ts >= TICKS_PER_SECOND * 128L) { 
     363            ts -= TICKS_PER_SECOND * 128L; 
     364        } 
     365 
     366        debugWarning("Transmit buffer underrun (now %d, queue %d, target %d)\n",  
     367                 now_cycles, cycle, TICKS_TO_CYCLES(ts)); 
    312368 
    313369        nevents=0; 
     
    319375        m_xruns++; 
    320376 
     377        // disable the processing, will be re-enabled when 
     378        // the xrun is handled 
     379        m_disabled=true; 
     380        m_is_disabled=true; 
     381 
    321382        // compose a no-data packet, we should always 
    322383        // send a valid packet 
     
    330391        //*length = 2*sizeof(quadlet_t); 
    331392 
    332         retval=RAW1394_ISO_DEFER; 
     393        return RAW1394_ISO_DEFER; 
    333394    } else { 
    334395        *length = read_size + 8; 
     
    353414        packet->syt = ntohs(timestamp_SYT); 
    354415 
    355         retval=RAW1394_ISO_OK; 
    356     } 
    357      
    358     // calculate the new buffer head timestamp. this is 
    359     // the previous buffer head timestamp plus 
    360     // the number of frames sent * ticks_per_frame 
    361     timestamp += (int64_t)((float)nevents * ticks_per_frame ); 
    362      
    363     // check if it wrapped 
    364     if (timestamp >= TICKS_PER_SECOND * 128L) { 
    365         timestamp -= TICKS_PER_SECOND * 128L; 
    366     } 
    367      
    368     // update the frame counter such that it reflects the new value 
    369     // also update the buffer head timestamp 
    370     // done in the SP base class 
    371     if (!StreamProcessor::getFrames(nevents, timestamp)) { 
    372         debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nevents, timestamp); 
    373         retval=RAW1394_ISO_ERROR; 
    374     } 
    375  
    376     return retval; 
     416        // calculate the new buffer head timestamp. this is 
     417        // the previous buffer head timestamp plus 
     418        // the number of frames sent * ticks_per_frame 
     419        timestamp += (int64_t)((float)nevents * ticks_per_frame ); 
     420         
     421        // check if it wrapped 
     422        if (timestamp >= TICKS_PER_SECOND * 128L) { 
     423            timestamp -= TICKS_PER_SECOND * 128L; 
     424        } 
     425         
     426        // update the frame counter such that it reflects the new value 
     427        // also update the buffer head timestamp 
     428        // done in the SP base class 
     429        if (!StreamProcessor::getFrames(nevents, timestamp)) { 
     430            debugError("Could not do StreamProcessor::getFrames(%d, %llu)\n",nevents, timestamp); 
     431             return RAW1394_ISO_ERROR; 
     432        } 
     433         
     434        return RAW1394_ISO_OK; 
     435    } 
     436 
     437    // we shouldn't get here 
     438    return RAW1394_ISO_ERROR; 
    377439 
    378440} 
     
    411473 
    412474    // update the frame counter such that it reflects the buffer content, 
    413     // and also update the buffer tail timestamp 
     475    // the buffer tail timestamp is initialized when the SP is enabled 
    414476    // done in the SP base class 
    415477    if (!StreamProcessor::putFrames(m_ringbuffer_size_frames, ts)) { 
    416         debugError("Could not do StreamProcessor::putFrames(%d, %llu)\n", 
    417             m_ringbuffer_size_frames, ts); 
     478        debugError("Could not do StreamProcessor::putFrames(%d, %0)\n", 
     479            m_ringbuffer_size_frames); 
    418480        return false; 
    419481    } 
    420482 
    421483    return true; 
    422      
    423484} 
    424485 
     
    652713 
    653714bool AmdtpTransmitStreamProcessor::prepareForEnable() { 
    654     uint64_t ts; 
    655     uint64_t fc; 
    656      
     715 
    657716    debugOutput(DEBUG_LEVEL_VERBOSE,"Preparing to enable...\n"); 
    658      
    659     m_SyncSource->getBufferHeadTimestamp(&ts, &fc); // thread safe 
    660      
    661     // recalculate the buffer head timestamp 
    662     float ticks_per_frame=m_SyncSource->getTicksPerFrame(); 
    663      
    664     // set buffer head timestamp 
    665     // this makes that the next sample to be sent out 
    666     // has the same timestamp as the last one received 
    667     // plus one frame 
    668     ts += (uint64_t)ticks_per_frame; 
    669     setBufferHeadTimestamp(ts); 
    670     int64_t timestamp = ts; 
    671  
    672     // since we have a full buffer, we know that the buffer tail lies 
    673     // m_ringbuffer_size_frames * rate earlier 
    674     timestamp += (int64_t)((float)m_ringbuffer_size_frames * ticks_per_frame); 
    675      
    676     // this happens when the last timestamp is near wrapping, and  
    677     // m_framecounter is low. 
    678     // this means: m_last_timestamp is near wrapping and have just had 
    679     // a getPackets() from the client side. the projected next_period 
    680     // boundary lies beyond the wrap value. 
    681     // the action is to wrap the value. 
    682     if (timestamp >= TICKS_PER_SECOND * 128L) { 
    683         timestamp -= TICKS_PER_SECOND * 128L; 
    684     } 
    685  
    686     StreamProcessor::setBufferTailTimestamp(timestamp); 
    687      
    688     debugOutput(DEBUG_LEVEL_VERBOSE,"TS=%10lld, TSTMP=%10llu, %f\n", 
    689                     ts, timestamp, ticks_per_frame); 
    690      
     717 
    691718    if (!StreamProcessor::prepareForEnable()) { 
    692719        debugError("StreamProcessor::prepareForEnable failed\n"); 
     
    10991126    } 
    11001127#endif 
    1101      
     1128 
     1129    debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4ucy + %04uticks) (running=%d, disabled=%d,%d)\n", 
     1130        channel, cycle,ntohs(packet->syt),   
     1131        CYCLE_TIMER_GET_CYCLES(ntohs(packet->syt)), CYCLE_TIMER_GET_OFFSET(ntohs(packet->syt)), 
     1132        m_running,m_disabled,m_is_disabled); 
     1133 
    11021134    if((packet->fmt == 0x10) && (packet->fdf != 0xFF) && (packet->syt != 0xFFFF) && (packet->dbs>0) && (length>=2*sizeof(quadlet_t))) { 
    11031135        unsigned int nevents=((length / sizeof (quadlet_t)) - 2)/packet->dbs; 
    1104          
     1136 
    11051137        //=> store the previous timestamp 
    11061138        m_last_timestamp2=m_last_timestamp; 
    1107          
     1139 
    11081140        //=> convert the SYT to ticks 
    11091141        unsigned int syt_timestamp=ntohs(packet->syt); 
    1110          
    1111         debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%3u secs + %4u cycles + %04u ticks)\n", 
    1112             channel, cycle,syt_timestamp, CYCLE_TIMER_GET_SECS(syt_timestamp),  
    1113             CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp)); 
     1142 
     1143        debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"ch%2u: CY=%4u, SYT=%08X (%4u cycles + %04u ticks), FC=%04d, %d\n", 
     1144            channel, cycle,syt_timestamp,   
     1145            CYCLE_TIMER_GET_CYCLES(syt_timestamp), CYCLE_TIMER_GET_OFFSET(syt_timestamp), 
     1146            getFrameCounter(), m_is_disabled); 
    11141147         
    11151148        // reconstruct the full cycle 
     
    11631196        m_last_timestamp += CYCLE_TIMER_GET_OFFSET(syt_timestamp); 
    11641197        m_last_timestamp += cc_seconds * TICKS_PER_SECOND; 
     1198         
     1199        // we have to keep in mind that there are also 
     1200        // some packets buffered by the ISO layer 
     1201        // at most x=m_handler->getNbBuffers() 
     1202        // these contain at most x*syt_interval 
     1203        // frames, meaning that we might receive 
     1204        // this packet x*syt_interval*ticks_per_frame 
     1205        // later than expected (the real receive time) 
     1206        m_last_timestamp += (uint64_t)(((float)m_handler->getNbBuffers()) 
     1207                                       * m_syt_interval * m_ticks_per_frame); 
    11651208         
    11661209        // the receive processing delay indicates how much 
     
    12171260 
    12181261        //=> signal that we're running (if we are) 
    1219         if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) m_running=true; 
     1262        if(!m_running && nevents && m_last_timestamp2 && m_last_timestamp) { 
     1263            debugOutput(DEBUG_LEVEL_VERBOSE,"Receive StreamProcessor %p started running at %d\n", this, cycle); 
     1264            m_running=true; 
     1265        } 
    12201266 
    12211267        //=> don't process the stream samples when it is not enabled. 
    1222         if(m_disabled) { 
     1268        if (!m_disabled && m_is_disabled) { 
     1269            // this means that we are trying to enable 
     1270            if (cycle == m_cycle_to_enable_at) { 
     1271                m_is_disabled=false; 
     1272                debugOutput(DEBUG_LEVEL_VERBOSE,"enabling StreamProcessor %p at %d\n", this, cycle); 
     1273            } else { 
     1274                debugOutput(DEBUG_LEVEL_VERY_VERBOSE,"will enable StreamProcessor %p at %u, now is %d\n", this, m_cycle_to_enable_at, cycle); 
     1275            } 
     1276        } else if (m_disabled && !m_is_disabled) { 
     1277            // trying to disable 
     1278            debugOutput(DEBUG_LEVEL_VERBOSE,"disabling StreamProcessor %p at %u\n", this, cycle); 
     1279            m_is_disabled=true; 
     1280        } 
     1281         
     1282        if(m_is_disabled) { 
    12231283 
    12241284            // we keep track of the timestamp here 
     
    12381298            StreamProcessor::setBufferTimestamps(ts,ts); 
    12391299             
    1240             // indicate that we are now in a disabled state. 
    1241             m_is_disabled=true; 
    1242              
    12431300            return RAW1394_ISO_DEFER; 
    12441301        } 
    1245          
    1246         // indicate that we are now in an enabled state. 
    1247         m_is_disabled=false; 
    12481302         
    12491303        //=> process the packet 
     
    12571311             
    12581312            m_xruns++; 
     1313             
     1314            // disable the processing, will be re-enabled when 
     1315            // the xrun is handled 
     1316            m_disabled=true; 
     1317            m_is_disabled=true; 
    12591318 
    12601319            retval=RAW1394_ISO_DEFER; 
     
    13801439    next_period_boundary     += (int64_t)(((int64_t)m_period 
    13811440                                          - fc) * m_ticks_per_frame); 
    1382 //     next_period_boundary     += RECEIVE_PROCESSING_DELAY; 
    13831441     
    13841442    debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "=> NPD=%11lld, LTS=%11llu, FC=%5d, TPF=%f\n", 
     
    14371495    m_WakeupStat.reset(); 
    14381496     
    1439 //     m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); 
     1497    // this needs to be reset to the nominal value 
     1498    // because xruns can cause the DLL value to shift a lot 
     1499    // making that we run into problems when trying to re-enable  
     1500    // streaming 
     1501    m_ticks_per_frame = (TICKS_PER_SECOND*1.0) / ((float)m_framerate); 
    14401502 
    14411503    // reset all non-device specific stuff 
  • branches/streaming-rework/src/libstreaming/IsoHandler.cpp

    r385 r390  
    194194    err=raw1394_read_cycle_timer(m_handle_util, &ctr); 
    195195    if(err) { 
    196         debugWarning("raw1394_read_cycle_timer: %s", strerror(err)); 
     196        debugError("raw1394_read_cycle_timer failed.\n"); 
     197        debugError(" Error: %s\n", strerror(err)); 
     198        debugError(" Your system doesn't seem to support the raw1394_read_cycle_timer call\n"); 
     199        return false; 
    197200    } 
    198201    new_timer=ctr.cycle_timer; 
  • branches/streaming-rework/src/libstreaming/IsoHandler.h

    r384 r390  
    7878        // no setter functions, because those would require a re-init 
    7979        unsigned int getMaxPacketSize() { return m_max_packet_size;}; 
    80         unsigned int getBuffersize() { return m_buf_packets;}; 
     80        unsigned int getNbBuffers() { return m_buf_packets;}; 
    8181        int getWakeupInterval() { return m_irq_interval;}; 
    8282 
  • branches/streaming-rework/src/libstreaming/IsoHandlerManager.cpp

    r386 r390  
    3232#include <assert.h> 
    3333 
     34#include "../libutil/PosixThread.h" 
     35 
     36 
    3437#define MINIMUM_INTERRUPTS_PER_PERIOD  4U 
    3538#define PACKETS_PER_INTERRUPT          4U 
     
    4245IsoHandlerManager::IsoHandlerManager() : 
    4346   m_State(E_Created), 
    44    m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0) 
    45 
    46  
    47 
    48  
     47   m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0), 
     48   m_realtime(false), m_priority(0) 
     49
     50 
     51
     52 
     53IsoHandlerManager::IsoHandlerManager(bool run_rt, unsigned int rt_prio) : 
     54   m_State(E_Created), 
     55   m_poll_timeout(1), m_poll_fds(0), m_poll_nfds(0), 
     56   m_realtime(run_rt), m_priority(rt_prio) 
     57
     58 
     59
    4960 
    5061IsoHandlerManager::~IsoHandlerManager() 
     
    5364} 
    5465 
     66bool IsoHandlerManager::init() 
     67{ 
     68    // the tread that performs the actual packet transfer 
     69    // needs high priority 
     70    unsigned int prio=m_priority+6; 
     71 
     72    if (prio>98) prio=98; 
     73 
     74    m_isoManagerThread=new FreebobUtil::PosixThread( 
     75        this,  
     76        m_realtime, prio, 
     77        PTHREAD_CANCEL_DEFERRED); 
     78 
     79    if(!m_isoManagerThread) { 
     80        debugFatal("Could not create iso manager thread\n"); 
     81        return false; 
     82    } 
     83 
     84    // propagate the debug level 
     85//     m_isoManagerThread->setVerboseLevel(getDebugLevel()); 
     86 
     87    return true; 
     88} 
     89 
    5590bool IsoHandlerManager::Init() 
    5691{ 
    57        debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
    58  
    59        return true; 
     92    debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
     93 
     94    return true; 
    6095} 
    6196 
     
    288323                unsigned int packets_per_period=stream->getPacketsPerPeriod(); 
    289324                 
    290 #if 0 
     325#if 1 
    291326                // hardware interrupts occur when one DMA block is full, and the size of one DMA 
    292327                // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq is  
     
    377412                unsigned int packets_per_period=stream->getPacketsPerPeriod(); 
    378413 
    379 #if 0 
     414#if 1 
    380415                // hardware interrupts occur when one DMA block is full, and the size of one DMA 
    381416                // block = PAGE_SIZE. Setting the max_packet_size makes sure that the HW irq   
     
    431466                // buffers get transfered, meaning that we should have at least some 
    432467                // margin here 
    433 //            int buffers=irq_interval * 2; 
     468              int buffers=irq_interval * 2; 
    434469 
    435470                // half a period. the xmit handler will take care of this 
    436                int buffers=packets_per_period/2
     471//             int buffers=packets_per_period/4
    437472                 
    438473                // NOTE: this is dangerous: what if there is not enough prefill? 
     
    612647    } 
    613648     
     649    debugOutput( DEBUG_LEVEL_VERBOSE, "Starting ISO iterator thread...\n"); 
     650 
     651    // note: libraw1394 doesn't like it if you poll() and/or iterate() before  
     652    //       starting the streams. 
     653    // start the iso runner thread 
     654    m_isoManagerThread->Start(); 
     655     
    614656    if (retval) { 
    615657        m_State=E_Running; 
     
    632674    bool retval=true; 
    633675     
     676    debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping ISO iterator thread...\n"); 
     677    m_isoManagerThread->Stop(); 
     678     
    634679    for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin(); 
    635680        it != m_IsoHandlers.end(); 
    636681        ++it ) 
    637682    { 
    638         debugOutput( DEBUG_LEVEL_VERBOSE, " stopping handler (%p)\n",*it); 
     683        debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handler (%p)\n",*it); 
    639684        if(!(*it)->stop()){ 
    640685            debugOutput( DEBUG_LEVEL_VERBOSE, " could not stop handler (%p)\n",*it); 
  • branches/streaming-rework/src/libstreaming/IsoHandlerManager.h

    r384 r390  
    4040#define USLEEP_AFTER_UPDATE 100 
    4141#define MAX_UPDATE_TRIES 10 
     42namespace FreebobUtil { 
     43    class PosixThread; 
     44} 
    4245 
    4346namespace FreebobStreaming 
     
    7376 
    7477        IsoHandlerManager(); 
     78        IsoHandlerManager(bool run_rt, unsigned int rt_prio); 
    7579        virtual ~IsoHandlerManager(); 
    7680 
     
    9296 
    9397        bool prepare(); ///< prepare the ISO manager and all streams 
     98         
     99        bool init(); 
    94100         
    95101        void disablePolling(IsoStream *); ///< disables polling on a stream 
     
    144150        bool rebuildFdMap(); 
    145151 
     152        // threading 
     153        bool m_realtime; 
     154        unsigned int m_priority; 
     155        FreebobUtil::PosixThread *m_isoManagerThread; 
     156         
     157         
    146158        // debug stuff 
    147159        DECLARE_DEBUG_MODULE; 
  • branches/streaming-rework/src/libstreaming/IsoStream.cpp

    r386 r390  
    119119} 
    120120 
     121 
    121122} 
  • branches/streaming-rework/src/libstreaming/IsoStream.h

    r383 r390  
    104104 
    105105        IsoHandler *m_handler; 
    106  
     106         
    107107        DECLARE_DEBUG_MODULE; 
    108108 
  • branches/streaming-rework/src/libstreaming/StreamProcessor.cpp

    r386 r390  
    3131#include "StreamProcessor.h" 
    3232#include "StreamProcessorManager.h" 
     33#include "cycletimer.h" 
    3334 
    3435#include <assert.h> 
     
    5051        , m_disabled(true) 
    5152        , m_is_disabled(true) 
     53        , m_cycle_to_enable_at(0) 
    5254        , m_framecounter(0) 
    5355        , m_SyncSource(NULL) 
     
    211213} 
    212214 
     215uint64_t StreamProcessor::getTimeNow() { 
     216    return m_handler->getCycleTimerTicks(); 
     217} 
     218 
     219 
    213220bool StreamProcessor::isRunning() { 
    214221        return m_running; 
    215222} 
    216223 
    217 bool StreamProcessor::enable()  { 
    218     int cnt=0; 
     224bool StreamProcessor::enable(uint64_t time_to_enable_at)  { 
     225    // FIXME: time_to_enable_at will be in 'time' not cycles 
     226    m_cycle_to_enable_at=time_to_enable_at; 
    219227     
    220228    if(!m_running) { 
    221229            debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n"); 
    222230    } 
    223      
     231 
     232#ifdef DEBUG 
     233    uint64_t now_cycles=TICKS_TO_CYCLES(m_handler->getCycleTimerTicks()); 
     234    const int64_t max=(int64_t)(TICKS_PER_SECOND/2); 
     235     
     236    int64_t diff=m_cycle_to_enable_at-now_cycles; 
     237     
     238    if (diff > max) { 
     239        diff-=TICKS_PER_SECOND; 
     240    } else if (diff < -max) { 
     241        diff+=TICKS_PER_SECOND; 
     242    } 
     243     
     244    if (diff<0) { 
     245        debugWarning("Request to enable streamprocessor %d cycles ago.\n",diff); 
     246    } 
     247#endif 
     248 
    224249    m_disabled=false; 
    225250     
    226     // now wait until it is effectively enabled 
    227     // time-out at 100ms 
    228     while(m_is_disabled && cnt++ < 1000) { 
    229         usleep(100); 
    230     } 
    231      
    232     // check if the operation timed out 
    233     if(cnt==1000) { 
    234         debugWarning("Timeout when enabling StreamProcessor (%p)\n",this); 
    235         return false; 
    236     } 
    237      
    238251    return true; 
    239252} 
    240253 
    241254bool StreamProcessor::disable()  { 
    242     int cnt=0; 
    243255     
    244256    m_disabled=true; 
    245      
    246     // now wait until it is effectively disabled 
    247     // time-out at  
    248     while(!m_is_disabled && cnt++ < 1000) { 
    249         usleep(100); 
    250     } 
    251      
    252     // check if the operation timed out (100ms) 
    253     if(cnt==1000) { 
    254         debugWarning("Timeout when disabling StreamProcessor (%p)\n",this); 
    255         return false; 
    256     } 
    257      
     257 
    258258    return true; 
    259259 
  • branches/streaming-rework/src/libstreaming/StreamProcessor.h

    r386 r390  
    8181 
    8282    bool isRunning(); ///< returns true if there is some stream data processed 
    83     bool enable(); ///< enable the stream processing  
     83    bool enable(uint64_t time_to_enable_at); ///< enable the stream processing  
    8484    bool disable(); ///< disable the stream processing  
    8585    bool isEnabled() {return !m_is_disabled;}; 
     
    122122    bool m_disabled; 
    123123    bool m_is_disabled; 
     124    unsigned int m_cycle_to_enable_at; 
    124125     
    125126    StreamStatistics m_PacketStat; 
     
    185186        virtual uint64_t getTimeAtPeriod() = 0; 
    186187         
     188        uint64_t getTimeNow(); 
     189         
    187190        void getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc); 
    188191        void getBufferTailTimestamp(uint64_t *ts, uint64_t *fc); 
     
    194197        bool setSyncSource(StreamProcessor *s); 
    195198        float getTicksPerFrame() {return m_ticks_per_frame;}; 
     199         
     200        unsigned int getLastCycle() {return m_last_cycle;}; 
    196201     
    197202    private: 
     
    211216         
    212217        float m_ticks_per_frame; 
     218         
     219        unsigned int m_last_cycle; 
    213220 
    214221    private: 
  • branches/streaming-rework/src/libstreaming/StreamProcessorManager.cpp

    r385 r390  
    3232#include <errno.h> 
    3333#include <assert.h> 
     34 
     35#include "../libutil/PosixThread.h" 
    3436 
    3537#include "libstreaming/cycletimer.h" 
     
    165167        // the tread that runs the StreamProcessor 
    166168        // checking the period boundaries 
     169        int prio=m_thread_priority+5; 
     170        if (prio>98) prio=98; 
     171         
    167172        m_streamingThread=new FreebobUtil::PosixThread(this, 
    168            m_thread_realtime, m_thread_priority+5,  
     173           m_thread_realtime, prio,  
    169174           PTHREAD_CANCEL_DEFERRED); 
    170175            
     
    174179        } 
    175180         
    176         m_isoManager=new IsoHandlerManager(); 
     181        m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority); 
    177182         
    178183        if(!m_isoManager) { 
     
    181186        } 
    182187         
     188        // propagate the debug level 
    183189        m_isoManager->setVerboseLevel(getDebugLevel()); 
    184190         
    185         // the tread that keeps the handler's cycle timers up to date 
    186         // and performs the actual packet transfer 
    187         // needs high priority 
    188         m_isoManagerThread=new FreebobUtil::PosixThread( 
    189               m_isoManager,  
    190               m_thread_realtime, m_thread_priority+6, 
    191               PTHREAD_CANCEL_DEFERRED); 
    192                
    193         if(!m_isoManagerThread) { 
    194                 debugFatal("Could not create iso manager thread\n"); 
     191        if(!m_isoManager->init()) { 
     192                debugFatal("Could not initialize IsoHandlerManager\n"); 
    195193                return false; 
    196194        } 
    197  
     195         
     196        m_xrun_happened=false; 
     197         
    198198        return true; 
    199199} 
     
    204204 
    205205    // no xrun has occurred (yet) 
    206     m_xrun_happened=false; 
    207  
    208     if(sem_init(&m_period_semaphore, 0, 0)) { 
    209         debugFatal( "Cannot init period transfer semaphore\n"); 
    210         debugFatal( " Error: %s\n",strerror(errno)); 
    211         return false; 
    212     } 
    213206 
    214207    return true; 
     
    295288} 
    296289 
     290bool StreamProcessorManager::syncStartAll() { 
     291 
     292    debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessor streams to start running...\n"); 
     293    // we have to wait until all streamprocessors indicate that they are running 
     294    // i.e. that there is actually some data stream flowing 
     295    int wait_cycles=2000; // two seconds 
     296    bool notRunning=true; 
     297    while (notRunning && wait_cycles) { 
     298        wait_cycles--; 
     299        notRunning=false; 
     300         
     301        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     302                it != m_ReceiveProcessors.end(); 
     303                ++it ) { 
     304            if(!(*it)->isRunning()) notRunning=true; 
     305        } 
     306 
     307        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     308                it != m_TransmitProcessors.end(); 
     309                ++it ) { 
     310            if(!(*it)->isRunning()) notRunning=true; 
     311        } 
     312        usleep(1000); 
     313        debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning); 
     314    } 
     315 
     316    if(!wait_cycles) { // timout has occurred 
     317        debugFatal("One or more streams are not starting up (timeout):\n"); 
     318                     
     319        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     320                it != m_ReceiveProcessors.end(); 
     321                ++it ) { 
     322            if(!(*it)->isRunning()) { 
     323                debugFatal(" receive stream %p not running\n",*it); 
     324            } else {     
     325                debugFatal(" receive stream %p running\n",*it); 
     326            } 
     327        } 
     328 
     329        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     330                it != m_TransmitProcessors.end(); 
     331                ++it ) { 
     332            if(!(*it)->isRunning()) { 
     333                debugFatal(" transmit stream %p not running\n",*it); 
     334            } else {     
     335                debugFatal(" transmit stream %p running\n",*it); 
     336            } 
     337        } 
     338         
     339        return false; 
     340    } 
     341 
     342    // we want to make sure that everything is running well,  
     343    // so wait for a while 
     344    usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL); 
     345 
     346    debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 
     347    debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); 
     348 
     349    // now we reset the frame counters 
     350    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     351            it != m_ReceiveProcessors.end(); 
     352            ++it ) { 
     353 
     354        debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 
     355 
     356        if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
     357            (*it)->dumpInfo(); 
     358        } 
     359 
     360        (*it)->reset(); 
     361 
     362        debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 
     363 
     364        if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
     365            (*it)->dumpInfo(); 
     366        } 
     367 
     368    } 
     369     
     370    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     371            it != m_TransmitProcessors.end(); 
     372            ++it ) { 
     373             
     374        debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 
     375         
     376        if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
     377            (*it)->dumpInfo(); 
     378        } 
     379         
     380        (*it)->reset(); 
     381         
     382        debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 
     383         
     384        if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
     385            (*it)->dumpInfo(); 
     386        } 
     387    } 
     388         
     389    debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 
     390     
     391    uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks 
     392     
     393    // FIXME: this should not be in cycles, but in 'time' 
     394    unsigned int enable_at=TICKS_TO_CYCLES(now)+300; 
     395         
     396    debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); 
     397    if (!m_SyncSource->prepareForEnable()) { 
     398            debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 
     399        return false; 
     400    } 
     401 
     402    m_SyncSource->enable(enable_at); 
     403 
     404    debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); 
     405    if (!enableStreamProcessors(enable_at)) { 
     406        debugFatal("Could not enable StreamProcessors...\n"); 
     407        return false; 
     408    } 
     409 
     410    return true; 
     411} 
     412 
    297413bool StreamProcessorManager::start() { 
    298414        debugOutput( DEBUG_LEVEL_VERBOSE, "Starting Processors...\n"); 
     
    348464        debugOutput( DEBUG_LEVEL_VERBOSE, "Starting streaming threads...\n"); 
    349465 
    350         // note: libraw1394 doesn't like it if you poll() and/or iterate() before  
    351         //       starting the streams. 
    352         // start the runner thread 
    353         // FIXME: maybe this should go into the isomanager itself. 
    354         m_isoManagerThread->Start(); 
    355          
    356466        // start the runner thread 
    357467        // FIXME: not used anymore (for updatecycletimers ATM, but that's not good) 
    358468        m_streamingThread->Start(); 
    359469 
    360         debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to start running...\n"); 
    361         // we have to wait until all streamprocessors indicate that they are running 
    362         // i.e. that there is actually some data stream flowing 
    363         int wait_cycles=2000; // two seconds 
    364         bool notRunning=true; 
    365         while (notRunning && wait_cycles) { 
    366                 wait_cycles--; 
    367                 notRunning=false; 
    368                  
    369                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    370                         it != m_ReceiveProcessors.end(); 
    371                         ++it ) { 
    372                         if(!(*it)->isRunning()) notRunning=true; 
    373                 } 
    374          
    375                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    376                         it != m_TransmitProcessors.end(); 
    377                         ++it ) { 
    378                         if(!(*it)->isRunning()) notRunning=true; 
    379                 } 
    380                 usleep(1000); 
    381         } 
    382          
    383         if(!wait_cycles) { // timout has occurred 
    384                 debugFatal("One or more streams are not starting up (timeout):\n"); 
    385                             
    386                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    387                         it != m_ReceiveProcessors.end(); 
    388                         ++it ) { 
    389                         if(!(*it)->isRunning()) { 
    390                                 debugFatal(" receive stream %p not running\n",*it); 
    391                         } else {         
    392                                 debugFatal(" receive stream %p running\n",*it); 
    393                         } 
    394                 } 
    395          
    396                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    397                         it != m_TransmitProcessors.end(); 
    398                         ++it ) { 
    399                         if(!(*it)->isRunning()) { 
    400                                 debugFatal(" transmit stream %p not running\n",*it); 
    401                         } else {         
    402                                 debugFatal(" transmit stream %p running\n",*it); 
    403                         } 
    404                 } 
    405                 return false; 
    406         } 
    407  
    408         // we want to make sure that everything is running well,  
    409         // so wait for a while 
    410         usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL); 
    411  
    412         debugOutput( DEBUG_LEVEL_VERBOSE, "StreamProcessors running...\n"); 
    413         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting counters...\n"); 
    414          
    415         // now we reset the frame counters 
    416         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    417                 it != m_ReceiveProcessors.end(); 
    418                 ++it ) { 
    419                  
    420                 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 
    421                  
    422                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    423                         (*it)->dumpInfo(); 
    424                 } 
    425  
    426                 (*it)->reset(); 
    427                  
    428                 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 
    429  
    430                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    431                         (*it)->dumpInfo(); 
    432                 } 
    433                  
    434         } 
    435          
    436         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    437                 it != m_TransmitProcessors.end(); 
    438                 ++it ) { 
    439                  
    440                 debugOutput( DEBUG_LEVEL_VERBOSE, "Before:\n"); 
    441                  
    442                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    443                         (*it)->dumpInfo(); 
    444                 } 
    445                  
    446                 (*it)->reset(); 
    447                  
    448                 debugOutput( DEBUG_LEVEL_VERBOSE, "After:\n"); 
    449                  
    450                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    451                         (*it)->dumpInfo(); 
    452                 } 
    453         } 
    454          
    455         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 
    456          
    457         debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); 
    458         if (!m_SyncSource->prepareForEnable()) { 
    459                 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 
    460                 return false; 
    461         } 
    462          
    463         m_SyncSource->enable(); 
    464  
    465         debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); 
    466         if (!enableStreamProcessors()) { 
    467                 debugFatal("Could not enable StreamProcessors...\n"); 
     470        // start all SP's synchonized 
     471        if (!syncStartAll()) { 
     472                debugFatal("Could not syncStartAll...\n"); 
    468473                return false; 
    469474        } 
     
    511516         
    512517        m_streamingThread->Stop(); 
    513         m_isoManagerThread->Stop(); 
    514518         
    515519        debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n"); 
     
    551555 * @return true if successful, false otherwise 
    552556 */ 
    553 bool StreamProcessorManager::enableStreamProcessors() { 
    554         // and we enable the streamprocessors 
    555         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    556                 it != m_ReceiveProcessors.end(); 
    557                 ++it ) {                 
    558                 (*it)->prepareForEnable(); 
    559                 (*it)->enable(); 
    560         } 
    561  
    562         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    563                 it != m_TransmitProcessors.end(); 
    564                 ++it ) { 
    565                 (*it)->prepareForEnable(); 
    566                 (*it)->enable(); 
    567         } 
    568         return true; 
     557bool StreamProcessorManager::enableStreamProcessors(unsigned int time_to_enable_at) { 
     558    // we prepare the streamprocessors for enable 
     559    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     560            it != m_ReceiveProcessors.end(); 
     561            ++it ) {             
     562        (*it)->prepareForEnable(); 
     563    } 
     564 
     565    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     566            it != m_TransmitProcessors.end(); 
     567            ++it ) { 
     568        (*it)->prepareForEnable(); 
     569    } 
     570 
     571    // then we enable the streamprocessors 
     572    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     573            it != m_ReceiveProcessors.end(); 
     574            ++it ) {             
     575        (*it)->enable(time_to_enable_at); 
     576    } 
     577 
     578    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     579            it != m_TransmitProcessors.end(); 
     580            ++it ) { 
     581        (*it)->enable(time_to_enable_at); 
     582    } 
     583 
     584    // now we wait for the SP's to get enabled 
     585    debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n"); 
     586    // we have to wait until all streamprocessors indicate that they are running 
     587    // i.e. that there is actually some data stream flowing 
     588    int wait_cycles=2000; // two seconds 
     589    bool notEnabled=true; 
     590    while (notEnabled && wait_cycles) { 
     591        wait_cycles--; 
     592        notEnabled=false; 
     593         
     594        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     595                it != m_ReceiveProcessors.end(); 
     596                ++it ) { 
     597            if(!(*it)->isEnabled()) notEnabled=true; 
     598        } 
     599 
     600        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     601                it != m_TransmitProcessors.end(); 
     602                ++it ) { 
     603            if(!(*it)->isEnabled()) notEnabled=true; 
     604        } 
     605        usleep(1000); // one cycle 
     606    } 
     607     
     608    if(!wait_cycles) { // timout has occurred 
     609        debugFatal("One or more streams couldn't be enabled (timeout):\n"); 
     610                     
     611        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     612                it != m_ReceiveProcessors.end(); 
     613                ++it ) { 
     614            if(!(*it)->isEnabled()) { 
     615                    debugFatal(" receive stream %p not enabled\n",*it); 
     616            } else {     
     617                    debugFatal(" receive stream %p enabled\n",*it); 
     618            } 
     619        } 
     620     
     621        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     622                it != m_TransmitProcessors.end(); 
     623                ++it ) { 
     624            if(!(*it)->isEnabled()) { 
     625                    debugFatal(" transmit stream %p not enabled\n",*it); 
     626            } else {     
     627                    debugFatal(" transmit stream %p enabled\n",*it); 
     628            } 
     629        } 
     630        return false; 
     631    } 
     632     
     633    debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n"); 
     634 
     635    return true; 
    569636} 
    570637 
     
    574641 */ 
    575642bool StreamProcessorManager::disableStreamProcessors() { 
    576         // and we disable the streamprocessors 
    577         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    578                 it != m_ReceiveProcessors.end(); 
    579                 ++it ) { 
    580                 (*it)->prepareForDisable(); 
    581                 (*it)->disable(); 
    582         } 
    583  
    584         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    585                 it != m_TransmitProcessors.end(); 
    586                 ++it ) { 
    587                 (*it)->prepareForDisable(); 
    588                 (*it)->disable(); 
    589         } 
    590         return true; 
     643    // we prepare the streamprocessors for disable 
     644    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     645            it != m_ReceiveProcessors.end(); 
     646            ++it ) { 
     647        (*it)->prepareForDisable(); 
     648    } 
     649 
     650    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     651            it != m_TransmitProcessors.end(); 
     652            ++it ) { 
     653        (*it)->prepareForDisable(); 
     654    } 
     655 
     656    // then we disable the streamprocessors 
     657    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     658            it != m_ReceiveProcessors.end(); 
     659            ++it ) { 
     660        (*it)->disable(); 
     661    } 
     662 
     663    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     664            it != m_TransmitProcessors.end(); 
     665            ++it ) { 
     666        (*it)->disable(); 
     667    } 
     668 
     669    // now we wait for the SP's to get disabled 
     670    debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n"); 
     671    // we have to wait until all streamprocessors indicate that they are running 
     672    // i.e. that there is actually some data stream flowing 
     673    int wait_cycles=2000; // two seconds 
     674    bool enabled=true; 
     675    while (enabled && wait_cycles) { 
     676        wait_cycles--; 
     677        enabled=false; 
     678         
     679        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     680                it != m_ReceiveProcessors.end(); 
     681                ++it ) { 
     682            if((*it)->isEnabled()) enabled=true; 
     683        } 
     684 
     685        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     686                it != m_TransmitProcessors.end(); 
     687                ++it ) { 
     688            if((*it)->isEnabled()) enabled=true; 
     689        } 
     690        usleep(1000); // one cycle 
     691    } 
     692     
     693    if(!wait_cycles) { // timout has occurred 
     694        debugFatal("One or more streams couldn't be disabled (timeout):\n"); 
     695                     
     696        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     697                it != m_ReceiveProcessors.end(); 
     698                ++it ) { 
     699            if(!(*it)->isEnabled()) { 
     700                    debugFatal(" receive stream %p not enabled\n",*it); 
     701            } else {     
     702                    debugFatal(" receive stream %p enabled\n",*it); 
     703            } 
     704        } 
     705     
     706        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     707                it != m_TransmitProcessors.end(); 
     708                ++it ) { 
     709            if(!(*it)->isEnabled()) { 
     710                    debugFatal(" transmit stream %p not enabled\n",*it); 
     711            } else {     
     712                    debugFatal(" transmit stream %p enabled\n",*it); 
     713            } 
     714        } 
     715        return false; 
     716    } 
     717     
     718    debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n"); 
     719         
     720    return true; 
    591721} 
    592722 
     
    617747        } 
    618748 
    619         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting Processors...\n"); 
    620          
    621         // now we reset the streamprocessors 
    622         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    623                 it != m_ReceiveProcessors.end(); 
    624                 ++it ) { 
    625                  
    626                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    627                         (*it)->dumpInfo(); 
    628                 } 
    629                  
    630                 (*it)->reset(); 
    631                  
    632                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    633                         (*it)->dumpInfo(); 
    634                 } 
    635         } 
    636          
    637         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    638                 it != m_TransmitProcessors.end(); 
    639                 ++it ) { 
    640                  
    641                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    642                         (*it)->dumpInfo(); 
    643                 } 
    644                  
    645                 (*it)->reset(); 
    646                  
    647                 if(getDebugLevel()>=DEBUG_LEVEL_VERBOSE) { 
    648                         (*it)->dumpInfo(); 
    649                 } 
    650         } 
    651  
    652         debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 
    653          
    654         debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor...\n"); 
    655         if (!m_SyncSource->prepareForEnable()) { 
    656                 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 
    657                 return false; 
    658         } 
    659          
    660         m_SyncSource->enable(); 
    661  
    662         debugOutput( DEBUG_LEVEL_VERBOSE, " All StreamProcessors...\n"); 
    663         if (!enableStreamProcessors()) { 
    664                 debugFatal("Could not enable StreamProcessors...\n"); 
     749        debugOutput( DEBUG_LEVEL_VERBOSE, "Restarting StreamProcessors...\n"); 
     750        // start all SP's synchonized 
     751        if (!syncStartAll()) { 
     752                debugFatal("Could not syncStartAll...\n"); 
    665753                return false; 
    666754        } 
  • branches/streaming-rework/src/libstreaming/StreamProcessorManager.h

    r384 r390  
    3131#include "../debugmodule/debugmodule.h" 
    3232#include "../libutil/Thread.h" 
    33 #include "../libutil/PosixThread.h" 
    3433#include <semaphore.h> 
    3534#include "Port.h" 
     
    6463    bool start(); 
    6564    bool stop(); 
    66  
     65     
     66    bool syncStartAll(); 
    6767 
    6868    // this is the setup API 
     
    7070    bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor 
    7171 
    72     bool enableStreamProcessors(); /// enable registered StreamProcessors 
     72    bool enableStreamProcessors(unsigned int time_to_enable_at); /// enable registered StreamProcessors 
    7373    bool disableStreamProcessors(); /// disable registered StreamProcessors 
    7474 
     
    137137    StreamProcessorVector m_ReceiveProcessors; 
    138138    StreamProcessorVector m_TransmitProcessors; 
    139      
    140  
    141139 
    142140    unsigned int m_nb_buffers; 
     
    147145 
    148146    FreebobUtil::PosixThread *m_streamingThread; 
    149     FreebobUtil::PosixThread *m_isoManagerThread; 
    150147 
    151148    unsigned int m_nbperiods; 
  • branches/streaming-rework/tests/test-sytmonitor.cpp

    r384 r390  
    174174    SytMonitor *monitors[128]; 
    175175    int64_t stream_offset_ticks[128]; 
    176      
     176 
    177177    struct arguments arguments; 
    178178 
     
    191191    } 
    192192     
    193     memset(&stream_offset_ticks,0,sizeof(unsigned int) * 128); 
    194      
    195      
    196         run=1; 
    197          
    198         run_realtime=arguments.realtime; 
    199         realtime_prio=arguments.rtprio; 
    200  
    201         signal (SIGINT, sighandler); 
    202         signal (SIGPIPE, sighandler); 
    203  
    204         debugOutput(DEBUG_LEVEL_NORMAL, "Freebob SYT monitor\n"); 
    205          
    206         m_isoManager=new IsoHandlerManager(); 
    207          
    208         if(!m_isoManager) { 
    209                 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create IsoHandlerManager\n"); 
    210                 goto finish; 
    211         } 
    212          
    213         m_isoManager->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 
    214                  
    215     // the thread to execute the manager 
    216         m_isoManagerThread=new PosixThread( 
    217             m_isoManager,  
    218             run_realtime, realtime_prio, 
    219             PTHREAD_CANCEL_DEFERRED); 
    220              
    221         if(!m_isoManagerThread) { 
    222                 debugOutput(DEBUG_LEVEL_NORMAL, "Could not create iso manager thread\n"); 
    223                 goto finish; 
    224         } 
     193    memset(&stream_offset_ticks, 0, sizeof(int64_t) * 128); 
     194     
     195    run=1; 
     196     
     197    run_realtime=arguments.realtime; 
     198    realtime_prio=arguments.rtprio; 
     199 
     200    signal (SIGINT, sighandler); 
     201    signal (SIGPIPE, sighandler); 
     202 
     203    debugOutput(DEBUG_LEVEL_NORMAL, "Freebob SYT monitor\n"); 
     204     
     205    m_isoManager=new IsoHandlerManager(); 
     206     
     207    if(!m_isoManager) { 
     208        debugOutput(DEBUG_LEVEL_NORMAL, "Could not create IsoHandlerManager\n"); 
     209        goto finish; 
     210    } 
     211         
     212    m_isoManager->setVerboseLevel(DEBUG_LEVEL_VERBOSE); 
     213             
     214// the thread to execute the manager 
     215    m_isoManagerThread=new PosixThread( 
     216        m_isoManager,  
     217        run_realtime, realtime_prio, 
     218        PTHREAD_CANCEL_DEFERRED); 
     219         
     220    if(!m_isoManagerThread) { 
     221        debugOutput(DEBUG_LEVEL_NORMAL, "Could not create iso manager thread\n"); 
     222        goto finish; 
     223    } 
    225224         
    226225        // register monitors 
     
    354353                            // average out the offset 
    355354                                int64_t err=(((uint64_t)master_cif.pres_ticks) - ((uint64_t)cif.pres_ticks)); 
     355                                 
     356                                debugOutput(DEBUG_LEVEL_NORMAL,"Diff for %d at cycle %04d: %6lld (MTS: %11llu | STS: %11llu\n", 
     357                                    i,cif.cycle,err, master_cif.pres_ticks, cif.pres_ticks); 
    356358                                 
    357359                                err = err - stream_offset_ticks[i];