Show
Ignore:
Timestamp:
11/28/07 05:03:31 (15 years ago)
Author:
ppalmers
Message:

merge ppalmers-streaming branch

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/src/libstreaming/StreamProcessorManager.cpp

    r512 r734  
    2323 
    2424#include "StreamProcessorManager.h" 
    25 #include "StreamProcessor.h" 
    26 #include "Port.h" 
     25#include "generic/StreamProcessor.h" 
     26#include "generic/Port.h" 
     27#include "util/cycletimer.h" 
     28 
    2729#include <errno.h> 
    2830#include <assert.h> 
    29  
    30 #include "libstreaming/cycletimer.h" 
    31  
    32 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 5 
     31#include <math.h> 
    3332 
    3433#define RUNNING_TIMEOUT_MSEC 4000 
     
    3635#define ENABLE_TIMEOUT_MSEC 4000 
    3736 
    38 //#define ENABLE_DELAY_CYCLES 100 
    39 #define ENABLE_DELAY_CYCLES 1000 
     37// allows to add some processing margin. This shifts the time 
     38// at which the buffer is transfer()'ed, making things somewhat 
     39// more robust. It should be noted though that shifting the transfer 
     40// time to a later time instant also causes the xmit buffer fill to be 
     41// lower on average. 
     42#define FFADO_SIGNAL_DELAY_TICKS 3072*4 
    4043 
    4144namespace Streaming { 
    4245 
    43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL ); 
    44  
    45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers) 
     46IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE ); 
     47 
     48StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers) 
    4649    : m_is_slave( false ) 
    4750    , m_SyncSource(NULL) 
    4851    , m_nb_buffers(nb_buffers) 
    4952    , m_period(period) 
     53    , m_nominal_framerate ( framerate ) 
    5054    , m_xruns(0) 
    5155    , m_isoManager(0) 
     
    5761StreamProcessorManager::~StreamProcessorManager() { 
    5862    if (m_isoManager) delete m_isoManager; 
    59  
    6063} 
    6164 
     
    7780    assert(m_isoManager); 
    7881 
    79     if (processor->getType()==StreamProcessor::E_Receive) { 
     82    if (processor->getType() == StreamProcessor::ePT_Receive) { 
    8083        processor->setVerboseLevel(getDebugLevel()); // inherit debug level 
    8184 
    8285        m_ReceiveProcessors.push_back(processor); 
    83  
    8486        processor->setManager(this); 
    85  
    8687        return true; 
    8788    } 
    8889 
    89     if (processor->getType()==StreamProcessor::E_Transmit) { 
     90    if (processor->getType() == StreamProcessor::ePT_Transmit) { 
    9091        processor->setVerboseLevel(getDebugLevel()); // inherit debug level 
    9192 
    9293        m_TransmitProcessors.push_back(processor); 
    93  
    9494        processor->setManager(this); 
    95  
    9695        return true; 
    9796    } 
    9897 
    9998    debugFatal("Unsupported processor type!\n"); 
    100  
    10199    return false; 
    102100} 
     
    107105    assert(processor); 
    108106 
    109     if (processor->getType()==StreamProcessor::E_Receive) { 
     107    if (processor->getType()==StreamProcessor::ePT_Receive) { 
    110108 
    111109        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    112             it != m_ReceiveProcessors.end(); 
    113             ++it ) { 
    114  
     110              it != m_ReceiveProcessors.end(); 
     111              ++it ) 
     112        { 
    115113            if ( *it == processor ) { 
    116                     m_ReceiveProcessors.erase(it); 
    117  
    118                     processor->clearManager(); 
    119  
    120                     if(!m_isoManager->unregisterStream(processor)) { 
    121                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 
    122  
    123                         return false; 
    124  
    125                     } 
    126  
    127                     return true; 
     114                m_ReceiveProcessors.erase(it); 
     115                processor->clearManager(); 
     116                if(!m_isoManager->unregisterStream(processor)) { 
     117                    debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 
     118                    return false; 
    128119                } 
    129         } 
    130     } 
    131  
    132     if (processor->getType()==StreamProcessor::E_Transmit) { 
     120                return true; 
     121            } 
     122        } 
     123    } 
     124 
     125    if (processor->getType()==StreamProcessor::ePT_Transmit) { 
    133126        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    134             it != m_TransmitProcessors.end(); 
    135             ++it ) { 
    136  
     127              it != m_TransmitProcessors.end(); 
     128              ++it ) 
     129        { 
    137130            if ( *it == processor ) { 
    138                     m_TransmitProcessors.erase(it); 
    139  
    140                     processor->clearManager(); 
    141  
    142                     if(!m_isoManager->unregisterStream(processor)) { 
    143                         debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 
    144  
    145                         return false; 
    146  
    147                     } 
    148  
    149                     return true; 
     131                m_TransmitProcessors.erase(it); 
     132                processor->clearManager(); 
     133                if(!m_isoManager->unregisterStream(processor)) { 
     134                    debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 
     135                    return false; 
    150136                } 
     137                return true; 
     138            } 
    151139        } 
    152140    } 
    153141 
    154142    debugFatal("Processor (%p) not found!\n",processor); 
    155  
    156143    return false; //not found 
    157  
    158144} 
    159145 
    160146bool StreamProcessorManager::setSyncSource(StreamProcessor *s) { 
    161147    debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s); 
    162  
    163148    m_SyncSource=s; 
    164149    return true; 
    165 } 
    166  
    167 StreamProcessor *StreamProcessorManager::getSyncSource() { 
    168     return m_SyncSource; 
    169150} 
    170151 
     
    172153{ 
    173154    debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 
    174  
    175     m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority); 
    176  
     155    m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1); 
    177156    if(!m_isoManager) { 
    178157        debugFatal("Could not create IsoHandlerManager\n"); 
    179158        return false; 
    180159    } 
    181  
    182     // propagate the debug level 
    183160    m_isoManager->setVerboseLevel(getDebugLevel()); 
     161     
     162    // try to queue up 75% of the frames in the transmit buffer 
     163    unsigned int nb_frames = (getNbBuffers() - 1) * getPeriodSize() * 1000 / 2000; 
     164    m_isoManager->setTransmitBufferNbFrames(nb_frames); 
    184165 
    185166    if(!m_isoManager->init()) { 
     
    189170 
    190171    m_xrun_happened=false; 
    191  
    192172    return true; 
    193173} 
     
    207187    } 
    208188 
     189    // FIXME: put into separate method 
     190    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     191          it != m_ReceiveProcessors.end(); 
     192          ++it ) 
     193    { 
     194        if(m_SyncSource == NULL) { 
     195            debugWarning(" => Sync Source is %p.\n", *it); 
     196            m_SyncSource = *it; 
     197        } 
     198    } 
     199    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     200          it != m_TransmitProcessors.end(); 
     201          ++it ) 
     202    { 
     203        if(m_SyncSource == NULL) { 
     204            debugWarning(" => Sync Source is %p.\n", *it); 
     205            m_SyncSource = *it; 
     206        } 
     207    } 
     208 
     209    // now do the actual preparation of the SP's 
     210    debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n"); 
    209211    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    210212        it != m_ReceiveProcessors.end(); 
    211213        ++it ) { 
    212             if(m_SyncSource == NULL) { 
    213                 debugWarning(" => Sync Source is %p.\n", *it); 
    214                 m_SyncSource = *it; 
    215             } 
    216     } 
    217  
    218     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    219         it != m_TransmitProcessors.end(); 
    220         ++it ) { 
    221             if(m_SyncSource == NULL) { 
    222                 debugWarning(" => Sync Source is %p.\n", *it); 
    223                 m_SyncSource = *it; 
    224             } 
    225     } 
    226  
    227     // now do the actual preparation 
    228     debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n"); 
    229     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    230         it != m_ReceiveProcessors.end(); 
    231         ++it ) { 
    232  
    233         if(!(*it)->setSyncSource(m_SyncSource)) { 
    234             debugFatal(  " could not set sync source (%p)...\n",(*it)); 
    235             return false; 
    236         } 
    237214 
    238215        if(!(*it)->setOption("slaveMode", m_is_slave)) { 
     
    245222        } 
    246223    } 
    247  
    248224    debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n"); 
    249225    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    250226        it != m_TransmitProcessors.end(); 
    251227        ++it ) { 
    252         if(!(*it)->setSyncSource(m_SyncSource)) { 
    253             debugFatal(  " could not set sync source (%p)...\n",(*it)); 
    254             return false; 
    255         } 
    256228        if(!(*it)->setOption("slaveMode", m_is_slave)) { 
    257229            debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it)); 
     
    269241        return false; 
    270242    } 
    271  
    272243    return true; 
    273244} 
    274245 
    275 bool StreamProcessorManager::syncStartAll() { 
    276  
    277     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n"); 
    278     // we have to wait until all streamprocessors indicate that they are running 
    279     // i.e. that there is actually some data stream flowing 
    280     int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds 
    281     bool notRunning=true; 
    282     while (notRunning && wait_cycles) { 
    283         wait_cycles--; 
    284         notRunning=false; 
    285  
     246bool StreamProcessorManager::startDryRunning() { 
     247    debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n"); 
     248    debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n"); 
     249    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     250            it != m_ReceiveProcessors.end(); 
     251            ++it ) { 
     252        if (!(*it)->isDryRunning()) { 
     253            if(!(*it)->scheduleStartDryRunning(-1)) { 
     254                debugError("Could not put SP %p into the dry-running state\n", *it); 
     255                return false; 
     256            } 
     257        } else { 
     258            debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it); 
     259        } 
     260    } 
     261    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     262            it != m_TransmitProcessors.end(); 
     263            ++it ) { 
     264        if (!(*it)->isDryRunning()) { 
     265            if(!(*it)->scheduleStartDryRunning(-1)) { 
     266                debugError("Could not put SP %p into the dry-running state\n", *it); 
     267                return false; 
     268            } 
     269        } else { 
     270            debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it); 
     271        } 
     272    } 
     273    debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n"); 
     274    // wait for the syncsource to start running. 
     275    // that will block the waitForPeriod call until everyone has started (theoretically) 
     276    #define CYCLES_FOR_DRYRUN 40000 
     277    int cnt = CYCLES_FOR_DRYRUN; // by then it should have started 
     278    bool all_dry_running = false; 
     279    while (!all_dry_running && cnt) { 
     280        all_dry_running = true; 
    286281        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    287282                it != m_ReceiveProcessors.end(); 
    288283                ++it ) { 
    289             if(!(*it)->isRunning()) notRunning=true; 
    290         } 
    291  
     284            all_dry_running &= (*it)->isDryRunning(); 
     285        } 
    292286        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    293287                it != m_TransmitProcessors.end(); 
    294288                ++it ) { 
    295             if(!(*it)->isRunning()) notRunning=true; 
    296         } 
    297  
    298         // EXPERIMENT: 
    299         // the only stream that should be running is the sync 
    300         // source stream, as this is the one that defines 
    301         // when to signal buffers. Maybe we get an xrun at startup, 
    302         // but that should be handled. 
    303  
    304         // the problem is that otherwise a setup with a device 
    305         // that waits for decent input before sending output 
    306         // will not start up (e.g. the bounce device), because 
    307         // all streams are required to be running. 
    308  
    309         // other streams still have at least ENABLE_DELAY_CYCLES cycles 
    310         // to start up 
    311 //         if(!m_SyncSource->isRunning()) notRunning=true; 
    312  
    313         usleep(1000); 
    314         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning); 
    315     } 
    316  
    317     if(!wait_cycles) { // timout has occurred 
    318         debugFatal("One or more streams are not starting up (timeout):\n"); 
    319  
     289            all_dry_running &= (*it)->isDryRunning(); 
     290        } 
     291 
     292        usleep(125); 
     293        cnt--; 
     294    } 
     295    if(cnt==0) { 
     296        debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n"); 
    320297        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    321298                it != m_ReceiveProcessors.end(); 
    322299                ++it ) { 
    323             if(!(*it)->isRunning()) { 
    324                 debugFatal(" receive stream %p not running\n",*it); 
    325             } else { 
    326                 debugFatal(" receive stream %p running\n",*it); 
    327             } 
    328         } 
    329  
     300            debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n", 
     301                (*it)->getTypeString(), *it, (*it)->getStateString()); 
     302        } 
    330303        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    331304                it != m_TransmitProcessors.end(); 
    332305                ++it ) { 
    333             if(!(*it)->isRunning()) { 
    334                 debugFatal(" transmit stream %p not running\n",*it); 
    335             } else { 
    336                 debugFatal(" transmit stream %p running\n",*it); 
    337             } 
    338         } 
    339         return false; 
    340     } 
    341  
    342     // we want to make sure that everything is running well, 
    343     // so wait for a while 
    344     usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL); 
    345  
     306            debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n", 
     307                (*it)->getTypeString(), *it, (*it)->getStateString()); 
     308        } 
     309        return false; 
     310    } 
     311    debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n"); 
     312    return true; 
     313
     314 
     315bool StreamProcessorManager::syncStartAll() { 
     316    // figure out when to get the SP's running. 
     317    // the xmit SP's should also know the base timestamp 
     318    // streams should be aligned here 
     319 
     320    // now find out how long we have to delay the wait operation such that 
     321    // the received frames will all be presented to the SP 
     322    debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n"); 
     323    int max_of_min_delay = 0; 
     324    int min_delay = 0; 
     325    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     326            it != m_ReceiveProcessors.end(); 
     327            ++it ) { 
     328        min_delay = (*it)->getMaxFrameLatency(); 
     329        if(min_delay > max_of_min_delay) max_of_min_delay = min_delay; 
     330    } 
     331 
     332    // add some processing margin. This only shifts the time 
     333    // at which the buffer is transfer()'ed. This makes things somewhat 
     334    // more robust. It should be noted though that shifting the transfer 
     335    // time to a later time instant also causes the xmit buffer fill to be 
     336    // lower on average. 
     337    max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS; 
     338    debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n",  
     339        max_of_min_delay, 
     340        (unsigned int)TICKS_TO_SECS(max_of_min_delay), 
     341        (unsigned int)TICKS_TO_CYCLES(max_of_min_delay), 
     342        (unsigned int)TICKS_TO_OFFSET(max_of_min_delay)); 
     343    m_SyncSource->setSyncDelay(max_of_min_delay); 
     344 
     345    //STEP X: when we implement such a function, we can wait for a signal from the devices that they 
     346    //        have aquired lock 
     347    //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n"); 
     348    //sleep(2); // FIXME: be smarter here 
     349 
     350    // make sure that we are dry-running long enough for the 
     351    // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)? 
     352    debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n"); 
     353    int nb_sync_runs=20; 
     354    int64_t time_till_next_period; 
     355    while(nb_sync_runs--) { // or while not sync-ed? 
     356        // check if we were woken up too soon 
     357        time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
     358        debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 
     359        if(time_till_next_period > 0) { 
     360            // wait for the period 
     361            usleep(time_till_next_period); 
     362        } 
     363    } 
     364 
     365    debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n"); 
     366    // FIXME: in the SPM it would be nice to have system time instead of 
     367    //        1394 time 
     368 
     369    // we now should have decent sync info on the sync source 
     370    // determine a point in time where the system should start 
     371    // figure out where we are now 
     372    uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod(); 
     373    debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n",  
     374        time_of_first_sample, 
     375        (unsigned int)TICKS_TO_SECS(time_of_first_sample), 
     376        (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), 
     377        (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); 
     378 
     379    #define CYCLES_FOR_STARTUP 2000 
     380    // start wet-running in CYCLES_FOR_STARTUP cycles 
     381    // this is the time window we have to setup all SP's such that they  
     382    // can start wet-running correctly. 
     383    time_of_first_sample = addTicks(time_of_first_sample, 
     384                                    CYCLES_FOR_STARTUP * TICKS_PER_CYCLE); 
     385 
     386    debugOutput( DEBUG_LEVEL_VERBOSE, "  => first sample at TS=%011llu (%03us %04uc %04ut)...\n",  
     387        time_of_first_sample, 
     388        (unsigned int)TICKS_TO_SECS(time_of_first_sample), 
     389        (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), 
     390        (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); 
     391 
     392    // we should start wet-running the transmit SP's some cycles in advance 
     393    // such that we know it is wet-running when it should output its first sample 
     394    #define PRESTART_CYCLES_FOR_XMIT 20 
     395    uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,  
     396                                                 PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE); 
     397 
     398    #define PRESTART_CYCLES_FOR_RECV 0 
     399    uint64_t time_to_start_recv = substractTicks(time_of_first_sample, 
     400                                                 PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE); 
     401    debugOutput( DEBUG_LEVEL_VERBOSE, "  => xmit starts at  TS=%011llu (%03us %04uc %04ut)...\n",  
     402        time_to_start_xmit, 
     403        (unsigned int)TICKS_TO_SECS(time_to_start_xmit), 
     404        (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit), 
     405        (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit)); 
     406    debugOutput( DEBUG_LEVEL_VERBOSE, "  => recv starts at  TS=%011llu (%03us %04uc %04ut)...\n",  
     407        time_to_start_recv, 
     408        (unsigned int)TICKS_TO_SECS(time_to_start_recv), 
     409        (unsigned int)TICKS_TO_CYCLES(time_to_start_recv), 
     410        (unsigned int)TICKS_TO_OFFSET(time_to_start_recv)); 
     411 
     412    // at this point the buffer head timestamp of the transmit buffers can be set 
     413    // this is the presentation time of the first sample in the buffer 
     414    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     415          it != m_TransmitProcessors.end(); 
     416          ++it ) { 
     417        (*it)->setBufferHeadTimestamp(time_of_first_sample); 
     418    } 
     419 
     420    // STEP X: switch SP's over to the running state 
     421    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     422          it != m_ReceiveProcessors.end(); 
     423          ++it ) { 
     424        if(!(*it)->scheduleStartRunning(time_to_start_recv)) { 
     425            debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv); 
     426            return false; 
     427        } 
     428    } 
     429    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     430          it != m_TransmitProcessors.end(); 
     431          ++it ) { 
     432        if(!(*it)->scheduleStartRunning(time_to_start_xmit)) { 
     433            debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit); 
     434            return false; 
     435        } 
     436    } 
     437    // wait for the syncsource to start running. 
     438    // that will block the waitForPeriod call until everyone has started (theoretically) 
     439    int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started 
     440    while (!m_SyncSource->isRunning() && cnt) { 
     441        usleep(125); 
     442        cnt--; 
     443    } 
     444    if(cnt==0) { 
     445        debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n"); 
     446        return false; 
     447    } 
     448 
     449    // now align the received streams 
     450    if(!alignReceivedStreams()) { 
     451        debugError("Could not align streams\n"); 
     452        return false; 
     453    } 
    346454    debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 
    347  
    348     debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n"); 
    349  
    350     int max_of_min_delay=0; 
    351     int min_delay=0; 
    352     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    353             it != m_ReceiveProcessors.end(); 
    354             ++it ) { 
    355         min_delay=(*it)->getMinimalSyncDelay(); 
    356         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay; 
    357     } 
    358  
    359     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    360             it != m_TransmitProcessors.end(); 
    361             ++it ) { 
    362         min_delay=(*it)->getMinimalSyncDelay(); 
    363         if(min_delay>max_of_min_delay) max_of_min_delay=min_delay; 
    364     } 
    365  
    366     debugOutput( DEBUG_LEVEL_VERBOSE, "  %d ticks\n", max_of_min_delay); 
    367     m_SyncSource->setSyncDelay(max_of_min_delay); 
    368  
    369  
    370     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); 
    371     // now we reset the frame counters 
    372     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    373             it != m_ReceiveProcessors.end(); 
    374             ++it ) { 
    375         (*it)->reset(); 
    376     } 
    377  
    378     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    379             it != m_TransmitProcessors.end(); 
    380             ++it ) { 
    381         (*it)->reset(); 
    382     } 
    383  
    384     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 
    385  
    386     uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks 
    387  
    388     // FIXME: this should not be in cycles, but in 'time' 
    389     unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES; 
    390     if (enable_at > 8000) enable_at -= 8000; 
    391  
    392     if (!enableStreamProcessors(enable_at)) { 
    393         debugFatal("Could not enable StreamProcessors...\n"); 
    394         return false; 
    395     } 
    396  
     455    return true; 
     456
     457 
     458bool 
     459StreamProcessorManager::alignReceivedStreams() 
     460
     461    #define NB_PERIODS_FOR_ALIGN_AVERAGE 20 
     462    #define NB_ALIGN_TRIES 20 
     463    debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n"); 
     464    unsigned int nb_sync_runs; 
     465    unsigned int nb_rcv_sp = m_ReceiveProcessors.size(); 
     466    int64_t diff_between_streams[nb_rcv_sp]; 
     467    int64_t diff; 
     468 
     469    unsigned int i; 
     470 
     471    bool aligned = false; 
     472    int cnt = NB_ALIGN_TRIES; 
     473    while (!aligned && cnt--) { 
     474        nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE; 
     475        while(nb_sync_runs) { 
     476            debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs); 
     477            waitForPeriod(); 
     478 
     479            i = 0; 
     480            for ( i = 0; i < nb_rcv_sp; i++) { 
     481                StreamProcessor *s = m_ReceiveProcessors.at(i); 
     482                diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod()); 
     483                debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "  offset between SyncSP %p and SP %p is %lld ticks...\n",  
     484                    m_SyncSource, s, diff); 
     485                if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) { 
     486                    diff_between_streams[i] = diff; 
     487                } else { 
     488                    diff_between_streams[i] += diff; 
     489                } 
     490            } 
     491            if(!transferSilence()) { 
     492                debugError("Could not transfer silence\n"); 
     493                return false; 
     494            } 
     495            nb_sync_runs--; 
     496        } 
     497        // calculate the average offsets 
     498        debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n"); 
     499        int diff_between_streams_frames[nb_rcv_sp]; 
     500        aligned = true; 
     501        for ( i = 0; i < nb_rcv_sp; i++) { 
     502            StreamProcessor *s = m_ReceiveProcessors.at(i); 
     503 
     504            diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE; 
     505            diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame()); 
     506            debugOutput( DEBUG_LEVEL_VERBOSE, "   avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n",  
     507                m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]); 
     508 
     509            aligned &= (diff_between_streams_frames[i] == 0); 
     510 
     511            // reposition the stream 
     512            if(!s->shiftStream(diff_between_streams_frames[i])) { 
     513                debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]); 
     514                return false; 
     515            } 
     516        } 
     517        if (!aligned) { 
     518            debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n"); 
     519        } 
     520    } 
     521    if (cnt == 0) { 
     522        debugError("Align failed\n"); 
     523        return false; 
     524    } 
    397525    return true; 
    398526} 
     
    405533    debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 
    406534    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    407         it != m_ReceiveProcessors.end(); 
    408         ++it ) { 
    409             if (!(*it)->prepareForStart()) { 
    410                 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it); 
    411                 return false; 
    412             } 
    413             if (!m_isoManager->registerStream(*it)) { 
    414                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 
    415                 return false; 
    416             } 
    417         } 
    418  
     535          it != m_ReceiveProcessors.end(); 
     536          ++it ) 
     537    { 
     538        if (!m_isoManager->registerStream(*it)) { 
     539            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 
     540            return false; 
     541        } 
     542    } 
    419543    debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 
    420544    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    421         it != m_TransmitProcessors.end(); 
    422         ++it ) { 
    423             if (!(*it)->prepareForStart()) { 
    424                 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it); 
    425                 return false; 
    426             } 
    427             if (!m_isoManager->registerStream(*it)) { 
    428                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 
    429                 return false; 
    430             } 
    431         } 
     545          it != m_TransmitProcessors.end(); 
     546          ++it ) 
     547    { 
     548        if (!m_isoManager->registerStream(*it)) { 
     549            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 
     550            return false; 
     551        } 
     552    } 
    432553 
    433554    debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n"); 
     
    437558    } 
    438559 
    439     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 
    440         if (!disableStreamProcessors()) { 
    441         debugFatal("Could not disable StreamProcessors...\n"); 
    442         return false; 
    443     } 
    444  
    445560    debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n"); 
    446561    if (!m_isoManager->startHandlers(-1)) { 
     
    449564    } 
    450565 
     566    // put all SP's into dry-running state 
     567    if (!startDryRunning()) { 
     568        debugFatal("Could not put SP's in dry-running state\n"); 
     569        return false; 
     570    } 
     571 
    451572    // start all SP's synchonized 
    452573    if (!syncStartAll()) { 
     
    461582 
    462583    return true; 
    463  
    464584} 
    465585 
     
    468588    assert(m_isoManager); 
    469589 
    470     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n"); 
    471     // Most stream processors can just stop without special treatment.  However, some 
    472     // (like the MOTU) need to do a few things before it's safe to turn off the iso 
    473     // handling. 
    474     int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient 
    475     bool allReady = false; 
    476     while (!allReady && wait_cycles) { 
    477         wait_cycles--; 
    478         allReady = true; 
    479  
     590    debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n"); 
     591 
     592    // switch SP's over to the dry-running state 
     593    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     594          it != m_ReceiveProcessors.end(); 
     595          ++it ) { 
     596        if(!(*it)->scheduleStopRunning(-1)) { 
     597            debugError("%p->scheduleStopRunning(-1) failed\n", *it); 
     598            return false; 
     599        } 
     600    } 
     601    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     602          it != m_TransmitProcessors.end(); 
     603          ++it ) { 
     604        if(!(*it)->scheduleStopRunning(-1)) { 
     605            debugError("%p->scheduleStopRunning(-1) failed\n", *it); 
     606            return false; 
     607        } 
     608    } 
     609    // wait for the SP's to get into the dry-running state 
     610    int cnt = 200; 
     611    bool ready = false; 
     612    while (!ready && cnt) { 
     613        ready = true; 
    480614        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    481615            it != m_ReceiveProcessors.end(); 
    482616            ++it ) { 
    483             if(!(*it)->prepareForStop()) allReady = false; 
    484         } 
    485  
     617            ready &= ((*it)->isDryRunning() || (*it)->isStopped()); 
     618        } 
    486619        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    487620            it != m_TransmitProcessors.end(); 
    488621            ++it ) { 
    489             if(!(*it)->prepareForStop()) allReady = false; 
    490         } 
    491         usleep(1000); 
    492     } 
    493  
     622            ready &= ((*it)->isDryRunning() || (*it)->isStopped()); 
     623        } 
     624        usleep(125); 
     625        cnt--; 
     626    } 
     627    if(cnt==0) { 
     628        debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n"); 
     629        return false; 
     630    } 
     631 
     632    // switch SP's over to the stopped state 
     633    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     634          it != m_ReceiveProcessors.end(); 
     635          ++it ) { 
     636        if(!(*it)->scheduleStopDryRunning(-1)) { 
     637            debugError("%p->scheduleStopDryRunning(-1) failed\n", *it); 
     638            return false; 
     639        } 
     640    } 
     641    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     642          it != m_TransmitProcessors.end(); 
     643          ++it ) { 
     644        if(!(*it)->scheduleStopDryRunning(-1)) { 
     645            debugError("%p->scheduleStopDryRunning(-1) failed\n", *it); 
     646            return false; 
     647        } 
     648    } 
     649    // wait for the SP's to get into the running state 
     650    cnt = 200; 
     651    ready = false; 
     652    while (!ready && cnt) { 
     653        ready = true; 
     654        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     655            it != m_ReceiveProcessors.end(); 
     656            ++it ) { 
     657            ready &= (*it)->isStopped(); 
     658        } 
     659        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     660            it != m_TransmitProcessors.end(); 
     661            ++it ) { 
     662            ready &= (*it)->isStopped(); 
     663        } 
     664        usleep(125); 
     665        cnt--; 
     666    } 
     667    if(cnt==0) { 
     668        debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n"); 
     669        return false; 
     670    } 
    494671 
    495672    debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n"); 
     
    503680    debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 
    504681    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    505         it != m_ReceiveProcessors.end(); 
    506         ++it ) { 
    507             if (!m_isoManager->unregisterStream(*it)) { 
    508                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 
    509                 return false; 
    510             } 
    511  
    512         } 
    513  
     682          it != m_ReceiveProcessors.end(); 
     683          ++it ) { 
     684        if (!m_isoManager->unregisterStream(*it)) { 
     685            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 
     686            return false; 
     687        } 
     688    } 
    514689    debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 
    515690    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    516         it != m_TransmitProcessors.end(); 
    517         ++it ) { 
    518             if (!m_isoManager->unregisterStream(*it)) { 
    519                 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 
    520                 return false; 
    521             } 
    522  
    523         } 
    524  
    525     return true; 
    526  
    527 
    528  
    529 /** 
    530  * Enables the registered StreamProcessors 
    531  * @return true if successful, false otherwise 
    532  */ 
    533 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) { 
    534     debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at); 
    535  
    536     debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource); 
    537     debugOutput( DEBUG_LEVEL_VERBOSE, "  Prepare...\n"); 
    538     if (!m_SyncSource->prepareForEnable(time_to_enable_at)) { 
    539             debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 
    540         return false; 
    541     } 
    542  
    543     debugOutput( DEBUG_LEVEL_VERBOSE, "  Enable...\n"); 
    544     m_SyncSource->enable(time_to_enable_at); 
    545  
    546     debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n"); 
    547  
    548     // we prepare the streamprocessors for enable 
    549     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    550             it != m_ReceiveProcessors.end(); 
    551             ++it ) { 
    552         if(*it != m_SyncSource) { 
    553             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it); 
    554             (*it)->prepareForEnable(time_to_enable_at); 
    555         } 
    556     } 
    557  
    558     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    559             it != m_TransmitProcessors.end(); 
    560             ++it ) { 
    561         if(*it != m_SyncSource) { 
    562             debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it); 
    563             (*it)->prepareForEnable(time_to_enable_at); 
    564         } 
    565     } 
    566  
    567     // then we enable the streamprocessors 
    568     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    569             it != m_ReceiveProcessors.end(); 
    570             ++it ) { 
    571         if(*it != m_SyncSource) { 
    572             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it); 
    573             (*it)->enable(time_to_enable_at); 
    574         } 
    575     } 
    576  
    577     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    578             it != m_TransmitProcessors.end(); 
    579             ++it ) { 
    580         if(*it != m_SyncSource) { 
    581             debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it); 
    582             (*it)->enable(time_to_enable_at); 
    583         } 
    584     } 
    585  
    586     // now we wait for the SP's to get enabled 
    587     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n"); 
    588     // we have to wait until all streamprocessors indicate that they are running 
    589     // i.e. that there is actually some data stream flowing 
    590     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 
    591     bool notEnabled=true; 
    592     while (notEnabled && wait_cycles) { 
    593         wait_cycles--; 
    594         notEnabled=false; 
    595  
    596         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    597                 it != m_ReceiveProcessors.end(); 
    598                 ++it ) { 
    599             if(!(*it)->isEnabled()) notEnabled=true; 
    600         } 
    601  
    602         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    603                 it != m_TransmitProcessors.end(); 
    604                 ++it ) { 
    605             if(!(*it)->isEnabled()) notEnabled=true; 
    606         } 
    607         usleep(1000); // one cycle 
    608     } 
    609  
    610     if(!wait_cycles) { // timout has occurred 
    611         debugFatal("One or more streams couldn't be enabled (timeout):\n"); 
    612  
    613         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    614                 it != m_ReceiveProcessors.end(); 
    615                 ++it ) { 
    616             if(!(*it)->isEnabled()) { 
    617                     debugFatal(" receive stream %p not enabled\n",*it); 
    618             } else { 
    619                     debugFatal(" receive stream %p enabled\n",*it); 
    620             } 
    621         } 
    622  
    623         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    624                 it != m_TransmitProcessors.end(); 
    625                 ++it ) { 
    626             if(!(*it)->isEnabled()) { 
    627                     debugFatal(" transmit stream %p not enabled\n",*it); 
    628             } else { 
    629                     debugFatal(" transmit stream %p enabled\n",*it); 
    630             } 
    631         } 
    632         return false; 
    633     } 
    634  
    635     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n"); 
    636  
    637     return true; 
    638 
    639  
    640 /** 
    641  * Disables the registered StreamProcessors 
    642  * @return true if successful, false otherwise 
    643  */ 
    644 bool StreamProcessorManager::disableStreamProcessors() { 
    645     // we prepare the streamprocessors for disable 
    646     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    647             it != m_ReceiveProcessors.end(); 
    648             ++it ) { 
    649         (*it)->prepareForDisable(); 
    650     } 
    651  
    652     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    653             it != m_TransmitProcessors.end(); 
    654             ++it ) { 
    655         (*it)->prepareForDisable(); 
    656     } 
    657  
    658     // then we disable the streamprocessors 
    659     for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    660             it != m_ReceiveProcessors.end(); 
    661             ++it ) { 
    662         (*it)->disable(); 
    663     } 
    664  
    665     for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    666             it != m_TransmitProcessors.end(); 
    667             ++it ) { 
    668         (*it)->disable(); 
    669     } 
    670  
    671     // now we wait for the SP's to get disabled 
    672     debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n"); 
    673     // we have to wait until all streamprocessors indicate that they are running 
    674     // i.e. that there is actually some data stream flowing 
    675     int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 
    676     bool enabled=true; 
    677     while (enabled && wait_cycles) { 
    678         wait_cycles--; 
    679         enabled=false; 
    680  
    681         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    682                 it != m_ReceiveProcessors.end(); 
    683                 ++it ) { 
    684             if((*it)->isEnabled()) enabled=true; 
    685         } 
    686  
    687         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    688                 it != m_TransmitProcessors.end(); 
    689                 ++it ) { 
    690             if((*it)->isEnabled()) enabled=true; 
    691         } 
    692         usleep(1000); // one cycle 
    693     } 
    694  
    695     if(!wait_cycles) { // timout has occurred 
    696         debugFatal("One or more streams couldn't be disabled (timeout):\n"); 
    697  
    698         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    699                 it != m_ReceiveProcessors.end(); 
    700                 ++it ) { 
    701             if(!(*it)->isEnabled()) { 
    702                     debugFatal(" receive stream %p not enabled\n",*it); 
    703             } else { 
    704                     debugFatal(" receive stream %p enabled\n",*it); 
    705             } 
    706         } 
    707  
    708         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    709                 it != m_TransmitProcessors.end(); 
    710                 ++it ) { 
    711             if(!(*it)->isEnabled()) { 
    712                     debugFatal(" transmit stream %p not enabled\n",*it); 
    713             } else { 
    714                     debugFatal(" transmit stream %p enabled\n",*it); 
    715             } 
    716         } 
    717         return false; 
    718     } 
    719  
    720     debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n"); 
    721  
     691          it != m_TransmitProcessors.end(); 
     692          ++it ) { 
     693        if (!m_isoManager->unregisterStream(*it)) { 
     694            debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 
     695            return false; 
     696        } 
     697    } 
    722698    return true; 
    723699} 
     
    733709 
    734710    debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n"); 
     711 
     712    dumpInfo(); 
    735713 
    736714    /* 
     
    743721     * 3) Re-enable the SP's 
    744722     */ 
    745     debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 
    746         if (!disableStreamProcessors()) { 
    747         debugFatal("Could not disable StreamProcessors...\n"); 
     723 
     724    // put all SP's back into dry-running state 
     725    if (!startDryRunning()) { 
     726        debugFatal("Could not put SP's in dry-running state\n"); 
    748727        return false; 
    749728    } 
     
    771750bool StreamProcessorManager::waitForPeriod() { 
    772751    int time_till_next_period; 
    773     bool xrun_occurred=false; 
     752    bool xrun_occurred = false; 
    774753 
    775754    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); 
     
    799778            xrun_occurred |= (*it)->xrunOccurred(); 
    800779        } 
     780        if(xrun_occurred) break; 
    801781 
    802782        // check if we were waked up too soon 
    803         time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
    804     } 
    805  
    806     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period); 
    807  
    808     // this is to notify the client of the delay 
    809     // that we introduced 
    810     m_delayed_usecs=time_till_next_period; 
     783        time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
     784    } 
    811785 
    812786    // we save the 'ideal' time of the transfer at this point, 
     
    816790    // NOTE: before waitForPeriod() is called again, both the transmit 
    817791    //       and the receive processors should have done their transfer. 
    818     m_time_of_transfer=m_SyncSource->getTimeAtPeriod(); 
     792    m_time_of_transfer = m_SyncSource->getTimeAtPeriod(); 
    819793    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n", 
    820794        m_time_of_transfer); 
     795 
     796    // normally we can transfer frames at this time, but in some cases this is not true 
     797    // e.g. when there are not enough frames in the receive buffer. 
     798    // however this doesn't have to be a problem, since we can wait some more until we 
     799    // have enough frames. There is only a problem once the ISO xmit doesn't have packets 
     800    // to transmit, or if the receive buffer overflows. These conditions are signaled by 
     801    // the iso threads 
     802    // check if xruns occurred on the Iso side. 
     803    // also check if xruns will occur should we transfer() now 
     804    #ifdef DEBUG 
     805    int waited = 0; 
     806    #endif 
     807    bool ready_for_transfer = false; 
     808    xrun_occurred = false; 
     809    while (!ready_for_transfer && !xrun_occurred) { 
     810        ready_for_transfer = true; 
     811        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     812            it != m_ReceiveProcessors.end(); 
     813            ++it ) { 
     814            ready_for_transfer &= ((*it)->canClientTransferFrames(m_period)); 
     815            xrun_occurred |= (*it)->xrunOccurred(); 
     816        } 
     817        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     818            it != m_TransmitProcessors.end(); 
     819            ++it ) { 
     820            ready_for_transfer &= ((*it)->canClientTransferFrames(m_period)); 
     821            xrun_occurred |= (*it)->xrunOccurred(); 
     822        } 
     823        if (!ready_for_transfer) { 
     824            usleep(125); // MAGIC: one cycle sleep... 
     825 
     826            // in order to avoid this in the future, we increase the sync delay of the sync source SP 
     827            int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE; 
     828            m_SyncSource->setSyncDelay(d); 
     829 
     830            #ifdef DEBUG 
     831            waited++; 
     832            #endif 
     833        } 
     834    } // we are either ready or an xrun occurred 
     835 
     836    #ifdef DEBUG 
     837    if(waited > 0) { 
     838        debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited); 
     839    } 
     840    #endif 
     841 
     842    // this is to notify the client of the delay that we introduced by waiting 
     843    m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 
     844    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs); 
    821845 
    822846#ifdef DEBUG 
     
    833857    } 
    834858    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n", 
    835         m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf); 
    836  
    837 #endif 
    838  
    839     xrun_occurred=false; 
     859        m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf); 
    840860 
    841861    // check if xruns occurred on the Iso side. 
    842862    // also check if xruns will occur should we transfer() now 
    843  
    844863    for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    845864          it != m_ReceiveProcessors.end(); 
    846865          ++it ) { 
    847         // a xrun has occurred on the Iso side 
    848         xrun_occurred |= (*it)->xrunOccurred(); 
    849  
    850         // if this is true, a xrun will occur 
    851         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); 
    852  
    853 #ifdef DEBUG 
     866 
    854867        if ((*it)->xrunOccurred()) { 
    855             debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it); 
     868            debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it); 
    856869            (*it)->dumpInfo(); 
    857870        } 
    858871        if (!((*it)->canClientTransferFrames(m_period))) { 
    859             debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it); 
     872            debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it); 
    860873            (*it)->dumpInfo(); 
    861874        } 
    862 #endif 
    863  
    864875    } 
    865876    for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    866877          it != m_TransmitProcessors.end(); 
    867878          ++it ) { 
    868         // a xrun has occurred on the Iso side 
    869         xrun_occurred |= (*it)->xrunOccurred(); 
    870  
    871         // if this is true, a xrun will occur 
    872         xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); 
    873  
    874 #ifdef DEBUG 
    875879        if ((*it)->xrunOccurred()) { 
    876             debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it); 
     880            debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it); 
    877881        } 
    878882        if (!((*it)->canClientTransferFrames(m_period))) { 
    879             debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it); 
    880         } 
     883            debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it); 
     884        } 
     885    } 
    881886#endif 
    882     } 
    883887 
    884888    m_nbperiods++; 
    885  
    886889    // now we can signal the client that we are (should be) ready 
    887890    return !xrun_occurred; 
     
    896899 */ 
    897900bool StreamProcessorManager::transfer() { 
    898  
    899     debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n"); 
    900  
    901     if (!transfer(StreamProcessor::E_Receive)) return false; 
    902     if (!transfer(StreamProcessor::E_Transmit)) return false; 
    903  
    904     return true; 
     901    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 
     902    bool retval=true; 
     903    retval &= transfer(StreamProcessor::ePT_Receive); 
     904    retval &= transfer(StreamProcessor::ePT_Transmit); 
     905    return retval; 
    905906} 
    906907 
     
    913914 * @return true if successful, false otherwise (indicates xrun). 
    914915 */ 
    915  
    916 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) { 
    917     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 
    918  
     916bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { 
     917    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n",  
     918        t, m_time_of_transfer, 
     919        (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 
     920        (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 
     921        (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 
     922 
     923    bool retval = true; 
    919924    // a static cast could make sure that there is no performance 
    920925    // penalty for the virtual functions (to be checked) 
    921     if (t==StreamProcessor::E_Receive) { 
    922          
    923         // determine the time at which we want reception to start 
    924         float rate=m_SyncSource->getTicksPerFrame(); 
    925         int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate); 
    926          
    927         int64_t receive_timestamp = substractTicks(m_time_of_transfer,one_frame_in_ticks); 
    928          
    929         if(receive_timestamp<0) { 
    930             debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 
    931              receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 
    932         } 
    933         if(receive_timestamp>(128L*TICKS_PER_SECOND)) { 
    934             debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 
    935              receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 
    936         } 
    937          
     926    if (t==StreamProcessor::ePT_Receive) { 
    938927        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
    939928                it != m_ReceiveProcessors.end(); 
    940929                ++it ) { 
    941  
    942             if(!(*it)->getFrames(m_period, receive_timestamp)) { 
    943                     debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n", 
     930            if(!(*it)->getFrames(m_period, m_time_of_transfer)) { 
     931                    debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n", 
    944932                            m_period, m_time_of_transfer,*it); 
    945                     return false; // buffer underrun 
    946             } 
    947  
     933                retval &= false; // buffer underrun 
     934            } 
    948935        } 
    949936    } else { 
     937        // FIXME: in the SPM it would be nice to have system time instead of 
     938        //        1394 time 
     939        float rate = m_SyncSource->getTicksPerFrame(); 
     940        int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); 
     941 
     942        // the data we are putting into the buffer is intended to be transmitted 
     943        // one ringbuffer size after it has been received 
     944        int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
     945 
    950946        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
    951947                it != m_TransmitProcessors.end(); 
    952948                ++it ) { 
    953  
    954             if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) { 
    955                 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n", 
    956                         m_period, m_time_of_transfer, *it); 
    957                 return false; // buffer overrun 
    958             } 
    959  
    960         } 
    961     } 
    962  
    963     return true; 
     949            // FIXME: in the SPM it would be nice to have system time instead of 
     950            //        1394 time 
     951            if(!(*it)->putFrames(m_period, transmit_timestamp)) { 
     952                debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n", 
     953                        m_period, transmit_timestamp, *it); 
     954                retval &= false; // buffer underrun 
     955            } 
     956        } 
     957    } 
     958    return retval; 
     959
     960 
     961/** 
     962 * @brief Transfer one period of silence for both receive and transmit StreamProcessors 
     963 * 
     964 * Transfers one period of silence to the Iso side for transmit SP's 
     965 * or dump one period of frames for receive SP's 
     966 * 
     967 * @return true if successful, false otherwise (indicates xrun). 
     968 */ 
     969bool StreamProcessorManager::transferSilence() { 
     970    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); 
     971    bool retval=true; 
     972    retval &= transferSilence(StreamProcessor::ePT_Receive); 
     973    retval &= transferSilence(StreamProcessor::ePT_Transmit); 
     974    return retval; 
     975
     976 
     977/** 
     978 * @brief Transfer one period of silence for either the receive or transmit StreamProcessors 
     979 * 
     980 * Transfers one period of silence to the Iso side for transmit SP's 
     981 * or dump one period of frames for receive SP's 
     982 * 
     983 * @param t The processor type to tranfer for (receive or transmit) 
     984 * @return true if successful, false otherwise (indicates xrun). 
     985 */ 
     986bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) { 
     987    debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n",  
     988        t, m_time_of_transfer, 
     989        (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 
     990        (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 
     991        (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 
     992 
     993    bool retval = true; 
     994    // a static cast could make sure that there is no performance 
     995    // penalty for the virtual functions (to be checked) 
     996    if (t==StreamProcessor::ePT_Receive) { 
     997        for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 
     998                it != m_ReceiveProcessors.end(); 
     999                ++it ) { 
     1000            if(!(*it)->dropFrames(m_period, m_time_of_transfer)) { 
     1001                    debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n", 
     1002                            m_period, m_time_of_transfer,*it); 
     1003                retval &= false; // buffer underrun 
     1004            } 
     1005        } 
     1006    } else { 
     1007        // FIXME: in the SPM it would be nice to have system time instead of 
     1008        //        1394 time 
     1009        float rate = m_SyncSource->getTicksPerFrame(); 
     1010        int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); 
     1011 
     1012        // the data we are putting into the buffer is intended to be transmitted 
     1013        // one ringbuffer size after it has been received 
     1014        int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 
     1015 
     1016        for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 
     1017                it != m_TransmitProcessors.end(); 
     1018                ++it ) { 
     1019            // FIXME: in the SPM it would be nice to have system time instead of 
     1020            //        1394 time 
     1021            if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) { 
     1022                debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n", 
     1023                        m_period, transmit_timestamp, *it); 
     1024                retval &= false; // buffer underrun 
     1025            } 
     1026        } 
     1027    } 
     1028    return retval; 
    9641029} 
    9651030 
  • trunk/libffado/src/libstreaming/StreamProcessorManager.h

    r445 r734  
    2525#define __FFADO_STREAMPROCESSORMANAGER__ 
    2626 
     27#include "generic/Port.h" 
     28#include "generic/StreamProcessor.h" 
     29#include "util/IsoHandlerManager.h" 
     30 
    2731#include "debugmodule/debugmodule.h" 
    2832#include "libutil/Thread.h" 
    2933#include "libutil/OptionContainer.h" 
    30 #include <semaphore.h> 
    31 #include "Port.h" 
    32 #include "StreamProcessor.h" 
    33 #include "IsoHandlerManager.h" 
    3434 
    3535#include <vector> 
     36#include <semaphore.h> 
    3637 
    3738namespace Streaming { 
     
    5253public: 
    5354 
    54     StreamProcessorManager(unsigned int period, unsigned int nb_buffers); 
     55    StreamProcessorManager(unsigned int period, unsigned int rate, unsigned int nb_buffers); 
    5556    virtual ~StreamProcessorManager(); 
    5657 
     
    6162    bool stop(); 
    6263 
     64    bool startDryRunning(); 
    6365    bool syncStartAll(); 
    6466 
     
    6769    bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor 
    6870 
    69     bool enableStreamProcessors(uint64_t time_to_enable_at); /// enable registered StreamProcessors 
    70     bool disableStreamProcessors(); /// disable registered StreamProcessors 
    71  
    7271    void setPeriodSize(unsigned int period); 
    7372    void setPeriodSize(unsigned int period, unsigned int nb_buffers); 
    74     int getPeriodSize() {return m_period;}; 
     73    unsigned int getPeriodSize() {return m_period;}; 
    7574 
    7675    void setNbBuffers(unsigned int nb_buffers); 
     
    8281 
    8382    // the client-side functions 
     83    bool waitForPeriod(); 
     84    bool transfer(); 
     85    bool transfer(enum StreamProcessor::eProcessorType); 
     86private: 
     87    bool transferSilence(); 
     88    bool transferSilence(enum StreamProcessor::eProcessorType); 
    8489 
    85     bool waitForPeriod(); ///< wait for the next period 
    86  
    87     bool transfer(); ///< transfer the buffer contents from/to client 
    88     bool transfer(enum StreamProcessor::EProcessorType); ///< transfer the buffer contents from/to client (single processor type) 
    89  
     90    bool alignReceivedStreams(); 
     91public: 
    9092    int getDelayedUsecs() {return m_delayed_usecs;}; 
    9193    bool xrunOccurred(); 
    9294    int getXrunCount() {return m_xruns;}; 
     95 
     96    unsigned int getNominalRate() {return m_nominal_framerate;}; 
     97    uint64_t getTimeOfLastTransfer() { return m_time_of_transfer;}; 
    9398 
    9499private: 
     
    116121public: 
    117122    bool setSyncSource(StreamProcessor *s); 
    118     StreamProcessor * getSyncSource(); 
     123    StreamProcessor& getSyncSource() 
     124        {return *m_SyncSource;}; 
    119125 
    120126protected: 
     
    132138    unsigned int m_nb_buffers; 
    133139    unsigned int m_period; 
     140    unsigned int m_nominal_framerate; 
    134141    unsigned int m_xruns; 
    135142