Changeset 1005
- Timestamp:
- 04/21/08 01:27:47 (13 years ago)
- Files:
-
- trunk/libffado/config.h.in (modified) (2 diffs)
- trunk/libffado/src/libieee1394/IsoHandler.cpp (modified) (6 diffs)
- trunk/libffado/src/libieee1394/IsoHandler.h (modified) (5 diffs)
- trunk/libffado/src/libieee1394/IsoHandlerManager.cpp (modified) (28 diffs)
- trunk/libffado/src/libieee1394/IsoHandlerManager.h (modified) (8 diffs)
- trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (modified) (30 diffs)
- trunk/libffado/src/libstreaming/generic/StreamProcessor.h (modified) (3 diffs)
- trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (modified) (11 diffs)
- trunk/libffado/src/libstreaming/StreamProcessorManager.h (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/libffado/config.h.in
r981 r1005 80 80 #define ISOHANDLERMANAGER_MAX_STREAMS_PER_ISOTHREAD 16 81 81 82 // Ideally the audio processing will be driven by this thread 83 #define ISOHANDLERMANAGER_ISO_PRIO_INCREASE 1 82 // The best setup is if the receive handlers have lower priority 83 // than the client thread since that ensures that as soon as we 84 // received sufficient frames, the client thread runs. 85 // The transmit thread should have higher priority to ensure that 86 // all available data is flushed to the ISO kernel buffers as 87 // soon as possible 88 // At this moment, the jack backend uses base+5 to init ffado 89 // prio 90 #define ISOHANDLERMANAGER_ISO_PRIO_INCREASE 0 91 #define ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV -6 92 #define ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT 1 93 94 // the timeout for ISO activity on any thread 95 // NOTE: don't make this 0 96 #define ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS 1000000LL 84 97 85 98 // allows to add some processing margin. This shifts the time … … 97 110 #define STREAMPROCESSORMANAGER_SYNCSTART_TRIES 10 98 111 #define STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC 200 99 #define STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC 200112 #define STREAMPROCESSORMANAGER_ALIGN_AVERAGE_TIME_MSEC 400 100 113 #define STREAMPROCESSORMANAGER_NB_ALIGN_TRIES 40 101 114 trunk/libffado/src/libieee1394/IsoHandler.cpp
r977 r1005 26 26 #include "IsoHandler.h" 27 27 #include "ieee1394service.h" 28 #include "IsoHandlerManager.h" 28 29 29 30 #include "libstreaming/generic/StreamProcessor.h" … … 91 92 , m_speed( RAW1394_ISO_SPEED_400 ) 92 93 , m_prebuffers( 0 ) 94 , m_dont_exit_iterate_loop( true ) 93 95 , m_State( E_Created ) 94 96 #ifdef DEBUG … … 150 152 151 153 bool 152 IsoHandler::waitForClient() 153 { 154 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 155 if(m_Client) { 156 bool result; 157 if (m_type == eHT_Receive) { 158 result = m_Client->waitForProducePacket(); 159 } else { 160 result = m_Client->waitForConsumePacket(); 161 } 162 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 163 return result; 164 } else { 165 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " no client\n"); 166 } 167 return false; 168 } 169 170 bool 171 IsoHandler::tryWaitForClient() 172 { 173 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 154 IsoHandler::canIterateClient() 155 { 156 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "checking...\n"); 174 157 if(m_Client) { 175 158 bool result; … … 186 169 return false; 187 170 } 188 /*189 bool190 IsoHandler::Execute()191 {192 debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE, "%p: Execute thread...\n", this);193 194 // bypass if not running195 if (m_State != E_Running) {196 debugOutput( DEBUG_LEVEL_VERBOSE, "%p: not polling since not running...\n", this);197 usleep(m_poll_timeout * 1000);198 debugOutput( DEBUG_LEVEL_VERBOSE, "%p: done sleeping...\n", this);199 return true;200 }201 202 // wait for the availability of frames in the client203 // (blocking for transmit handlers)204 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Waiting for Client activity...\n", this, getTypeString());205 if (waitForClient()) {206 #if ISOHANDLER_USE_POLL207 bool result = true;208 while(result && m_Client && tryWaitForClient()) {209 int err = poll(&m_poll_fd, 1, m_poll_timeout);210 if (err == -1) {211 if (errno == EINTR) {212 return true;213 }214 debugFatal("%p, poll error: %s\n", this, strerror (errno));215 return false;216 }217 218 if(m_poll_fd.revents & (POLLIN)) {219 result=iterate();220 if(!result) {221 debugOutput( DEBUG_LEVEL_VERBOSE,222 "IsoHandler (%p): Failed to iterate handler\n",223 this);224 }225 } else {226 if (m_poll_fd.revents & POLLERR) {227 debugWarning("error on fd for %p\n", this);228 }229 if (m_poll_fd.revents & POLLHUP) {230 debugWarning("hangup on fd for %p\n",this);231 }232 break;233 }234 }235 return result;236 #else237 // iterate() is blocking if no 1394 data is available238 // so poll'ing is not really necessary239 bool result = true;240 while(result && m_Client && tryWaitForClient()) {241 result = iterate();242 // if (getType() == eHT_Receive) {243 // debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Iterate returned: %d\n",244 // this, (m_type==eHT_Receive?"Receive":"Transmit"), result);245 // }246 }247 return result;248 #endif249 } else {250 debugError("waitForClient() failed.\n");251 return false;252 }253 }*/254 171 255 172 bool … … 440 357 #endif 441 358 if(m_Client) { 442 return m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); 359 enum raw1394_iso_disposition retval = m_Client->putPacket(data, length, channel, tag, sy, cycle, dropped, skipped); 360 if (retval == RAW1394_ISO_OK) { 361 if (m_dont_exit_iterate_loop) { 362 return RAW1394_ISO_OK; 363 } else { 364 m_dont_exit_iterate_loop = true; 365 debugOutput(DEBUG_LEVEL_VERBOSE, 366 "(%p) loop exit requested\n", 367 this); 368 return RAW1394_ISO_DEFER; 369 } 370 } else { 371 return retval; 372 } 443 373 } 444 374 … … 467 397 } 468 398 #endif 469 return retval; 399 if (retval == RAW1394_ISO_OK) { 400 if (m_dont_exit_iterate_loop) { 401 return RAW1394_ISO_OK; 402 } else { 403 m_dont_exit_iterate_loop = true; 404 debugOutput(DEBUG_LEVEL_VERBOSE, 405 "(%p) loop exit requested\n", 406 this); 407 return RAW1394_ISO_DEFER; 408 } 409 } else { 410 return retval; 411 } 470 412 } 471 413 *tag = 0; trunk/libffado/src/libieee1394/IsoHandler.h
r930 r1005 26 26 27 27 #include "debugmodule/debugmodule.h" 28 #include "IsoHandlerManager.h"29 28 30 29 #include "libutil/Thread.h" … … 32 31 enum raw1394_iso_disposition; 33 32 33 class IsoHandlerManager; 34 34 namespace Streaming { 35 35 class StreamProcessor; … … 121 121 bool unregisterStream(Streaming::StreamProcessor *); 122 122 123 bool waitForClient(); 124 bool tryWaitForClient(); 123 bool canIterateClient(); // FIXME: implement with functor 124 125 /** 126 * @brief request that the handler exits the packet processing loop ASAP 127 * 128 * The raw1394 lib doesn't provide a means to stop the packet iteration loop 129 * except when the iterate callback returns a DEFER value. Calling this function 130 * will make the callback return DEFER ASAP. 131 */ 132 void requestIterateLoopExit() {m_dont_exit_iterate_loop = false;}; 133 /** 134 * @brief allow the handler to stay in the packet processing loop 135 * 136 * This resets the state set by requestIterateLoopExit() 137 */ 138 void allowIterateLoop() {m_dont_exit_iterate_loop = true;}; 125 139 126 140 private: … … 132 146 int m_irq_interval; 133 147 134 Streaming::StreamProcessor *m_Client; 148 Streaming::StreamProcessor *m_Client; // FIXME: implement with functors 135 149 136 150 int handleBusReset(unsigned int generation); … … 140 154 enum raw1394_iso_speed m_speed; 141 155 unsigned int m_prebuffers; 156 bool m_dont_exit_iterate_loop; 142 157 143 158 // the state machine trunk/libffado/src/libieee1394/IsoHandlerManager.cpp
r993 r1005 25 25 #include "IsoHandlerManager.h" 26 26 #include "ieee1394service.h" 27 #include "IsoHandler.h"28 27 #include "libstreaming/generic/StreamProcessor.h" 29 28 … … 42 41 // --- ISO Thread --- // 43 42 44 IsoTask::IsoTask(IsoHandlerManager& manager )43 IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType t) 45 44 : m_manager( manager ) 46 45 , m_SyncIsoHandler ( NULL ) 47 { 46 , m_handlerType( t ) 47 { 48 } 49 50 IsoTask::~IsoTask() 51 { 52 sem_destroy(&m_activity_semaphore); 48 53 } 49 54 … … 65 70 #endif 66 71 72 sem_init(&m_activity_semaphore, 0, 0); 67 73 return true; 68 74 } … … 89 95 IsoHandler *h = m_manager.m_IsoHandlers.at(i); 90 96 assert(h); 97 98 // skip the handlers not intended for us 99 if(h->getType() != m_handlerType) continue; 91 100 92 101 if (h->isEnabled()) { … … 129 138 { 130 139 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 131 "(%p) Execute\n", this); 140 "(%p, %s) Execute\n", 141 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 132 142 int err; 133 143 unsigned int i; … … 139 149 if(diff < 100) { 140 150 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 141 "(%p) short loop detected (%d usec), cnt: %d\n", 142 this, diff, m_successive_short_loops); 151 "(%p, %s) short loop detected (%d usec), cnt: %d\n", 152 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 153 diff, m_successive_short_loops); 143 154 m_successive_short_loops++; 144 if(m_successive_short_loops > 100 ) {155 if(m_successive_short_loops > 10000) { 145 156 debugError("Shutting down runaway thread\n"); 146 157 return false; … … 163 174 if (m_poll_nfds_shadow == 0) { 164 175 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 165 "(%p ) bypass iterate since no handlers to poll\n",166 this );176 "(%p, %s) bypass iterate since no handlers to poll\n", 177 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 167 178 usleep(m_poll_timeout * 1000); 168 179 return true; … … 179 190 short events = 0; 180 191 IsoHandler *h = m_IsoHandler_map_shadow[i]; 181 if(h->getType() == IsoHandler::eHT_Transmit) { 182 // we should only poll on a transmit handler 183 // that has a client that is ready to send 184 // something. Otherwise it will end up in 185 // busy wait looping since the packet function 186 // will defer processing (also avoids the 187 // AGAIN problem) 188 if (h->tryWaitForClient()) { 189 events = POLLIN | POLLPRI; 190 no_one_to_poll = false; 191 } 192 } else { 193 // a receive handler should only be polled if 194 // it's client doesn't already have enough data 195 // and if it can still accept data. 196 if (h->tryWaitForClient()) { // FIXME 197 events = POLLIN | POLLPRI; 198 no_one_to_poll = false; 199 } 192 // we should only poll on a transmit handler 193 // that has a client that is ready to send 194 // something. Otherwise it will end up in 195 // busy wait looping since the packet function 196 // will defer processing (also avoids the 197 // AGAIN problem) 198 if (h->canIterateClient()) { 199 events = POLLIN | POLLPRI; 200 no_one_to_poll = false; 201 // if we are going to poll() it, let's ensure 202 // it can run until someone wants it to exit 203 h->allowIterateLoop(); 200 204 } 201 205 m_poll_fds_shadow[i].events = events; … … 203 207 204 208 if(no_one_to_poll) { 205 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 206 "(%p) No one to poll, waiting on the sync handler to become ready\n", 207 this); 208 209 if(!m_SyncIsoHandler->waitForClient()) { 210 debugError("Failed to wait for client\n"); 211 // This can be due to error or due to timeout 212 213 // sleep for a while 214 usleep(m_poll_timeout * 1000); // FIXME 215 // exit this iteration loop 216 return true; 217 } 218 219 #ifdef DEBUG 220 // if this happens we end up in a deadlock! 221 if(!m_SyncIsoHandler->tryWaitForClient()) { 222 debugFatal("inconsistency in wait functions!\n"); 223 return false; 224 } 225 #endif 226 227 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 228 "(%p) sync handler ready\n", 229 this); 209 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 210 "(%p, %s) No one to poll, waiting for something to happen\n", 211 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 212 // wait for something to happen 213 switch(waitForActivity()) { 214 case IsoTask::eAR_Error: 215 debugError("Error while waiting for activity\n"); 216 return false; 217 case IsoTask::eAR_Interrupted: 218 // FIXME: what to do here? 219 debugWarning("Interrupted while waiting for activity\n"); 220 break; 221 case IsoTask::eAR_Timeout: 222 // FIXME: what to do here? 223 debugWarning("Timeout while waiting for activity\n"); 224 break; 225 case IsoTask::eAR_Activity: 226 // do nothing 227 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 228 "(%p, %s) something happened\n", 229 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 230 break; 231 } 230 232 } 231 233 } 232 234 233 235 // Use a shadow map of the fd's such that we don't have to update 234 // the fd map everytime we run poll(). It doesn't change that much 235 // anyway 236 // the fd map everytime we run poll(). 236 237 err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 237 238 238 239 if (err < 0) { 239 240 if (errno == EINTR) { 241 debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n"); 240 242 return true; 241 243 } … … 247 249 #ifdef DEBUG 248 250 if(m_poll_fds_shadow[i].revents) { 249 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 250 "(%p) received events: %08X for (%d/%d, %p, %s)\n", 251 this, m_poll_fds_shadow[i].revents, 251 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, 252 "(%p, %s) received events: %08X for (%d/%d, %p, %s)\n", 253 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 254 m_poll_fds_shadow[i].revents, 252 255 i, m_poll_nfds_shadow, 253 256 m_IsoHandler_map_shadow[i], … … 283 286 } 284 287 return true; 285 288 } 289 290 enum IsoTask::eActivityResult 291 IsoTask::waitForActivity() 292 { 293 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 294 "(%p, %s) waiting for activity\n", 295 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 296 struct timespec ts; 297 int result; 298 299 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 300 debugError("clock_gettime failed\n"); 301 return eAR_Error; 302 } 303 long long int timeout_nsec=0; 304 int timeout_sec = 0; 305 306 timeout_nsec = ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS * 1000LL; 307 timeout_sec = 0; 308 while(timeout_nsec >= 1000000000LL) { 309 timeout_sec += 1; 310 timeout_nsec -= 1000000000LL; 311 } 312 ts.tv_nsec += timeout_nsec; 313 ts.tv_sec += timeout_sec; 314 315 result = sem_timedwait(&m_activity_semaphore, &ts); 316 317 if(result != 0) { 318 if (result == ETIMEDOUT) { 319 debugOutput(DEBUG_LEVEL_VERBOSE, 320 "(%p) pthread_cond_timedwait() timed out (result=%d)\n", 321 this, result); 322 return eAR_Timeout; 323 } else if (result == EINTR) { 324 debugOutput(DEBUG_LEVEL_VERBOSE, 325 "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", 326 this, result); 327 return eAR_Interrupted; 328 } else { 329 debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n", 330 this, result); 331 debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n", 332 this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); 333 return eAR_Error; 334 } 335 } 336 337 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 338 "(%p, %s) got activity\n", 339 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 340 return eAR_Activity; 341 } 342 343 void 344 IsoTask::signalActivity() 345 { 346 // signal the activity cond var 347 sem_post(&m_activity_semaphore); 348 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 349 "(%p, %s) activity\n", 350 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 286 351 } 287 352 … … 295 360 , m_service( service ) 296 361 , m_realtime(false), m_priority(0) 297 , m_IsoThread ( NULL ) 298 , m_IsoTask ( NULL ) 299 {} 362 , m_IsoThreadTransmit ( NULL ) 363 , m_IsoTaskTransmit ( NULL ) 364 , m_IsoThreadReceive ( NULL ) 365 , m_IsoTaskReceive ( NULL ) 366 { 367 } 300 368 301 369 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio) … … 303 371 , m_service( service ) 304 372 , m_realtime(run_rt), m_priority(rt_prio) 305 , m_IsoThread ( NULL ) 306 , m_IsoTask ( NULL ) 307 {} 373 , m_IsoThreadTransmit ( NULL ) 374 , m_IsoTaskTransmit ( NULL ) 375 , m_IsoThreadReceive ( NULL ) 376 , m_IsoTaskReceive ( NULL ) 377 { 378 } 308 379 309 380 IsoHandlerManager::~IsoHandlerManager() … … 314 385 debugError("Still some handlers in use\n"); 315 386 } 316 if (m_IsoThread) { 317 m_IsoThread->Stop(); 318 delete m_IsoThread; 319 } 320 if (m_IsoTask) { 321 delete m_IsoTask; 387 if (m_IsoThreadTransmit) { 388 m_IsoThreadTransmit->Stop(); 389 delete m_IsoThreadTransmit; 390 } 391 if (m_IsoThreadReceive) { 392 m_IsoThreadReceive->Stop(); 393 delete m_IsoThreadReceive; 394 } 395 if (m_IsoTaskTransmit) { 396 delete m_IsoTaskTransmit; 397 } 398 if (m_IsoTaskReceive) { 399 delete m_IsoTaskReceive; 322 400 } 323 401 } … … 326 404 IsoHandlerManager::requestShadowMapUpdate() 327 405 { 328 if(m_IsoTask) m_IsoTask->requestShadowMapUpdate(); 406 if(m_IsoTaskTransmit) m_IsoTaskTransmit->requestShadowMapUpdate(); 407 if(m_IsoTaskReceive) m_IsoTaskReceive->requestShadowMapUpdate(); 329 408 } 330 409 … … 336 415 m_priority = priority; 337 416 338 if (m_IsoThread ) {417 if (m_IsoThreadTransmit) { 339 418 if (m_realtime) { 340 m_IsoThread->AcquireRealTime(m_priority); 419 m_IsoThreadTransmit->AcquireRealTime(m_priority 420 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 421 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT); 341 422 } else { 342 m_IsoThread->DropRealTime(); 423 m_IsoThreadTransmit->DropRealTime(); 424 } 425 } 426 if (m_IsoThreadReceive) { 427 if (m_realtime) { 428 m_IsoThreadReceive->AcquireRealTime(m_priority 429 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 430 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV); 431 } else { 432 m_IsoThreadReceive->DropRealTime(); 343 433 } 344 434 } … … 356 446 } 357 447 358 // create a threadto iterate our ISO handlers359 debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p ...\n", this);360 m_IsoTask = new IsoTask( *this);361 if(!m_IsoTask ) {448 // create threads to iterate our ISO handlers 449 debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p transmit...\n", this); 450 m_IsoTaskTransmit = new IsoTask( *this, IsoHandler::eHT_Transmit ); 451 if(!m_IsoTaskTransmit) { 362 452 debugFatal("No task\n"); 363 453 return false; 364 454 } 365 m_IsoThread = new Util::PosixThread(m_IsoTask, m_realtime, 366 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE, 367 PTHREAD_CANCEL_DEFERRED); 368 369 if(!m_IsoThread) { 455 m_IsoThreadTransmit = new Util::PosixThread(m_IsoTaskTransmit, m_realtime, 456 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 457 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT, 458 PTHREAD_CANCEL_DEFERRED); 459 460 if(!m_IsoThreadTransmit) { 461 debugFatal("No thread\n"); 462 return false; 463 } 464 465 debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p receive...\n", this); 466 m_IsoTaskReceive = new IsoTask( *this, IsoHandler::eHT_Receive ); 467 if(!m_IsoTaskReceive) { 468 debugFatal("No task\n"); 469 return false; 470 } 471 m_IsoThreadReceive = new Util::PosixThread(m_IsoTaskReceive, m_realtime, 472 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 473 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV, 474 PTHREAD_CANCEL_DEFERRED); 475 476 if(!m_IsoThreadReceive) { 370 477 debugFatal("No thread\n"); 371 478 return false; … … 374 481 Util::Watchdog *watchdog = m_service.getWatchdog(); 375 482 if(watchdog) { 376 if(!watchdog->registerThread(m_IsoThread)) { 377 debugWarning("could not register iso thread with watchdog\n"); 483 if(!watchdog->registerThread(m_IsoThreadTransmit)) { 484 debugWarning("could not register iso transmit thread with watchdog\n"); 485 } 486 if(!watchdog->registerThread(m_IsoThreadReceive)) { 487 debugWarning("could not register iso receive thread with watchdog\n"); 378 488 } 379 489 } else { … … 381 491 } 382 492 383 if (m_IsoThread->Start() != 0) { 384 debugFatal("Could not start ISO thread\n"); 493 if (m_IsoThreadTransmit->Start() != 0) { 494 debugFatal("Could not start ISO Transmit thread\n"); 495 return false; 496 } 497 if (m_IsoThreadReceive->Start() != 0) { 498 debugFatal("Could not start ISO Receive thread\n"); 385 499 return false; 386 500 } … … 401 515 if ((*it) == h) { 402 516 result = h->disable(); 403 result &= m_IsoTask->requestShadowMapUpdate(); 517 if(h->getType() == IsoHandler::eHT_Transmit) { 518 result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 519 } else { 520 result &= m_IsoTaskReceive->requestShadowMapUpdate(); 521 } 404 522 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); 405 523 return result; … … 422 540 if ((*it) == h) { 423 541 result = h->enable(); 424 result &= m_IsoTask->requestShadowMapUpdate(); 542 if(h->getType() == IsoHandler::eHT_Transmit) { 543 result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 544 } else { 545 result &= m_IsoTaskReceive->requestShadowMapUpdate(); 546 } 425 547 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); 426 548 return result; … … 430 552 debugError("Handler not found\n"); 431 553 return false; 554 } 555 556 void 557 IsoHandlerManager::signalActivityTransmit() 558 { 559 assert(m_IsoTaskTransmit); 560 m_IsoTaskTransmit->signalActivity(); 561 } 562 563 void 564 IsoHandlerManager::signalActivityReceive() 565 { 566 assert(m_IsoTaskReceive); 567 m_IsoTaskReceive->signalActivity(); 432 568 } 433 569 … … 438 574 handler->setVerboseLevel(getDebugLevel()); 439 575 m_IsoHandlers.push_back(handler); 440 return m_IsoTask->requestShadowMapUpdate(); 576 requestShadowMapUpdate(); 577 return true; 441 578 } 442 579 … … 452 589 if ( *it == handler ) { 453 590 m_IsoHandlers.erase(it); 454 return m_IsoTask->requestShadowMapUpdate(); 591 requestShadowMapUpdate(); 592 return true; 455 593 } 456 594 } … … 690 828 return false; 691 829 } 692 if(!m_IsoTask->requestShadowMapUpdate()) { 830 bool result; 831 if((*it)->getType() == IsoHandler::eHT_Transmit) { 832 result = m_IsoTaskTransmit->requestShadowMapUpdate(); 833 } else { 834 result = m_IsoTaskReceive->requestShadowMapUpdate(); 835 } 836 if(!result) { 693 837 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 694 838 return false; … … 751 895 return false; 752 896 } 753 if(!m_IsoTask->requestShadowMapUpdate()) { 897 bool result; 898 if((*it)->getType() == IsoHandler::eHT_Transmit) { 899 result = m_IsoTaskTransmit->requestShadowMapUpdate(); 900 } else { 901 result = m_IsoTaskReceive->requestShadowMapUpdate(); 902 } 903 if(!result) { 754 904 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 755 905 return false; … … 782 932 retval=false; 783 933 } 784 if(!m_IsoTask->requestShadowMapUpdate()) { 934 bool result; 935 if((*it)->getType() == IsoHandler::eHT_Transmit) { 936 result = m_IsoTaskTransmit->requestShadowMapUpdate(); 937 } else { 938 result = m_IsoTaskReceive->requestShadowMapUpdate(); 939 } 940 if(!result) { 785 941 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 786 ret val=false;942 return false; 787 943 } 788 944 } … … 816 972 (*it)->setVerboseLevel(i); 817 973 } 818 if(m_IsoThread) m_IsoThread->setVerboseLevel(i); 819 if(m_IsoTask) m_IsoTask->setVerboseLevel(i); 974 if(m_IsoThreadTransmit) m_IsoThreadTransmit->setVerboseLevel(i); 975 if(m_IsoTaskTransmit) m_IsoTaskTransmit->setVerboseLevel(i); 976 if(m_IsoThreadReceive) m_IsoThreadReceive->setVerboseLevel(i); 977 if(m_IsoTaskReceive) m_IsoTaskReceive->setVerboseLevel(i); 820 978 } 821 979 trunk/libffado/src/libieee1394/IsoHandlerManager.h
r978 r1005 26 26 27 27 #include "config.h" 28 29 28 #include "debugmodule/debugmodule.h" 30 29 31 30 #include "libutil/Thread.h" 31 32 #include "IsoHandler.h" 32 33 33 34 #include <sys/poll.h> 34 35 #include <errno.h> 35 36 36 #include <vector> 37 #include <semaphore.h> 37 38 38 39 class Ieee1394Service; 39 class IsoHandler; 40 //class IsoHandler; 41 //enum IsoHandler::EHandlerType; 40 42 41 43 namespace Streaming { … … 57 59 { 58 60 public: 59 IsoTask(IsoHandlerManager& manager );60 virtual ~IsoTask() {};61 IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType); 62 virtual ~IsoTask(); 61 63 62 64 public: … … 65 67 66 68 /** 67 * requests the thread to sync it's stream map with the manager69 * @brief requests the thread to sync it's stream map with the manager 68 70 */ 69 71 bool requestShadowMapUpdate(); 72 enum eActivityResult { 73 eAR_Activity, 74 eAR_Timeout, 75 eAR_Interrupted, 76 eAR_Error 77 }; 78 79 /** 80 * @brief signals that something happened in one of the clients of this task 81 */ 82 void signalActivity(); 83 /** 84 * @brief wait until something happened in one of the clients of this task 85 */ 86 enum eActivityResult waitForActivity(); 70 87 71 88 void setVerboseLevel(int i); … … 92 109 #endif 93 110 111 // activity signaling 112 sem_t m_activity_semaphore; 113 114 enum IsoHandler::EHandlerType m_handlerType; 94 115 // debug stuff 95 116 DECLARE_DEBUG_MODULE; … … 110 131 class IsoHandlerManager 111 132 { 112 friend class Streaming::StreamProcessorManager;113 133 friend class IsoTask; 114 friend class IsoHandler;115 134 116 135 public: … … 138 157 bool disable(IsoHandler *); ///< disables a handler 139 158 bool enable(IsoHandler *); ///< enables a handler 159 160 /** 161 * @brief signals that something happened in one of the clients 162 */ 163 void signalActivityTransmit(); 164 void signalActivityReceive(); 165 140 166 ///> disables the handler attached to the stream 141 167 bool stopHandlerForStream(Streaming::StreamProcessor *); … … 157 183 Ieee1394Service& get1394Service() {return m_service;}; 158 184 159 protected:160 185 void requestShadowMapUpdate(); 161 186 … … 195 220 bool m_realtime; 196 221 int m_priority; 197 Util::Thread * m_IsoThread; 198 IsoTask * m_IsoTask; 222 Util::Thread * m_IsoThreadTransmit; 223 IsoTask * m_IsoTaskTransmit; 224 Util::Thread * m_IsoThreadReceive; 225 IsoTask * m_IsoTaskReceive; 199 226 200 227 // debug stuff trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp
r1001 r1005 40 40 #include <math.h> 41 41 42 #define SIGNAL_ACTIVITY { \ 43 pthread_mutex_lock(&m_activity_cond_lock); \ 44 pthread_cond_broadcast(&m_activity_cond); \ 45 pthread_mutex_unlock(&m_activity_cond_lock); \ 42 #define SIGNAL_ACTIVITY_SPM { \ 43 m_StreamProcessorManager.signalActivity(); \ 44 } 45 #define SIGNAL_ACTIVITY_ISO_XMIT { \ 46 m_IsoHandlerManager.signalActivityTransmit(); \ 47 } 48 #define SIGNAL_ACTIVITY_ISO_RECV { \ 49 m_IsoHandlerManager.signalActivityReceive(); \ 50 } 51 #define SIGNAL_ACTIVITY_ALL { \ 52 m_StreamProcessorManager.signalActivity(); \ 53 m_IsoHandlerManager.signalActivityTransmit(); \ 54 m_IsoHandlerManager.signalActivityReceive(); \ 46 55 } 47 56 … … 71 80 , m_sync_delay( 0 ) 72 81 , m_in_xrun( false ) 82 , m_min_ahead( 7999 ) 73 83 { 74 84 // create the timestamped buffer and register ourselves as its client 75 85 m_data_buffer = new Util::TimestampedBuffer(this); 76 pthread_mutex_init(&m_activity_cond_lock, NULL);77 pthread_cond_init(&m_activity_cond, NULL);78 86 } 79 87 … … 84 92 } 85 93 86 // lock the condition mutex to keep threads from blocking on87 // the condition var while we destroy it88 pthread_mutex_lock(&m_activity_cond_lock);89 // now signal activity, releasing threads that90 // are already blocking on the condition variable91 pthread_cond_broadcast(&m_activity_cond);92 // then destroy it93 pthread_cond_destroy(&m_activity_cond);94 pthread_mutex_unlock(&m_activity_cond_lock);95 96 // destroy the mutexes97 pthread_mutex_destroy(&m_activity_cond_lock);98 94 if (m_data_buffer) delete m_data_buffer; 99 95 if (m_scratch_buffer) delete[] m_scratch_buffer; … … 108 104 debugError("Failed to stop SP\n"); 109 105 } 110 SIGNAL_ACTIVITY ;106 SIGNAL_ACTIVITY_ALL; 111 107 } 112 108 … … 313 309 m_dropped += dropped_cycles; 314 310 m_last_cycle = cycle; 315 m_Parent.showDevice(); 316 // flushDebugOutput(); 317 // assert(0); 311 dumpInfo(); 318 312 } 319 313 } … … 481 475 } else if(result2 == eCRV_OK) { 482 476 // no problem here 483 SIGNAL_ACTIVITY; 477 // FIXME: cache the period size? 478 unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); 479 unsigned int bufferfill = m_data_buffer->getBufferFill(); 480 if(bufferfill >= periodsize) { 481 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "signal activity, %d>%d\n", bufferfill, periodsize); 482 SIGNAL_ACTIVITY_SPM; 483 return RAW1394_ISO_DEFER; 484 } 484 485 return RAW1394_ISO_OK; 485 486 } else { … … 589 590 590 591 if(cycle_diff < 0 && (m_state == ePS_Running || m_state == ePS_DryRunning)) { 591 debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy)\n", 592 cycle, now_cycles); 592 unsigned int fc = m_data_buffer->getBufferFill(); 593 debugWarning("Requesting packet for cycle %04d which is in the past (now=%04dcy, fill=%u)\n", 594 cycle, now_cycles, fc); 593 595 if(m_state == ePS_Running) { 594 596 debugShowBackLogLines(200); … … 642 644 generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 643 645 return RAW1394_ISO_OK; 644 // FIXME: PP: I think this should be possible too646 // FIXME: PP: I think this should also be a possibility 645 647 //} else if (result == eCRV_EmptyPacket) { 646 648 // goto send_empty_packet; … … 687 689 enum eChildReturnValue result = generatePacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); 688 690 if (result == eCRV_Packet || result == eCRV_Defer) { 689 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 690 "XMIT: CY=%04u TS=%011llu\n", 691 cycle, m_last_timestamp); 691 int ahead = diffCycles(cycle, now_cycles); 692 if (ahead < m_min_ahead) m_min_ahead = ahead; 693 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 694 "XMIT: CY=%04u TS=%011llu NOW_CY=%04u AHEAD=%04d\n", 695 cycle, m_last_timestamp, now_cycles, ahead); 692 696 // update some accounting 693 697 m_last_good_cycle = cycle; … … 804 808 } 805 809 } 806 807 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 808 "XMIT EMPTY: CY=%04u\n", 809 cycle); 810 811 { // context to avoid ahead var clash 812 int ahead = diffCycles(cycle, now_cycles); 813 if (ahead < m_min_ahead) m_min_ahead = ahead; 814 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 815 "XMIT EMPTY: CY=%04u, NOW_CY=%04u, AHEAD=%04d\n", 816 cycle, now_cycles, ahead); 817 } 818 810 819 generateEmptyPacketHeader(data, length, tag, sy, cycle, dropped_cycles, max_length); 811 820 generateEmptyPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 812 821 return RAW1394_ISO_OK; 813 822 } 814 815 823 816 824 // Frame Transfer API … … 823 831 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 824 832 bool result; 825 debugOutputExtreme( DEBUG_LEVEL_VER Y_VERBOSE,826 " %p.getFrames(%d, %11llu)",827 nbframes, ts);833 debugOutputExtreme( DEBUG_LEVEL_VERBOSE, 834 "(%p, %s) getFrames(%d, %11llu)\n", 835 this, getTypeString(), nbframes, ts); 828 836 assert( getType() == ePT_Receive ); 829 837 if(isDryRunning()) result = getFramesDry(nbframes, ts); 830 838 else result = getFramesWet(nbframes, ts); 831 SIGNAL_ACTIVITY ;839 SIGNAL_ACTIVITY_ISO_RECV; 832 840 return result; 833 841 } … … 890 898 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); 891 899 result = m_data_buffer->dropFrames(nbframes); 892 SIGNAL_ACTIVITY ;900 SIGNAL_ACTIVITY_ISO_RECV; 893 901 return result; 894 902 } … … 897 905 { 898 906 bool result; 899 debugOutputExtreme( DEBUG_LEVEL_VERY_VERBOSE,900 "%p.putFrames(%d, %11llu)",901 nbframes, ts);907 debugOutputExtreme( DEBUG_LEVEL_VERBOSE, 908 "(%p, %s) putFrames(%d, %11llu)\n", 909 this, getTypeString(), nbframes, ts); 902 910 assert( getType() == ePT_Transmit ); 903 911 if(isDryRunning()) result = putFramesDry(nbframes, ts); 904 912 else result = putFramesWet(nbframes, ts); 905 SIGNAL_ACTIVITY ;913 SIGNAL_ACTIVITY_ISO_XMIT; 906 914 return result; 907 915 } … … 933 941 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts) 934 942 { 935 debugOutput Extreme(DEBUG_LEVEL_ULTRA_VERBOSE,943 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 936 944 "StreamProcessor::putSilenceFrames(%d, %llu)\n", 937 945 nbframes, ts); … … 955 963 } 956 964 957 SIGNAL_ACTIVITY ;965 SIGNAL_ACTIVITY_ISO_XMIT; 958 966 return true; 959 967 } … … 966 974 if(nbframes > 0) { 967 975 result = m_data_buffer->dropFrames(nbframes); 968 SIGNAL_ACTIVITY ;976 SIGNAL_ACTIVITY_ALL; 969 977 return result; 970 978 } else { … … 973 981 result &= m_data_buffer->writeDummyFrame(); 974 982 } 975 SIGNAL_ACTIVITY ;983 SIGNAL_ACTIVITY_ALL; 976 984 return result; 977 985 } … … 1135 1143 // wake up any threads that might be waiting on data in the buffers 1136 1144 // since a state transition can cause data to become available 1137 SIGNAL_ACTIVITY ;1145 SIGNAL_ACTIVITY_ALL; 1138 1146 return true; 1139 1147 } … … 1412 1420 } 1413 1421 #endif 1414 SIGNAL_ACTIVITY ;1422 SIGNAL_ACTIVITY_ALL; 1415 1423 return result; 1416 1424 } … … 1446 1454 } 1447 1455 #endif 1448 SIGNAL_ACTIVITY ;1456 SIGNAL_ACTIVITY_ALL; 1449 1457 return true; 1450 1458 } … … 1496 1504 } 1497 1505 #endif 1498 SIGNAL_ACTIVITY ;1506 SIGNAL_ACTIVITY_ALL; 1499 1507 return result; 1500 1508 } … … 1545 1553 } 1546 1554 #endif 1547 SIGNAL_ACTIVITY ;1555 SIGNAL_ACTIVITY_ALL; 1548 1556 return true; 1549 1557 } … … 1569 1577 this, m_last_cycle); 1570 1578 m_in_xrun = false; 1579 m_min_ahead = 7999; 1571 1580 m_local_node_id = m_1394service.getLocalNodeId() & 0x3f; 1572 1581 m_data_buffer->setTransparent(false); … … 1583 1592 } 1584 1593 #endif 1585 SIGNAL_ACTIVITY ;1594 SIGNAL_ACTIVITY_ALL; 1586 1595 return result; 1587 1596 } … … 1615 1624 } 1616 1625 #endif 1617 SIGNAL_ACTIVITY ;1626 SIGNAL_ACTIVITY_ALL; 1618 1627 return true; 1619 1628 } … … 1728 1737 debugError("Invalid state transition: %s => %s\n", 1729 1738 ePSToString(m_state), ePSToString(next_state)); 1730 SIGNAL_ACTIVITY ;1739 SIGNAL_ACTIVITY_ALL; 1731 1740 return false; 1732 1741 updateState_exit_change_failed: 1733 1742 debugError("State transition failed: %s => %s\n", 1734 1743 ePSToString(m_state), ePSToString(next_state)); 1735 SIGNAL_ACTIVITY ;1744 SIGNAL_ACTIVITY_ALL; 1736 1745 return false; 1737 }1738 1739 bool StreamProcessor::waitForProducePacket()1740 {1741 return waitForProduce(getNominalFramesPerPacket());1742 }1743 bool StreamProcessor::waitForProducePeriod()1744 {1745 return waitForProduce(m_StreamProcessorManager.getPeriodSize());1746 }1747 bool StreamProcessor::waitForProduce(unsigned int nframes)1748 {1749 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,1750 "(%p, %s) wait ...\n",1751 this, getTypeString());1752 struct timespec ts;1753 int result;1754 int max_runs = 1000;1755 1756 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {1757 debugError("clock_gettime failed\n");1758 return false;1759 }1760 1761 // FIXME: hardcoded timeout of 10 sec1762 // ts.tv_nsec += 1000 * 1000000LL;1763 // while (ts.tv_nsec > 1000000000LL) {1764 // ts.tv_sec += 1;1765 // ts.tv_nsec -= 1000000000LL;1766 // }1767 ts.tv_sec += 2;1768 1769 pthread_mutex_lock(&m_activity_cond_lock);1770 while(!canProduce(nframes) && max_runs) {1771 result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);1772 1773 if(result != 0) {1774 if (result == ETIMEDOUT) {1775 debugOutput(DEBUG_LEVEL_VERBOSE,1776 "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n",1777 this, getTypeString(), result);1778 pthread_mutex_unlock(&m_activity_cond_lock);1779 dumpInfo();1780 return false;1781 } else if (result == EINTR) {1782 debugOutput(DEBUG_LEVEL_VERBOSE,1783 "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n",1784 this, getTypeString(), result);1785 pthread_mutex_unlock(&m_activity_cond_lock);1786 dumpInfo();1787 return false;1788 } else {1789 debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n",1790 this, getTypeString(), result);1791 pthread_mutex_unlock(&m_activity_cond_lock);1792 dumpInfo();1793 return false;1794 }1795 }1796 }1797 pthread_mutex_unlock(&m_activity_cond_lock);1798 if(max_runs == 0) {1799 debugWarning("(%p) runaway loop\n");1800 }1801 return true;1802 }1803 1804 bool StreamProcessor::waitForConsumePacket()1805 {1806 return waitForConsume(getNominalFramesPerPacket());1807 }1808 bool StreamProcessor::waitForConsumePeriod()1809 {1810 return waitForConsume(m_StreamProcessorManager.getPeriodSize());1811 }1812 bool StreamProcessor::waitForConsume(unsigned int nframes)1813 {1814 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,1815 "(%p, %s) wait ...\n",1816 this, getTypeString());1817 struct timespec ts;1818 int result;1819 1820 int max_runs = 1000;1821 1822 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {1823 debugError("clock_gettime failed\n");1824 return false;1825 }1826 1827 // FIXME: hardcoded timeout of 10 sec1828 // ts.tv_nsec += 1000 * 1000000LL;1829 // while (ts.tv_nsec > 1000000000LL) {1830 // ts.tv_sec += 1;1831 // ts.tv_nsec -= 1000000000LL;1832 // }1833 ts.tv_sec += 2;1834 1835 pthread_mutex_lock(&m_activity_cond_lock);1836 while(!canConsume(nframes) && max_runs) {1837 result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts);1838 if(result != 0) {1839 if (result == ETIMEDOUT) {1840 debugOutput(DEBUG_LEVEL_VERBOSE,1841 "(%p, %s) pthread_cond_timedwait() timed out (result=%d)\n",1842 this, getTypeString(), result);1843 pthread_mutex_unlock(&m_activity_cond_lock);1844 dumpInfo();1845 return false;1846 } else if (result == EINTR) {1847 debugOutput(DEBUG_LEVEL_VERBOSE,1848 "(%p, %s) pthread_cond_timedwait() interrupted by signal (result=%d)\n",1849 this, getTypeString(), result);1850 pthread_mutex_unlock(&m_activity_cond_lock);1851 dumpInfo();1852 return false;1853 } else {1854 debugError("(%p, %s) pthread_cond_timedwait error (result=%d)\n",1855 this, getTypeString(), result);1856 pthread_mutex_unlock(&m_activity_cond_lock);1857 dumpInfo();1858 return false;1859 }1860 }1861 max_runs--;1862 }1863 pthread_mutex_unlock(&m_activity_cond_lock);1864 1865 if(max_runs == 0) {1866 debugWarning("(%p) runaway loop\n");1867 }1868 1869 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,1870 "(%p, %s) leave ...\n",1871 this, getTypeString());1872 return true;1873 1746 } 1874 1747 … … 1885 1758 if(m_in_xrun) return true; 1886 1759 if(m_state == ePS_Running && m_next_state == ePS_Running) { 1887 1888 if(getType() == ePT_Transmit) { 1889 // can we put a certain amount of frames into the buffer? 1890 unsigned int bufferspace = m_data_buffer->getBufferSpace(); 1891 if(bufferspace >= nframes) { 1892 return true; 1893 } else return false; 1894 } else { 1895 // do we still have to put frames in the buffer? 1896 unsigned int bufferfill = m_data_buffer->getBufferFill(); 1897 unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); 1898 if (bufferfill > periodsize) return false; 1899 else return true; 1900 } 1901 1902 1760 // can we put a certain amount of frames into the buffer? 1761 unsigned int bufferspace = m_data_buffer->getBufferSpace(); 1762 if(bufferspace >= nframes) { 1763 return true; 1764 } else return false; 1903 1765 } else { 1904 1766 if(getType() == ePT_Transmit) { … … 1947 1809 * Helper routines * 1948 1810 ***********************************************/ 1811 // FIXME: I think this can be removed and replaced by putSilenceFrames 1949 1812 bool 1950 1813 StreamProcessor::transferSilence(unsigned int nframes) 1951 1814 { 1952 1815 bool retval; 1816 1817 #ifdef DEBUG 1953 1818 signed int fc; 1954 1819 ffado_timestamp_t ts_tail_tmp; 1820 m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc); 1821 if (fc != 0) { 1822 debugWarning("Prefilling a buffer that already contains %d frames\n", fc); 1823 } 1824 #endif 1955 1825 1956 1826 // prepare a buffer of silence 1957 1827 char *dummybuffer = (char *)calloc(getEventSize(), nframes * getEventsPerFrame()); 1958 1828 transmitSilenceBlock(dummybuffer, nframes, 0); 1959 1960 m_data_buffer->getBufferTailTimestamp(&ts_tail_tmp, &fc);1961 if (fc != 0) {1962 debugWarning("Prefilling a buffer that already contains %d frames\n", fc);1963 }1964 1829 1965 1830 // add the silence data to the ringbuffer … … 1970 1835 retval = false; 1971 1836 } 1837 1972 1838 free(dummybuffer); 1973 1839 return retval; … … 2023 1889 (unsigned int)TICKS_TO_CYCLES(now), 2024 1890 (unsigned int)TICKS_TO_OFFSET(now)); 1891 if(getType() == ePT_Transmit) { 1892 debugOutputShort( DEBUG_LEVEL_NORMAL, " Min ISOXMT bufferfill : %04d\n", m_min_ahead); 1893 } 2025 1894 debugOutputShort( DEBUG_LEVEL_NORMAL, " Xrun? : %s\n", (m_in_xrun ? "True":"False")); 2026 1895 if (m_state == m_next_state) { trunk/libffado/src/libstreaming/generic/StreamProcessor.h
r1001 r1005 163 163 bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 164 164 165 //FIXME: document wait functions166 bool waitForProducePacket();167 bool waitForProducePeriod();168 bool waitForProduce(unsigned int nframes);169 170 bool waitForConsumePacket();171 bool waitForConsumePeriod();172 bool waitForConsume(unsigned int nframes);173 174 165 bool canProducePacket(); 175 166 bool canProducePeriod(); … … 476 467 private: 477 468 bool m_in_xrun; 478 pthread_mutex_t m_activity_cond_lock;479 pthread_cond_t m_activity_cond;480 469 481 470 public: … … 488 477 const char *getTypeString() 489 478 {return ePTToString(getType());}; 490 StreamStatistics m_PacketStat; 491 StreamStatistics m_PeriodStat;492 StreamStatistics m_WakeupStat; 479 480 int m_min_ahead; // DEBUG 481 493 482 DECLARE_DEBUG_MODULE; 494 483 }; trunk/libffado/src/libstreaming/StreamProcessorManager.cpp
r1001 r1005 42 42 , m_SyncSource(NULL) 43 43 , m_xrun_happened( false ) 44 , m_activity_wait_timeout_usec( 1000*1000 ) 44 45 , m_nb_buffers( 0 ) 45 46 , m_period( 0 ) … … 51 52 { 52 53 addOption(Util::OptionContainer::Option("slaveMode",false)); 54 sem_init(&m_activity_semaphore, 0, 0); 53 55 } 54 56 … … 57 59 , m_SyncSource(NULL) 58 60 , m_xrun_happened( false ) 61 , m_activity_wait_timeout_usec( 1000*1000 ) 59 62 , m_nb_buffers(nb_buffers) 60 63 , m_period(period) … … 66 69 { 67 70 addOption(Util::OptionContainer::Option("slaveMode",false)); 71 sem_init(&m_activity_semaphore, 0, 0); 68 72 } 69 73 70 74 StreamProcessorManager::~StreamProcessorManager() { 75 sem_post(&m_activity_semaphore); 76 sem_destroy(&m_activity_semaphore); 71 77 } 72 78 … … 100 106 101 107 m_WaitLock.Unlock(); 108 } 109 110 void 111 StreamProcessorManager::signalActivity() 112 { 113 sem_post(&m_activity_semaphore); 114 debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p activity\n", this); 115 } 116 117 enum StreamProcessorManager::eActivityResult 118 StreamProcessorManager::waitForActivity() 119 { 120 debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p waiting for activity\n", this); 121 struct timespec ts; 122 int result; 123 124 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 125 debugError("clock_gettime failed\n"); 126 return eAR_Error; 127 } 128 long long int timeout_nsec=0; 129 int timeout_sec = 0; 130 if (m_activity_wait_timeout_usec >= 0) { 131 timeout_nsec = m_activity_wait_timeout_usec * 1000LL; 132 timeout_sec = 0; 133 while(timeout_nsec >= 1000000000LL) { 134 timeout_sec += 1; 135 timeout_nsec -= 1000000000LL; 136 } 137 ts.tv_nsec += timeout_nsec; 138 ts.tv_sec += timeout_sec; 139 } 140 141 if (m_activity_wait_timeout_usec >= 0) { 142 result = sem_timedwait(&m_activity_semaphore, &ts); 143 } else { 144 result = sem_wait(&m_activity_semaphore); 145 } 146 147 if(result != 0) { 148 if (result == ETIMEDOUT) { 149 debugOutput(DEBUG_LEVEL_VERBOSE, 150 "(%p) pthread_cond_timedwait() timed out (result=%d)\n", 151 this, result); 152 return eAR_Timeout; 153 } else if (result == EINTR) { 154 debugOutput(DEBUG_LEVEL_VERBOSE, 155 "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", 156 this, result); 157 return eAR_Interrupted; 158 } else { 159 debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n", 160 this, result); 161 debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n", 162 this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); 163 return eAR_Error; 164 } 165 } 166 167 debugOutputExtreme(DEBUG_LEVEL_VERBOSE,"%p got activity\n", this); 168 return eAR_Activity; 102 169 } 103 170 … … 545 612 }; 546 613 614 // before we do anything else, transfer 615 if(!transferSilence()) { 616 debugError("Could not transfer silence\n"); 617 return false; 618 } 619 620 // now calculate the stream offset 547 621 i = 0; 548 622 for ( i = 0; i < nb_rcv_sp; i++) { … … 558 632 } 559 633 560 if(!transferSilence()) {561 debugError("Could not transfer silence\n");562 return false;563 }564 634 nb_sync_runs--; 565 635 } … … 806 876 807 877 while(period_not_ready) { 808 debugOutputExtreme(DEBUG_LEVEL_VER Y_VERBOSE,878 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 809 879 "waiting for period (%d frames in buffer)...\n", 810 880 m_SyncSource->getBufferFill()); 811 bool result; 812 if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) { 813 result = m_SyncSource->waitForConsumePeriod(); 814 } else { 815 result = m_SyncSource->waitForProducePeriod(); 816 } 817 // if(!result) { 818 // debugError("Error waiting for signal\n"); 819 // return false; 820 // } 881 882 // wait for something to happen 883 switch(waitForActivity()) { 884 case eAR_Error: 885 debugError("Error while waiting for activity\n"); 886 return false; 887 case eAR_Interrupted: 888 // FIXME: what to do here? 889 debugWarning("Interrupted while waiting for activity\n"); 890 break; 891 case eAR_Timeout: 892 // FIXME: what to do here? 893 debugWarning("Timeout while waiting for activity\n"); 894 break; 895 case eAR_Activity: 896 // do nothing 897 break; 898 } 899 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "got activity...\n"); 821 900 822 901 // HACK: this should be solved more elegantly … … 838 917 } 839 918 } 919 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, " period not ready? %d...\n", period_not_ready); 920 840 921 // check for underruns on the ISO side, 841 922 // those should make us bail out of the wait loop … … 885 966 m_time_of_transfer2 = m_time_of_transfer; 886 967 #endif 887 888 debugOutputExtreme( DEBUG_LEVEL_VER Y_VERBOSE,968 969 debugOutputExtreme( DEBUG_LEVEL_VERBOSE, 889 970 "transfer at %llu ticks...\n", 890 971 m_time_of_transfer); … … 1027 1108 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); 1028 1109 bool retval=true; 1110 // NOTE: the order here is opposite from the order in 1111 // normal operation (transmit is before receive), because 1112 // we can do that here (data=silence=available) and 1113 // it increases reliability (esp. on startup) 1114 retval &= transferSilence(StreamProcessor::ePT_Transmit); 1029 1115 retval &= transferSilence(StreamProcessor::ePT_Receive); 1030 retval &= transferSilence(StreamProcessor::ePT_Transmit);1031 1116 return retval; 1032 1117 } trunk/libffado/src/libstreaming/StreamProcessorManager.h
r967 r1005 69 69 bool startDryRunning(); 70 70 bool syncStartAll(); 71 // activity signaling 72 enum eActivityResult { 73 eAR_Activity, 74 eAR_Timeout, 75 eAR_Interrupted, 76 eAR_Error 77 }; 78 void signalActivity(); 79 enum eActivityResult waitForActivity(); 71 80 72 81 // this is the setup API … … 139 148 {return *m_SyncSource;}; 140 149 141 protected: 150 protected: // FIXME: private? 142 151 143 // thread sync primitives152 // thread related vars 144 153 bool m_xrun_happened; 154 int m_activity_wait_timeout_usec; 145 155 bool m_thread_realtime; 146 156 int m_thread_priority; 157 158 // activity signaling 159 sem_t m_activity_semaphore; 147 160 148 161 // processor list