Changeset 734 for trunk/libffado/src/libstreaming
- Timestamp:
- 11/28/07 05:03:31 (15 years ago)
- Files:
-
- trunk/libffado/src/libstreaming/amdtp (copied) (copied from branches/ppalmers-streaming/src/libstreaming/amdtp)
- trunk/libffado/src/libstreaming/AmdtpPort.cpp (deleted)
- trunk/libffado/src/libstreaming/AmdtpPort.h (deleted)
- trunk/libffado/src/libstreaming/AmdtpPortInfo.cpp (deleted)
- trunk/libffado/src/libstreaming/AmdtpPortInfo.h (deleted)
- trunk/libffado/src/libstreaming/AmdtpSlaveStreamProcessor.cpp (deleted)
- trunk/libffado/src/libstreaming/AmdtpSlaveStreamProcessor.h (deleted)
- trunk/libffado/src/libstreaming/AmdtpStreamProcessor.cpp (deleted)
- trunk/libffado/src/libstreaming/AmdtpStreamProcessor.h (deleted)
- trunk/libffado/src/libstreaming/cip.c (deleted)
- trunk/libffado/src/libstreaming/cip.h (deleted)
- trunk/libffado/src/libstreaming/cycletimer.h (deleted)
- trunk/libffado/src/libstreaming/generic (copied) (copied from branches/ppalmers-streaming/src/libstreaming/generic)
- trunk/libffado/src/libstreaming/IsoHandler.cpp (deleted)
- trunk/libffado/src/libstreaming/IsoHandler.h (deleted)
- trunk/libffado/src/libstreaming/IsoHandlerManager.cpp (deleted)
- trunk/libffado/src/libstreaming/IsoHandlerManager.h (deleted)
- trunk/libffado/src/libstreaming/IsoStream.cpp (deleted)
- trunk/libffado/src/libstreaming/IsoStream.h (deleted)
- trunk/libffado/src/libstreaming/motu (copied) (copied from branches/ppalmers-streaming/src/libstreaming/motu)
- trunk/libffado/src/libstreaming/MotuPort.cpp (deleted)
- trunk/libffado/src/libstreaming/MotuPort.h (deleted)
- trunk/libffado/src/libstreaming/MotuPortInfo.cpp (deleted)
- trunk/libffado/src/libstreaming/MotuPortInfo.h (deleted)
- trunk/libffado/src/libstreaming/MotuStreamProcessor.cpp (deleted)
- trunk/libffado/src/libstreaming/MotuStreamProcessor.h (deleted)
- trunk/libffado/src/libstreaming/Port.cpp (deleted)
- trunk/libffado/src/libstreaming/Port.h (deleted)
- trunk/libffado/src/libstreaming/PortManager.cpp (deleted)
- trunk/libffado/src/libstreaming/PortManager.h (deleted)
- trunk/libffado/src/libstreaming/StreamProcessor.cpp (deleted)
- trunk/libffado/src/libstreaming/StreamProcessor.h (deleted)
- trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (modified) (24 diffs)
- trunk/libffado/src/libstreaming/StreamProcessorManager.h (modified) (7 diffs)
- trunk/libffado/src/libstreaming/util (copied) (copied from branches/ppalmers-streaming/src/libstreaming/util)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/libffado/src/libstreaming/StreamProcessorManager.cpp
r512 r734 23 23 24 24 #include "StreamProcessorManager.h" 25 #include "StreamProcessor.h" 26 #include "Port.h" 25 #include "generic/StreamProcessor.h" 26 #include "generic/Port.h" 27 #include "util/cycletimer.h" 28 27 29 #include <errno.h> 28 30 #include <assert.h> 29 30 #include "libstreaming/cycletimer.h" 31 32 #define CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL 5 31 #include <math.h> 33 32 34 33 #define RUNNING_TIMEOUT_MSEC 4000 … … 36 35 #define ENABLE_TIMEOUT_MSEC 4000 37 36 38 //#define ENABLE_DELAY_CYCLES 100 39 #define ENABLE_DELAY_CYCLES 1000 37 // allows to add some processing margin. This shifts the time 38 // at which the buffer is transfer()'ed, making things somewhat 39 // more robust. It should be noted though that shifting the transfer 40 // time to a later time instant also causes the xmit buffer fill to be 41 // lower on average. 42 #define FFADO_SIGNAL_DELAY_TICKS 3072*4 40 43 41 44 namespace Streaming { 42 45 43 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_ NORMAL);44 45 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)46 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE ); 47 48 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int framerate, unsigned int nb_buffers) 46 49 : m_is_slave( false ) 47 50 , m_SyncSource(NULL) 48 51 , m_nb_buffers(nb_buffers) 49 52 , m_period(period) 53 , m_nominal_framerate ( framerate ) 50 54 , m_xruns(0) 51 55 , m_isoManager(0) … … 57 61 StreamProcessorManager::~StreamProcessorManager() { 58 62 if (m_isoManager) delete m_isoManager; 59 60 63 } 61 64 … … 77 80 assert(m_isoManager); 78 81 79 if (processor->getType() ==StreamProcessor::E_Receive) {82 if (processor->getType() == StreamProcessor::ePT_Receive) { 80 83 processor->setVerboseLevel(getDebugLevel()); // inherit debug level 81 84 82 85 m_ReceiveProcessors.push_back(processor); 83 84 86 processor->setManager(this); 85 86 87 return true; 87 88 } 88 89 89 if (processor->getType() ==StreamProcessor::E_Transmit) {90 if (processor->getType() == StreamProcessor::ePT_Transmit) { 90 91 processor->setVerboseLevel(getDebugLevel()); // inherit debug level 91 92 92 93 m_TransmitProcessors.push_back(processor); 93 94 94 processor->setManager(this); 95 96 95 return true; 97 96 } 98 97 99 98 debugFatal("Unsupported processor type!\n"); 100 101 99 return false; 102 100 } … … 107 105 assert(processor); 108 106 109 if (processor->getType()==StreamProcessor:: E_Receive) {107 if (processor->getType()==StreamProcessor::ePT_Receive) { 110 108 111 109 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 112 it != m_ReceiveProcessors.end();113 ++it ) {114 110 it != m_ReceiveProcessors.end(); 111 ++it ) 112 { 115 113 if ( *it == processor ) { 116 m_ReceiveProcessors.erase(it); 117 118 processor->clearManager(); 119 120 if(!m_isoManager->unregisterStream(processor)) { 121 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 122 123 return false; 124 125 } 126 127 return true; 114 m_ReceiveProcessors.erase(it); 115 processor->clearManager(); 116 if(!m_isoManager->unregisterStream(processor)) { 117 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor from the Iso manager\n"); 118 return false; 128 119 } 129 } 130 } 131 132 if (processor->getType()==StreamProcessor::E_Transmit) { 120 return true; 121 } 122 } 123 } 124 125 if (processor->getType()==StreamProcessor::ePT_Transmit) { 133 126 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 134 it != m_TransmitProcessors.end();135 ++it ) {136 127 it != m_TransmitProcessors.end(); 128 ++it ) 129 { 137 130 if ( *it == processor ) { 138 m_TransmitProcessors.erase(it); 139 140 processor->clearManager(); 141 142 if(!m_isoManager->unregisterStream(processor)) { 143 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 144 145 return false; 146 147 } 148 149 return true; 131 m_TransmitProcessors.erase(it); 132 processor->clearManager(); 133 if(!m_isoManager->unregisterStream(processor)) { 134 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor from the Iso manager\n"); 135 return false; 150 136 } 137 return true; 138 } 151 139 } 152 140 } 153 141 154 142 debugFatal("Processor (%p) not found!\n",processor); 155 156 143 return false; //not found 157 158 144 } 159 145 160 146 bool StreamProcessorManager::setSyncSource(StreamProcessor *s) { 161 147 debugOutput( DEBUG_LEVEL_VERBOSE, "Setting sync source to (%p)\n", s); 162 163 148 m_SyncSource=s; 164 149 return true; 165 }166 167 StreamProcessor *StreamProcessorManager::getSyncSource() {168 return m_SyncSource;169 150 } 170 151 … … 172 153 { 173 154 debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 174 175 m_isoManager=new IsoHandlerManager(m_thread_realtime, m_thread_priority); 176 155 m_isoManager = new IsoHandlerManager(m_thread_realtime, m_thread_priority + 1); 177 156 if(!m_isoManager) { 178 157 debugFatal("Could not create IsoHandlerManager\n"); 179 158 return false; 180 159 } 181 182 // propagate the debug level183 160 m_isoManager->setVerboseLevel(getDebugLevel()); 161 162 // try to queue up 75% of the frames in the transmit buffer 163 unsigned int nb_frames = (getNbBuffers() - 1) * getPeriodSize() * 1000 / 2000; 164 m_isoManager->setTransmitBufferNbFrames(nb_frames); 184 165 185 166 if(!m_isoManager->init()) { … … 189 170 190 171 m_xrun_happened=false; 191 192 172 return true; 193 173 } … … 207 187 } 208 188 189 // FIXME: put into separate method 190 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 191 it != m_ReceiveProcessors.end(); 192 ++it ) 193 { 194 if(m_SyncSource == NULL) { 195 debugWarning(" => Sync Source is %p.\n", *it); 196 m_SyncSource = *it; 197 } 198 } 199 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 200 it != m_TransmitProcessors.end(); 201 ++it ) 202 { 203 if(m_SyncSource == NULL) { 204 debugWarning(" => Sync Source is %p.\n", *it); 205 m_SyncSource = *it; 206 } 207 } 208 209 // now do the actual preparation of the SP's 210 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n"); 209 211 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 210 212 it != m_ReceiveProcessors.end(); 211 213 ++it ) { 212 if(m_SyncSource == NULL) {213 debugWarning(" => Sync Source is %p.\n", *it);214 m_SyncSource = *it;215 }216 }217 218 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();219 it != m_TransmitProcessors.end();220 ++it ) {221 if(m_SyncSource == NULL) {222 debugWarning(" => Sync Source is %p.\n", *it);223 m_SyncSource = *it;224 }225 }226 227 // now do the actual preparation228 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Receive processors...\n");229 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();230 it != m_ReceiveProcessors.end();231 ++it ) {232 233 if(!(*it)->setSyncSource(m_SyncSource)) {234 debugFatal( " could not set sync source (%p)...\n",(*it));235 return false;236 }237 214 238 215 if(!(*it)->setOption("slaveMode", m_is_slave)) { … … 245 222 } 246 223 } 247 248 224 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare Transmit processors...\n"); 249 225 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 250 226 it != m_TransmitProcessors.end(); 251 227 ++it ) { 252 if(!(*it)->setSyncSource(m_SyncSource)) {253 debugFatal( " could not set sync source (%p)...\n",(*it));254 return false;255 }256 228 if(!(*it)->setOption("slaveMode", m_is_slave)) { 257 229 debugOutput(DEBUG_LEVEL_VERBOSE, " note: could not set slaveMode option for (%p)...\n",(*it)); … … 269 241 return false; 270 242 } 271 272 243 return true; 273 244 } 274 245 275 bool StreamProcessorManager::syncStartAll() { 276 277 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for StreamProcessor streams to start running...\n"); 278 // we have to wait until all streamprocessors indicate that they are running 279 // i.e. that there is actually some data stream flowing 280 int wait_cycles=RUNNING_TIMEOUT_MSEC; // two seconds 281 bool notRunning=true; 282 while (notRunning && wait_cycles) { 283 wait_cycles--; 284 notRunning=false; 285 246 bool StreamProcessorManager::startDryRunning() { 247 debugOutput( DEBUG_LEVEL_VERBOSE, "Putting StreamProcessor streams into dry-running state...\n"); 248 debugOutput( DEBUG_LEVEL_VERBOSE, " Schedule start dry-running...\n"); 249 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 250 it != m_ReceiveProcessors.end(); 251 ++it ) { 252 if (!(*it)->isDryRunning()) { 253 if(!(*it)->scheduleStartDryRunning(-1)) { 254 debugError("Could not put SP %p into the dry-running state\n", *it); 255 return false; 256 } 257 } else { 258 debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it); 259 } 260 } 261 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 262 it != m_TransmitProcessors.end(); 263 ++it ) { 264 if (!(*it)->isDryRunning()) { 265 if(!(*it)->scheduleStartDryRunning(-1)) { 266 debugError("Could not put SP %p into the dry-running state\n", *it); 267 return false; 268 } 269 } else { 270 debugOutput( DEBUG_LEVEL_VERBOSE, " SP %p already dry-running...\n", *it); 271 } 272 } 273 debugOutput( DEBUG_LEVEL_VERBOSE, " Waiting for all SP's to be dry-running...\n"); 274 // wait for the syncsource to start running. 275 // that will block the waitForPeriod call until everyone has started (theoretically) 276 #define CYCLES_FOR_DRYRUN 40000 277 int cnt = CYCLES_FOR_DRYRUN; // by then it should have started 278 bool all_dry_running = false; 279 while (!all_dry_running && cnt) { 280 all_dry_running = true; 286 281 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 287 282 it != m_ReceiveProcessors.end(); 288 283 ++it ) { 289 if(!(*it)->isRunning()) notRunning=true; 290 } 291 284 all_dry_running &= (*it)->isDryRunning(); 285 } 292 286 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 293 287 it != m_TransmitProcessors.end(); 294 288 ++it ) { 295 if(!(*it)->isRunning()) notRunning=true; 296 } 297 298 // EXPERIMENT: 299 // the only stream that should be running is the sync 300 // source stream, as this is the one that defines 301 // when to signal buffers. Maybe we get an xrun at startup, 302 // but that should be handled. 303 304 // the problem is that otherwise a setup with a device 305 // that waits for decent input before sending output 306 // will not start up (e.g. the bounce device), because 307 // all streams are required to be running. 308 309 // other streams still have at least ENABLE_DELAY_CYCLES cycles 310 // to start up 311 // if(!m_SyncSource->isRunning()) notRunning=true; 312 313 usleep(1000); 314 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Running check: %d\n",notRunning); 315 } 316 317 if(!wait_cycles) { // timout has occurred 318 debugFatal("One or more streams are not starting up (timeout):\n"); 319 289 all_dry_running &= (*it)->isDryRunning(); 290 } 291 292 usleep(125); 293 cnt--; 294 } 295 if(cnt==0) { 296 debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n"); 320 297 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 321 298 it != m_ReceiveProcessors.end(); 322 299 ++it ) { 323 if(!(*it)->isRunning()) { 324 debugFatal(" receive stream %p not running\n",*it); 325 } else { 326 debugFatal(" receive stream %p running\n",*it); 327 } 328 } 329 300 debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n", 301 (*it)->getTypeString(), *it, (*it)->getStateString()); 302 } 330 303 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 331 304 it != m_TransmitProcessors.end(); 332 305 ++it ) { 333 if(!(*it)->isRunning()) { 334 debugFatal(" transmit stream %p not running\n",*it); 335 } else { 336 debugFatal(" transmit stream %p running\n",*it); 337 } 338 } 339 return false; 340 } 341 342 // we want to make sure that everything is running well, 343 // so wait for a while 344 usleep(USECS_PER_CYCLE * CYCLES_TO_SLEEP_AFTER_RUN_SIGNAL); 345 306 debugOutput( DEBUG_LEVEL_VERBOSE, " %s SP %p has state %s\n", 307 (*it)->getTypeString(), *it, (*it)->getStateString()); 308 } 309 return false; 310 } 311 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams dry-running...\n"); 312 return true; 313 } 314 315 bool StreamProcessorManager::syncStartAll() { 316 // figure out when to get the SP's running. 317 // the xmit SP's should also know the base timestamp 318 // streams should be aligned here 319 320 // now find out how long we have to delay the wait operation such that 321 // the received frames will all be presented to the SP 322 debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n"); 323 int max_of_min_delay = 0; 324 int min_delay = 0; 325 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 326 it != m_ReceiveProcessors.end(); 327 ++it ) { 328 min_delay = (*it)->getMaxFrameLatency(); 329 if(min_delay > max_of_min_delay) max_of_min_delay = min_delay; 330 } 331 332 // add some processing margin. This only shifts the time 333 // at which the buffer is transfer()'ed. This makes things somewhat 334 // more robust. It should be noted though that shifting the transfer 335 // time to a later time instant also causes the xmit buffer fill to be 336 // lower on average. 337 max_of_min_delay += FFADO_SIGNAL_DELAY_TICKS; 338 debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d ticks (%03us %04uc %04ut)...\n", 339 max_of_min_delay, 340 (unsigned int)TICKS_TO_SECS(max_of_min_delay), 341 (unsigned int)TICKS_TO_CYCLES(max_of_min_delay), 342 (unsigned int)TICKS_TO_OFFSET(max_of_min_delay)); 343 m_SyncSource->setSyncDelay(max_of_min_delay); 344 345 //STEP X: when we implement such a function, we can wait for a signal from the devices that they 346 // have aquired lock 347 //debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for device(s) to indicate clock sync lock...\n"); 348 //sleep(2); // FIXME: be smarter here 349 350 // make sure that we are dry-running long enough for the 351 // DLL to have a decent sync (FIXME: does the DLL get updated when dry-running)? 352 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for sync...\n"); 353 int nb_sync_runs=20; 354 int64_t time_till_next_period; 355 while(nb_sync_runs--) { // or while not sync-ed? 356 // check if we were woken up too soon 357 time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 358 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 359 if(time_till_next_period > 0) { 360 // wait for the period 361 usleep(time_till_next_period); 362 } 363 } 364 365 debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n"); 366 // FIXME: in the SPM it would be nice to have system time instead of 367 // 1394 time 368 369 // we now should have decent sync info on the sync source 370 // determine a point in time where the system should start 371 // figure out where we are now 372 uint64_t time_of_first_sample = m_SyncSource->getTimeAtPeriod(); 373 debugOutput( DEBUG_LEVEL_VERBOSE, " sync at TS=%011llu (%03us %04uc %04ut)...\n", 374 time_of_first_sample, 375 (unsigned int)TICKS_TO_SECS(time_of_first_sample), 376 (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), 377 (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); 378 379 #define CYCLES_FOR_STARTUP 2000 380 // start wet-running in CYCLES_FOR_STARTUP cycles 381 // this is the time window we have to setup all SP's such that they 382 // can start wet-running correctly. 383 time_of_first_sample = addTicks(time_of_first_sample, 384 CYCLES_FOR_STARTUP * TICKS_PER_CYCLE); 385 386 debugOutput( DEBUG_LEVEL_VERBOSE, " => first sample at TS=%011llu (%03us %04uc %04ut)...\n", 387 time_of_first_sample, 388 (unsigned int)TICKS_TO_SECS(time_of_first_sample), 389 (unsigned int)TICKS_TO_CYCLES(time_of_first_sample), 390 (unsigned int)TICKS_TO_OFFSET(time_of_first_sample)); 391 392 // we should start wet-running the transmit SP's some cycles in advance 393 // such that we know it is wet-running when it should output its first sample 394 #define PRESTART_CYCLES_FOR_XMIT 20 395 uint64_t time_to_start_xmit = substractTicks(time_of_first_sample, 396 PRESTART_CYCLES_FOR_XMIT * TICKS_PER_CYCLE); 397 398 #define PRESTART_CYCLES_FOR_RECV 0 399 uint64_t time_to_start_recv = substractTicks(time_of_first_sample, 400 PRESTART_CYCLES_FOR_RECV * TICKS_PER_CYCLE); 401 debugOutput( DEBUG_LEVEL_VERBOSE, " => xmit starts at TS=%011llu (%03us %04uc %04ut)...\n", 402 time_to_start_xmit, 403 (unsigned int)TICKS_TO_SECS(time_to_start_xmit), 404 (unsigned int)TICKS_TO_CYCLES(time_to_start_xmit), 405 (unsigned int)TICKS_TO_OFFSET(time_to_start_xmit)); 406 debugOutput( DEBUG_LEVEL_VERBOSE, " => recv starts at TS=%011llu (%03us %04uc %04ut)...\n", 407 time_to_start_recv, 408 (unsigned int)TICKS_TO_SECS(time_to_start_recv), 409 (unsigned int)TICKS_TO_CYCLES(time_to_start_recv), 410 (unsigned int)TICKS_TO_OFFSET(time_to_start_recv)); 411 412 // at this point the buffer head timestamp of the transmit buffers can be set 413 // this is the presentation time of the first sample in the buffer 414 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 415 it != m_TransmitProcessors.end(); 416 ++it ) { 417 (*it)->setBufferHeadTimestamp(time_of_first_sample); 418 } 419 420 // STEP X: switch SP's over to the running state 421 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 422 it != m_ReceiveProcessors.end(); 423 ++it ) { 424 if(!(*it)->scheduleStartRunning(time_to_start_recv)) { 425 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_recv); 426 return false; 427 } 428 } 429 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 430 it != m_TransmitProcessors.end(); 431 ++it ) { 432 if(!(*it)->scheduleStartRunning(time_to_start_xmit)) { 433 debugError("%p->scheduleStartRunning(%11llu) failed\n", *it, time_to_start_xmit); 434 return false; 435 } 436 } 437 // wait for the syncsource to start running. 438 // that will block the waitForPeriod call until everyone has started (theoretically) 439 int cnt = CYCLES_FOR_STARTUP * 2; // by then it should have started 440 while (!m_SyncSource->isRunning() && cnt) { 441 usleep(125); 442 cnt--; 443 } 444 if(cnt==0) { 445 debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SyncSource to get started\n"); 446 return false; 447 } 448 449 // now align the received streams 450 if(!alignReceivedStreams()) { 451 debugError("Could not align streams\n"); 452 return false; 453 } 346 454 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 347 348 debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n"); 349 350 int max_of_min_delay=0; 351 int min_delay=0; 352 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 353 it != m_ReceiveProcessors.end(); 354 ++it ) { 355 min_delay=(*it)->getMinimalSyncDelay(); 356 if(min_delay>max_of_min_delay) max_of_min_delay=min_delay; 357 } 358 359 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 360 it != m_TransmitProcessors.end(); 361 ++it ) { 362 min_delay=(*it)->getMinimalSyncDelay(); 363 if(min_delay>max_of_min_delay) max_of_min_delay=min_delay; 364 } 365 366 debugOutput( DEBUG_LEVEL_VERBOSE, " %d ticks\n", max_of_min_delay); 367 m_SyncSource->setSyncDelay(max_of_min_delay); 368 369 370 debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting StreamProcessors...\n"); 371 // now we reset the frame counters 372 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 373 it != m_ReceiveProcessors.end(); 374 ++it ) { 375 (*it)->reset(); 376 } 377 378 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 379 it != m_TransmitProcessors.end(); 380 ++it ) { 381 (*it)->reset(); 382 } 383 384 debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors...\n"); 385 386 uint64_t now=m_SyncSource->getTimeNow(); // fixme: should be in usecs, not ticks 387 388 // FIXME: this should not be in cycles, but in 'time' 389 unsigned int enable_at=TICKS_TO_CYCLES(now)+ENABLE_DELAY_CYCLES; 390 if (enable_at > 8000) enable_at -= 8000; 391 392 if (!enableStreamProcessors(enable_at)) { 393 debugFatal("Could not enable StreamProcessors...\n"); 394 return false; 395 } 396 455 return true; 456 } 457 458 bool 459 StreamProcessorManager::alignReceivedStreams() 460 { 461 #define NB_PERIODS_FOR_ALIGN_AVERAGE 20 462 #define NB_ALIGN_TRIES 20 463 debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n"); 464 unsigned int nb_sync_runs; 465 unsigned int nb_rcv_sp = m_ReceiveProcessors.size(); 466 int64_t diff_between_streams[nb_rcv_sp]; 467 int64_t diff; 468 469 unsigned int i; 470 471 bool aligned = false; 472 int cnt = NB_ALIGN_TRIES; 473 while (!aligned && cnt--) { 474 nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE; 475 while(nb_sync_runs) { 476 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs); 477 waitForPeriod(); 478 479 i = 0; 480 for ( i = 0; i < nb_rcv_sp; i++) { 481 StreamProcessor *s = m_ReceiveProcessors.at(i); 482 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod()); 483 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " offset between SyncSP %p and SP %p is %lld ticks...\n", 484 m_SyncSource, s, diff); 485 if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) { 486 diff_between_streams[i] = diff; 487 } else { 488 diff_between_streams[i] += diff; 489 } 490 } 491 if(!transferSilence()) { 492 debugError("Could not transfer silence\n"); 493 return false; 494 } 495 nb_sync_runs--; 496 } 497 // calculate the average offsets 498 debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n"); 499 int diff_between_streams_frames[nb_rcv_sp]; 500 aligned = true; 501 for ( i = 0; i < nb_rcv_sp; i++) { 502 StreamProcessor *s = m_ReceiveProcessors.at(i); 503 504 diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE; 505 diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame()); 506 debugOutput( DEBUG_LEVEL_VERBOSE, " avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n", 507 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]); 508 509 aligned &= (diff_between_streams_frames[i] == 0); 510 511 // reposition the stream 512 if(!s->shiftStream(diff_between_streams_frames[i])) { 513 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]); 514 return false; 515 } 516 } 517 if (!aligned) { 518 debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n"); 519 } 520 } 521 if (cnt == 0) { 522 debugError("Align failed\n"); 523 return false; 524 } 397 525 return true; 398 526 } … … 405 533 debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 406 534 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 407 it != m_ReceiveProcessors.end(); 408 ++it ) { 409 if (!(*it)->prepareForStart()) { 410 debugOutput(DEBUG_LEVEL_VERBOSE,"Receive stream processor (%p) failed to prepare for start\n", *it); 411 return false; 412 } 413 if (!m_isoManager->registerStream(*it)) { 414 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 415 return false; 416 } 417 } 418 535 it != m_ReceiveProcessors.end(); 536 ++it ) 537 { 538 if (!m_isoManager->registerStream(*it)) { 539 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register receive stream processor (%p) with the Iso manager\n",*it); 540 return false; 541 } 542 } 419 543 debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 420 544 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 421 it != m_TransmitProcessors.end(); 422 ++it ) { 423 if (!(*it)->prepareForStart()) { 424 debugOutput(DEBUG_LEVEL_VERBOSE,"Transmit stream processor (%p) failed to prepare for start\n", *it); 425 return false; 426 } 427 if (!m_isoManager->registerStream(*it)) { 428 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 429 return false; 430 } 431 } 545 it != m_TransmitProcessors.end(); 546 ++it ) 547 { 548 if (!m_isoManager->registerStream(*it)) { 549 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register transmit stream processor (%p) with the Iso manager\n",*it); 550 return false; 551 } 552 } 432 553 433 554 debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing IsoHandlerManager...\n"); … … 437 558 } 438 559 439 debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n");440 if (!disableStreamProcessors()) {441 debugFatal("Could not disable StreamProcessors...\n");442 return false;443 }444 445 560 debugOutput( DEBUG_LEVEL_VERBOSE, "Starting IsoHandlers...\n"); 446 561 if (!m_isoManager->startHandlers(-1)) { … … 449 564 } 450 565 566 // put all SP's into dry-running state 567 if (!startDryRunning()) { 568 debugFatal("Could not put SP's in dry-running state\n"); 569 return false; 570 } 571 451 572 // start all SP's synchonized 452 573 if (!syncStartAll()) { … … 461 582 462 583 return true; 463 464 584 } 465 585 … … 468 588 assert(m_isoManager); 469 589 470 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to prepare to stop...\n"); 471 // Most stream processors can just stop without special treatment. However, some 472 // (like the MOTU) need to do a few things before it's safe to turn off the iso 473 // handling. 474 int wait_cycles=PREPARE_TIMEOUT_MSEC; // two seconds ought to be sufficient 475 bool allReady = false; 476 while (!allReady && wait_cycles) { 477 wait_cycles--; 478 allReady = true; 479 590 debugOutput( DEBUG_LEVEL_VERBOSE, " scheduling stop for all SP's...\n"); 591 592 // switch SP's over to the dry-running state 593 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 594 it != m_ReceiveProcessors.end(); 595 ++it ) { 596 if(!(*it)->scheduleStopRunning(-1)) { 597 debugError("%p->scheduleStopRunning(-1) failed\n", *it); 598 return false; 599 } 600 } 601 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 602 it != m_TransmitProcessors.end(); 603 ++it ) { 604 if(!(*it)->scheduleStopRunning(-1)) { 605 debugError("%p->scheduleStopRunning(-1) failed\n", *it); 606 return false; 607 } 608 } 609 // wait for the SP's to get into the dry-running state 610 int cnt = 200; 611 bool ready = false; 612 while (!ready && cnt) { 613 ready = true; 480 614 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 481 615 it != m_ReceiveProcessors.end(); 482 616 ++it ) { 483 if(!(*it)->prepareForStop()) allReady = false; 484 } 485 617 ready &= ((*it)->isDryRunning() || (*it)->isStopped()); 618 } 486 619 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 487 620 it != m_TransmitProcessors.end(); 488 621 ++it ) { 489 if(!(*it)->prepareForStop()) allReady = false; 490 } 491 usleep(1000); 492 } 493 622 ready &= ((*it)->isDryRunning() || (*it)->isStopped()); 623 } 624 usleep(125); 625 cnt--; 626 } 627 if(cnt==0) { 628 debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n"); 629 return false; 630 } 631 632 // switch SP's over to the stopped state 633 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 634 it != m_ReceiveProcessors.end(); 635 ++it ) { 636 if(!(*it)->scheduleStopDryRunning(-1)) { 637 debugError("%p->scheduleStopDryRunning(-1) failed\n", *it); 638 return false; 639 } 640 } 641 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 642 it != m_TransmitProcessors.end(); 643 ++it ) { 644 if(!(*it)->scheduleStopDryRunning(-1)) { 645 debugError("%p->scheduleStopDryRunning(-1) failed\n", *it); 646 return false; 647 } 648 } 649 // wait for the SP's to get into the running state 650 cnt = 200; 651 ready = false; 652 while (!ready && cnt) { 653 ready = true; 654 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 655 it != m_ReceiveProcessors.end(); 656 ++it ) { 657 ready &= (*it)->isStopped(); 658 } 659 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 660 it != m_TransmitProcessors.end(); 661 ++it ) { 662 ready &= (*it)->isStopped(); 663 } 664 usleep(125); 665 cnt--; 666 } 667 if(cnt==0) { 668 debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to stop\n"); 669 return false; 670 } 494 671 495 672 debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handlers...\n"); … … 503 680 debugOutput( DEBUG_LEVEL_VERBOSE, " Receive processors...\n"); 504 681 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 505 it != m_ReceiveProcessors.end(); 506 ++it ) { 507 if (!m_isoManager->unregisterStream(*it)) { 508 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 509 return false; 510 } 511 512 } 513 682 it != m_ReceiveProcessors.end(); 683 ++it ) { 684 if (!m_isoManager->unregisterStream(*it)) { 685 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister receive stream processor (%p) from the Iso manager\n",*it); 686 return false; 687 } 688 } 514 689 debugOutput( DEBUG_LEVEL_VERBOSE, " Transmit processors...\n"); 515 690 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 516 it != m_TransmitProcessors.end(); 517 ++it ) { 518 if (!m_isoManager->unregisterStream(*it)) { 519 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 520 return false; 521 } 522 523 } 524 525 return true; 526 527 } 528 529 /** 530 * Enables the registered StreamProcessors 531 * @return true if successful, false otherwise 532 */ 533 bool StreamProcessorManager::enableStreamProcessors(uint64_t time_to_enable_at) { 534 debugOutput( DEBUG_LEVEL_VERBOSE, "Enabling StreamProcessors at %llu...\n", time_to_enable_at); 535 536 debugOutput( DEBUG_LEVEL_VERBOSE, " Sync Source StreamProcessor (%p)...\n",m_SyncSource); 537 debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare...\n"); 538 if (!m_SyncSource->prepareForEnable(time_to_enable_at)) { 539 debugFatal("Could not prepare Sync Source StreamProcessor for enable()...\n"); 540 return false; 541 } 542 543 debugOutput( DEBUG_LEVEL_VERBOSE, " Enable...\n"); 544 m_SyncSource->enable(time_to_enable_at); 545 546 debugOutput( DEBUG_LEVEL_VERBOSE, " Other StreamProcessors...\n"); 547 548 // we prepare the streamprocessors for enable 549 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 550 it != m_ReceiveProcessors.end(); 551 ++it ) { 552 if(*it != m_SyncSource) { 553 debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Receive SP (%p)...\n",*it); 554 (*it)->prepareForEnable(time_to_enable_at); 555 } 556 } 557 558 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 559 it != m_TransmitProcessors.end(); 560 ++it ) { 561 if(*it != m_SyncSource) { 562 debugOutput( DEBUG_LEVEL_VERBOSE, " Prepare Transmit SP (%p)...\n",*it); 563 (*it)->prepareForEnable(time_to_enable_at); 564 } 565 } 566 567 // then we enable the streamprocessors 568 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 569 it != m_ReceiveProcessors.end(); 570 ++it ) { 571 if(*it != m_SyncSource) { 572 debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Receive SP (%p)...\n",*it); 573 (*it)->enable(time_to_enable_at); 574 } 575 } 576 577 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 578 it != m_TransmitProcessors.end(); 579 ++it ) { 580 if(*it != m_SyncSource) { 581 debugOutput( DEBUG_LEVEL_VERBOSE, " Enable Transmit SP (%p)...\n",*it); 582 (*it)->enable(time_to_enable_at); 583 } 584 } 585 586 // now we wait for the SP's to get enabled 587 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be enabled...\n"); 588 // we have to wait until all streamprocessors indicate that they are running 589 // i.e. that there is actually some data stream flowing 590 int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 591 bool notEnabled=true; 592 while (notEnabled && wait_cycles) { 593 wait_cycles--; 594 notEnabled=false; 595 596 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 597 it != m_ReceiveProcessors.end(); 598 ++it ) { 599 if(!(*it)->isEnabled()) notEnabled=true; 600 } 601 602 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 603 it != m_TransmitProcessors.end(); 604 ++it ) { 605 if(!(*it)->isEnabled()) notEnabled=true; 606 } 607 usleep(1000); // one cycle 608 } 609 610 if(!wait_cycles) { // timout has occurred 611 debugFatal("One or more streams couldn't be enabled (timeout):\n"); 612 613 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 614 it != m_ReceiveProcessors.end(); 615 ++it ) { 616 if(!(*it)->isEnabled()) { 617 debugFatal(" receive stream %p not enabled\n",*it); 618 } else { 619 debugFatal(" receive stream %p enabled\n",*it); 620 } 621 } 622 623 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 624 it != m_TransmitProcessors.end(); 625 ++it ) { 626 if(!(*it)->isEnabled()) { 627 debugFatal(" transmit stream %p not enabled\n",*it); 628 } else { 629 debugFatal(" transmit stream %p enabled\n",*it); 630 } 631 } 632 return false; 633 } 634 635 debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors enabled...\n"); 636 637 return true; 638 } 639 640 /** 641 * Disables the registered StreamProcessors 642 * @return true if successful, false otherwise 643 */ 644 bool StreamProcessorManager::disableStreamProcessors() { 645 // we prepare the streamprocessors for disable 646 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 647 it != m_ReceiveProcessors.end(); 648 ++it ) { 649 (*it)->prepareForDisable(); 650 } 651 652 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 653 it != m_TransmitProcessors.end(); 654 ++it ) { 655 (*it)->prepareForDisable(); 656 } 657 658 // then we disable the streamprocessors 659 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 660 it != m_ReceiveProcessors.end(); 661 ++it ) { 662 (*it)->disable(); 663 } 664 665 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 666 it != m_TransmitProcessors.end(); 667 ++it ) { 668 (*it)->disable(); 669 } 670 671 // now we wait for the SP's to get disabled 672 debugOutput( DEBUG_LEVEL_VERBOSE, "Waiting for all StreamProcessors to be disabled...\n"); 673 // we have to wait until all streamprocessors indicate that they are running 674 // i.e. that there is actually some data stream flowing 675 int wait_cycles=ENABLE_TIMEOUT_MSEC; // two seconds 676 bool enabled=true; 677 while (enabled && wait_cycles) { 678 wait_cycles--; 679 enabled=false; 680 681 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 682 it != m_ReceiveProcessors.end(); 683 ++it ) { 684 if((*it)->isEnabled()) enabled=true; 685 } 686 687 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 688 it != m_TransmitProcessors.end(); 689 ++it ) { 690 if((*it)->isEnabled()) enabled=true; 691 } 692 usleep(1000); // one cycle 693 } 694 695 if(!wait_cycles) { // timout has occurred 696 debugFatal("One or more streams couldn't be disabled (timeout):\n"); 697 698 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 699 it != m_ReceiveProcessors.end(); 700 ++it ) { 701 if(!(*it)->isEnabled()) { 702 debugFatal(" receive stream %p not enabled\n",*it); 703 } else { 704 debugFatal(" receive stream %p enabled\n",*it); 705 } 706 } 707 708 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 709 it != m_TransmitProcessors.end(); 710 ++it ) { 711 if(!(*it)->isEnabled()) { 712 debugFatal(" transmit stream %p not enabled\n",*it); 713 } else { 714 debugFatal(" transmit stream %p enabled\n",*it); 715 } 716 } 717 return false; 718 } 719 720 debugOutput( DEBUG_LEVEL_VERBOSE, " => all StreamProcessors disabled...\n"); 721 691 it != m_TransmitProcessors.end(); 692 ++it ) { 693 if (!m_isoManager->unregisterStream(*it)) { 694 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister transmit stream processor (%p) from the Iso manager\n",*it); 695 return false; 696 } 697 } 722 698 return true; 723 699 } … … 733 709 734 710 debugOutput( DEBUG_LEVEL_VERBOSE, "Handling Xrun ...\n"); 711 712 dumpInfo(); 735 713 736 714 /* … … 743 721 * 3) Re-enable the SP's 744 722 */ 745 debugOutput( DEBUG_LEVEL_VERBOSE, "Disabling StreamProcessors...\n"); 746 if (!disableStreamProcessors()) { 747 debugFatal("Could not disable StreamProcessors...\n"); 723 724 // put all SP's back into dry-running state 725 if (!startDryRunning()) { 726 debugFatal("Could not put SP's in dry-running state\n"); 748 727 return false; 749 728 } … … 771 750 bool StreamProcessorManager::waitForPeriod() { 772 751 int time_till_next_period; 773 bool xrun_occurred =false;752 bool xrun_occurred = false; 774 753 775 754 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); … … 799 778 xrun_occurred |= (*it)->xrunOccurred(); 800 779 } 780 if(xrun_occurred) break; 801 781 802 782 // check if we were waked up too soon 803 time_till_next_period=m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 804 } 805 806 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", time_till_next_period); 807 808 // this is to notify the client of the delay 809 // that we introduced 810 m_delayed_usecs=time_till_next_period; 783 time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 784 } 811 785 812 786 // we save the 'ideal' time of the transfer at this point, … … 816 790 // NOTE: before waitForPeriod() is called again, both the transmit 817 791 // and the receive processors should have done their transfer. 818 m_time_of_transfer =m_SyncSource->getTimeAtPeriod();792 m_time_of_transfer = m_SyncSource->getTimeAtPeriod(); 819 793 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer at %llu ticks...\n", 820 794 m_time_of_transfer); 795 796 // normally we can transfer frames at this time, but in some cases this is not true 797 // e.g. when there are not enough frames in the receive buffer. 798 // however this doesn't have to be a problem, since we can wait some more until we 799 // have enough frames. There is only a problem once the ISO xmit doesn't have packets 800 // to transmit, or if the receive buffer overflows. These conditions are signaled by 801 // the iso threads 802 // check if xruns occurred on the Iso side. 803 // also check if xruns will occur should we transfer() now 804 #ifdef DEBUG 805 int waited = 0; 806 #endif 807 bool ready_for_transfer = false; 808 xrun_occurred = false; 809 while (!ready_for_transfer && !xrun_occurred) { 810 ready_for_transfer = true; 811 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 812 it != m_ReceiveProcessors.end(); 813 ++it ) { 814 ready_for_transfer &= ((*it)->canClientTransferFrames(m_period)); 815 xrun_occurred |= (*it)->xrunOccurred(); 816 } 817 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 818 it != m_TransmitProcessors.end(); 819 ++it ) { 820 ready_for_transfer &= ((*it)->canClientTransferFrames(m_period)); 821 xrun_occurred |= (*it)->xrunOccurred(); 822 } 823 if (!ready_for_transfer) { 824 usleep(125); // MAGIC: one cycle sleep... 825 826 // in order to avoid this in the future, we increase the sync delay of the sync source SP 827 int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE; 828 m_SyncSource->setSyncDelay(d); 829 830 #ifdef DEBUG 831 waited++; 832 #endif 833 } 834 } // we are either ready or an xrun occurred 835 836 #ifdef DEBUG 837 if(waited > 0) { 838 debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited); 839 } 840 #endif 841 842 // this is to notify the client of the delay that we introduced by waiting 843 m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 844 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "delayed for %d usecs...\n", m_delayed_usecs); 821 845 822 846 #ifdef DEBUG … … 833 857 } 834 858 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "XF at %011llu ticks, RBF=%d, XBF=%d, SUM=%d...\n", 835 m_time_of_transfer,rcv_bf,xmt_bf,rcv_bf+xmt_bf); 836 837 #endif 838 839 xrun_occurred=false; 859 m_time_of_transfer, rcv_bf, xmt_bf, rcv_bf+xmt_bf); 840 860 841 861 // check if xruns occurred on the Iso side. 842 862 // also check if xruns will occur should we transfer() now 843 844 863 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 845 864 it != m_ReceiveProcessors.end(); 846 865 ++it ) { 847 // a xrun has occurred on the Iso side 848 xrun_occurred |= (*it)->xrunOccurred(); 849 850 // if this is true, a xrun will occur 851 xrun_occurred |= !((*it)->canClientTransferFrames(m_period)); 852 853 #ifdef DEBUG 866 854 867 if ((*it)->xrunOccurred()) { 855 debugWarning("Xrun on RECV SP %p due to ISO xrun\n",*it);868 debugWarning("Xrun on RECV SP %p due to ISO side xrun\n",*it); 856 869 (*it)->dumpInfo(); 857 870 } 858 871 if (!((*it)->canClientTransferFrames(m_period))) { 859 debugWarning("Xrun on RECV SP %p due to buffer xrun\n",*it);872 debugWarning("Xrun on RECV SP %p due to buffer side xrun\n",*it); 860 873 (*it)->dumpInfo(); 861 874 } 862 #endif863 864 875 } 865 876 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 866 877 it != m_TransmitProcessors.end(); 867 878 ++it ) { 868 // a xrun has occurred on the Iso side869 xrun_occurred |= (*it)->xrunOccurred();870 871 // if this is true, a xrun will occur872 xrun_occurred |= !((*it)->canClientTransferFrames(m_period));873 874 #ifdef DEBUG875 879 if ((*it)->xrunOccurred()) { 876 debugWarning("Xrun on XMIT SP %p due to ISO xrun\n",*it);880 debugWarning("Xrun on XMIT SP %p due to ISO side xrun\n",*it); 877 881 } 878 882 if (!((*it)->canClientTransferFrames(m_period))) { 879 debugWarning("Xrun on XMIT SP %p due to buffer xrun\n",*it); 880 } 883 debugWarning("Xrun on XMIT SP %p due to buffer side xrun\n",*it); 884 } 885 } 881 886 #endif 882 }883 887 884 888 m_nbperiods++; 885 886 889 // now we can signal the client that we are (should be) ready 887 890 return !xrun_occurred; … … 896 899 */ 897 900 bool StreamProcessorManager::transfer() { 898 899 debugOutput( DEBUG_LEVEL_VERBOSE, "Transferring period...\n"); 900 901 if (!transfer(StreamProcessor::E_Receive)) return false; 902 if (!transfer(StreamProcessor::E_Transmit)) return false; 903 904 return true; 901 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 902 bool retval=true; 903 retval &= transfer(StreamProcessor::ePT_Receive); 904 retval &= transfer(StreamProcessor::ePT_Transmit); 905 return retval; 905 906 } 906 907 … … 913 914 * @return true if successful, false otherwise (indicates xrun). 914 915 */ 915 916 bool StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) { 917 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 918 916 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { 917 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n", 918 t, m_time_of_transfer, 919 (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 920 (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 921 (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 922 923 bool retval = true; 919 924 // a static cast could make sure that there is no performance 920 925 // penalty for the virtual functions (to be checked) 921 if (t==StreamProcessor::E_Receive) { 922 923 // determine the time at which we want reception to start 924 float rate=m_SyncSource->getTicksPerFrame(); 925 int64_t one_frame_in_ticks=(int64_t)(((float)m_period)*rate); 926 927 int64_t receive_timestamp = substractTicks(m_time_of_transfer,one_frame_in_ticks); 928 929 if(receive_timestamp<0) { 930 debugWarning("receive ts < 0.0 : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 931 receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 932 } 933 if(receive_timestamp>(128L*TICKS_PER_SECOND)) { 934 debugWarning("receive ts > 128L*TICKS_PER_SECOND : %lld, m_time_of_transfer= %llu, one_frame_in_ticks=%lld\n", 935 receive_timestamp, m_time_of_transfer, one_frame_in_ticks); 936 } 937 926 if (t==StreamProcessor::ePT_Receive) { 938 927 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 939 928 it != m_ReceiveProcessors.end(); 940 929 ++it ) { 941 942 if(!(*it)->getFrames(m_period, receive_timestamp)) { 943 debugOutput(DEBUG_LEVEL_VERBOSE,"could not getFrames(%u, %11llu) from stream processor (%p)\n", 930 if(!(*it)->getFrames(m_period, m_time_of_transfer)) { 931 debugWarning("could not getFrames(%u, %11llu) from stream processor (%p)\n", 944 932 m_period, m_time_of_transfer,*it); 945 return false; // buffer underrun 946 } 947 933 retval &= false; // buffer underrun 934 } 948 935 } 949 936 } else { 937 // FIXME: in the SPM it would be nice to have system time instead of 938 // 1394 time 939 float rate = m_SyncSource->getTicksPerFrame(); 940 int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); 941 942 // the data we are putting into the buffer is intended to be transmitted 943 // one ringbuffer size after it has been received 944 int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 945 950 946 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 951 947 it != m_TransmitProcessors.end(); 952 948 ++it ) { 953 954 if(!(*it)->putFrames(m_period, (int64_t)m_time_of_transfer)) { 955 debugOutput(DEBUG_LEVEL_VERBOSE, "could not putFrames(%u,%llu) to stream processor (%p)\n", 956 m_period, m_time_of_transfer, *it); 957 return false; // buffer overrun 958 } 959 960 } 961 } 962 963 return true; 949 // FIXME: in the SPM it would be nice to have system time instead of 950 // 1394 time 951 if(!(*it)->putFrames(m_period, transmit_timestamp)) { 952 debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n", 953 m_period, transmit_timestamp, *it); 954 retval &= false; // buffer underrun 955 } 956 } 957 } 958 return retval; 959 } 960 961 /** 962 * @brief Transfer one period of silence for both receive and transmit StreamProcessors 963 * 964 * Transfers one period of silence to the Iso side for transmit SP's 965 * or dump one period of frames for receive SP's 966 * 967 * @return true if successful, false otherwise (indicates xrun). 968 */ 969 bool StreamProcessorManager::transferSilence() { 970 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); 971 bool retval=true; 972 retval &= transferSilence(StreamProcessor::ePT_Receive); 973 retval &= transferSilence(StreamProcessor::ePT_Transmit); 974 return retval; 975 } 976 977 /** 978 * @brief Transfer one period of silence for either the receive or transmit StreamProcessors 979 * 980 * Transfers one period of silence to the Iso side for transmit SP's 981 * or dump one period of frames for receive SP's 982 * 983 * @param t The processor type to tranfer for (receive or transmit) 984 * @return true if successful, false otherwise (indicates xrun). 985 */ 986 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) { 987 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n", 988 t, m_time_of_transfer, 989 (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 990 (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 991 (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 992 993 bool retval = true; 994 // a static cast could make sure that there is no performance 995 // penalty for the virtual functions (to be checked) 996 if (t==StreamProcessor::ePT_Receive) { 997 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 998 it != m_ReceiveProcessors.end(); 999 ++it ) { 1000 if(!(*it)->dropFrames(m_period, m_time_of_transfer)) { 1001 debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n", 1002 m_period, m_time_of_transfer,*it); 1003 retval &= false; // buffer underrun 1004 } 1005 } 1006 } else { 1007 // FIXME: in the SPM it would be nice to have system time instead of 1008 // 1394 time 1009 float rate = m_SyncSource->getTicksPerFrame(); 1010 int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); 1011 1012 // the data we are putting into the buffer is intended to be transmitted 1013 // one ringbuffer size after it has been received 1014 int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 1015 1016 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 1017 it != m_TransmitProcessors.end(); 1018 ++it ) { 1019 // FIXME: in the SPM it would be nice to have system time instead of 1020 // 1394 time 1021 if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) { 1022 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n", 1023 m_period, transmit_timestamp, *it); 1024 retval &= false; // buffer underrun 1025 } 1026 } 1027 } 1028 return retval; 964 1029 } 965 1030 trunk/libffado/src/libstreaming/StreamProcessorManager.h
r445 r734 25 25 #define __FFADO_STREAMPROCESSORMANAGER__ 26 26 27 #include "generic/Port.h" 28 #include "generic/StreamProcessor.h" 29 #include "util/IsoHandlerManager.h" 30 27 31 #include "debugmodule/debugmodule.h" 28 32 #include "libutil/Thread.h" 29 33 #include "libutil/OptionContainer.h" 30 #include <semaphore.h>31 #include "Port.h"32 #include "StreamProcessor.h"33 #include "IsoHandlerManager.h"34 34 35 35 #include <vector> 36 #include <semaphore.h> 36 37 37 38 namespace Streaming { … … 52 53 public: 53 54 54 StreamProcessorManager(unsigned int period, unsigned int nb_buffers);55 StreamProcessorManager(unsigned int period, unsigned int rate, unsigned int nb_buffers); 55 56 virtual ~StreamProcessorManager(); 56 57 … … 61 62 bool stop(); 62 63 64 bool startDryRunning(); 63 65 bool syncStartAll(); 64 66 … … 67 69 bool unregisterProcessor(StreamProcessor *processor); ///< stop managing a streamprocessor 68 70 69 bool enableStreamProcessors(uint64_t time_to_enable_at); /// enable registered StreamProcessors70 bool disableStreamProcessors(); /// disable registered StreamProcessors71 72 71 void setPeriodSize(unsigned int period); 73 72 void setPeriodSize(unsigned int period, unsigned int nb_buffers); 74 int getPeriodSize() {return m_period;};73 unsigned int getPeriodSize() {return m_period;}; 75 74 76 75 void setNbBuffers(unsigned int nb_buffers); … … 82 81 83 82 // the client-side functions 83 bool waitForPeriod(); 84 bool transfer(); 85 bool transfer(enum StreamProcessor::eProcessorType); 86 private: 87 bool transferSilence(); 88 bool transferSilence(enum StreamProcessor::eProcessorType); 84 89 85 bool waitForPeriod(); ///< wait for the next period 86 87 bool transfer(); ///< transfer the buffer contents from/to client 88 bool transfer(enum StreamProcessor::EProcessorType); ///< transfer the buffer contents from/to client (single processor type) 89 90 bool alignReceivedStreams(); 91 public: 90 92 int getDelayedUsecs() {return m_delayed_usecs;}; 91 93 bool xrunOccurred(); 92 94 int getXrunCount() {return m_xruns;}; 95 96 unsigned int getNominalRate() {return m_nominal_framerate;}; 97 uint64_t getTimeOfLastTransfer() { return m_time_of_transfer;}; 93 98 94 99 private: … … 116 121 public: 117 122 bool setSyncSource(StreamProcessor *s); 118 StreamProcessor * getSyncSource(); 123 StreamProcessor& getSyncSource() 124 {return *m_SyncSource;}; 119 125 120 126 protected: … … 132 138 unsigned int m_nb_buffers; 133 139 unsigned int m_period; 140 unsigned int m_nominal_framerate; 134 141 unsigned int m_xruns; 135 142