Changeset 1005 for trunk/libffado/src/libieee1394/IsoHandlerManager.cpp
- Timestamp:
- 04/21/08 01:27:47 (16 years ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/libffado/src/libieee1394/IsoHandlerManager.cpp
r993 r1005 25 25 #include "IsoHandlerManager.h" 26 26 #include "ieee1394service.h" 27 #include "IsoHandler.h"28 27 #include "libstreaming/generic/StreamProcessor.h" 29 28 … … 42 41 // --- ISO Thread --- // 43 42 44 IsoTask::IsoTask(IsoHandlerManager& manager )43 IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType t) 45 44 : m_manager( manager ) 46 45 , m_SyncIsoHandler ( NULL ) 47 { 46 , m_handlerType( t ) 47 { 48 } 49 50 IsoTask::~IsoTask() 51 { 52 sem_destroy(&m_activity_semaphore); 48 53 } 49 54 … … 65 70 #endif 66 71 72 sem_init(&m_activity_semaphore, 0, 0); 67 73 return true; 68 74 } … … 89 95 IsoHandler *h = m_manager.m_IsoHandlers.at(i); 90 96 assert(h); 97 98 // skip the handlers not intended for us 99 if(h->getType() != m_handlerType) continue; 91 100 92 101 if (h->isEnabled()) { … … 129 138 { 130 139 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 131 "(%p) Execute\n", this); 140 "(%p, %s) Execute\n", 141 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 132 142 int err; 133 143 unsigned int i; … … 139 149 if(diff < 100) { 140 150 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 141 "(%p) short loop detected (%d usec), cnt: %d\n", 142 this, diff, m_successive_short_loops); 151 "(%p, %s) short loop detected (%d usec), cnt: %d\n", 152 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 153 diff, m_successive_short_loops); 143 154 m_successive_short_loops++; 144 if(m_successive_short_loops > 100 ) {155 if(m_successive_short_loops > 10000) { 145 156 debugError("Shutting down runaway thread\n"); 146 157 return false; … … 163 174 if (m_poll_nfds_shadow == 0) { 164 175 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 165 "(%p ) bypass iterate since no handlers to poll\n",166 this );176 "(%p, %s) bypass iterate since no handlers to poll\n", 177 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 167 178 usleep(m_poll_timeout * 1000); 168 179 return true; … … 179 190 short events = 0; 180 191 IsoHandler *h = m_IsoHandler_map_shadow[i]; 181 if(h->getType() == IsoHandler::eHT_Transmit) { 182 // we should only poll on a transmit handler 183 // that has a client that is ready to send 184 // something. Otherwise it will end up in 185 // busy wait looping since the packet function 186 // will defer processing (also avoids the 187 // AGAIN problem) 188 if (h->tryWaitForClient()) { 189 events = POLLIN | POLLPRI; 190 no_one_to_poll = false; 191 } 192 } else { 193 // a receive handler should only be polled if 194 // it's client doesn't already have enough data 195 // and if it can still accept data. 196 if (h->tryWaitForClient()) { // FIXME 197 events = POLLIN | POLLPRI; 198 no_one_to_poll = false; 199 } 192 // we should only poll on a transmit handler 193 // that has a client that is ready to send 194 // something. Otherwise it will end up in 195 // busy wait looping since the packet function 196 // will defer processing (also avoids the 197 // AGAIN problem) 198 if (h->canIterateClient()) { 199 events = POLLIN | POLLPRI; 200 no_one_to_poll = false; 201 // if we are going to poll() it, let's ensure 202 // it can run until someone wants it to exit 203 h->allowIterateLoop(); 200 204 } 201 205 m_poll_fds_shadow[i].events = events; … … 203 207 204 208 if(no_one_to_poll) { 205 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 206 "(%p) No one to poll, waiting on the sync handler to become ready\n", 207 this); 208 209 if(!m_SyncIsoHandler->waitForClient()) { 210 debugError("Failed to wait for client\n"); 211 // This can be due to error or due to timeout 212 213 // sleep for a while 214 usleep(m_poll_timeout * 1000); // FIXME 215 // exit this iteration loop 216 return true; 217 } 218 219 #ifdef DEBUG 220 // if this happens we end up in a deadlock! 221 if(!m_SyncIsoHandler->tryWaitForClient()) { 222 debugFatal("inconsistency in wait functions!\n"); 223 return false; 224 } 225 #endif 226 227 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 228 "(%p) sync handler ready\n", 229 this); 209 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 210 "(%p, %s) No one to poll, waiting for something to happen\n", 211 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 212 // wait for something to happen 213 switch(waitForActivity()) { 214 case IsoTask::eAR_Error: 215 debugError("Error while waiting for activity\n"); 216 return false; 217 case IsoTask::eAR_Interrupted: 218 // FIXME: what to do here? 219 debugWarning("Interrupted while waiting for activity\n"); 220 break; 221 case IsoTask::eAR_Timeout: 222 // FIXME: what to do here? 223 debugWarning("Timeout while waiting for activity\n"); 224 break; 225 case IsoTask::eAR_Activity: 226 // do nothing 227 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 228 "(%p, %s) something happened\n", 229 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 230 break; 231 } 230 232 } 231 233 } 232 234 233 235 // Use a shadow map of the fd's such that we don't have to update 234 // the fd map everytime we run poll(). It doesn't change that much 235 // anyway 236 // the fd map everytime we run poll(). 236 237 err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 237 238 238 239 if (err < 0) { 239 240 if (errno == EINTR) { 241 debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n"); 240 242 return true; 241 243 } … … 247 249 #ifdef DEBUG 248 250 if(m_poll_fds_shadow[i].revents) { 249 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, 250 "(%p) received events: %08X for (%d/%d, %p, %s)\n", 251 this, m_poll_fds_shadow[i].revents, 251 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, 252 "(%p, %s) received events: %08X for (%d/%d, %p, %s)\n", 253 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), 254 m_poll_fds_shadow[i].revents, 252 255 i, m_poll_nfds_shadow, 253 256 m_IsoHandler_map_shadow[i], … … 283 286 } 284 287 return true; 285 288 } 289 290 enum IsoTask::eActivityResult 291 IsoTask::waitForActivity() 292 { 293 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 294 "(%p, %s) waiting for activity\n", 295 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 296 struct timespec ts; 297 int result; 298 299 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 300 debugError("clock_gettime failed\n"); 301 return eAR_Error; 302 } 303 long long int timeout_nsec=0; 304 int timeout_sec = 0; 305 306 timeout_nsec = ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS * 1000LL; 307 timeout_sec = 0; 308 while(timeout_nsec >= 1000000000LL) { 309 timeout_sec += 1; 310 timeout_nsec -= 1000000000LL; 311 } 312 ts.tv_nsec += timeout_nsec; 313 ts.tv_sec += timeout_sec; 314 315 result = sem_timedwait(&m_activity_semaphore, &ts); 316 317 if(result != 0) { 318 if (result == ETIMEDOUT) { 319 debugOutput(DEBUG_LEVEL_VERBOSE, 320 "(%p) pthread_cond_timedwait() timed out (result=%d)\n", 321 this, result); 322 return eAR_Timeout; 323 } else if (result == EINTR) { 324 debugOutput(DEBUG_LEVEL_VERBOSE, 325 "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n", 326 this, result); 327 return eAR_Interrupted; 328 } else { 329 debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n", 330 this, result); 331 debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n", 332 this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec); 333 return eAR_Error; 334 } 335 } 336 337 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 338 "(%p, %s) got activity\n", 339 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 340 return eAR_Activity; 341 } 342 343 void 344 IsoTask::signalActivity() 345 { 346 // signal the activity cond var 347 sem_post(&m_activity_semaphore); 348 debugOutputExtreme(DEBUG_LEVEL_VERBOSE, 349 "(%p, %s) activity\n", 350 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive")); 286 351 } 287 352 … … 295 360 , m_service( service ) 296 361 , m_realtime(false), m_priority(0) 297 , m_IsoThread ( NULL ) 298 , m_IsoTask ( NULL ) 299 {} 362 , m_IsoThreadTransmit ( NULL ) 363 , m_IsoTaskTransmit ( NULL ) 364 , m_IsoThreadReceive ( NULL ) 365 , m_IsoTaskReceive ( NULL ) 366 { 367 } 300 368 301 369 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio) … … 303 371 , m_service( service ) 304 372 , m_realtime(run_rt), m_priority(rt_prio) 305 , m_IsoThread ( NULL ) 306 , m_IsoTask ( NULL ) 307 {} 373 , m_IsoThreadTransmit ( NULL ) 374 , m_IsoTaskTransmit ( NULL ) 375 , m_IsoThreadReceive ( NULL ) 376 , m_IsoTaskReceive ( NULL ) 377 { 378 } 308 379 309 380 IsoHandlerManager::~IsoHandlerManager() … … 314 385 debugError("Still some handlers in use\n"); 315 386 } 316 if (m_IsoThread) { 317 m_IsoThread->Stop(); 318 delete m_IsoThread; 319 } 320 if (m_IsoTask) { 321 delete m_IsoTask; 387 if (m_IsoThreadTransmit) { 388 m_IsoThreadTransmit->Stop(); 389 delete m_IsoThreadTransmit; 390 } 391 if (m_IsoThreadReceive) { 392 m_IsoThreadReceive->Stop(); 393 delete m_IsoThreadReceive; 394 } 395 if (m_IsoTaskTransmit) { 396 delete m_IsoTaskTransmit; 397 } 398 if (m_IsoTaskReceive) { 399 delete m_IsoTaskReceive; 322 400 } 323 401 } … … 326 404 IsoHandlerManager::requestShadowMapUpdate() 327 405 { 328 if(m_IsoTask) m_IsoTask->requestShadowMapUpdate(); 406 if(m_IsoTaskTransmit) m_IsoTaskTransmit->requestShadowMapUpdate(); 407 if(m_IsoTaskReceive) m_IsoTaskReceive->requestShadowMapUpdate(); 329 408 } 330 409 … … 336 415 m_priority = priority; 337 416 338 if (m_IsoThread ) {417 if (m_IsoThreadTransmit) { 339 418 if (m_realtime) { 340 m_IsoThread->AcquireRealTime(m_priority); 419 m_IsoThreadTransmit->AcquireRealTime(m_priority 420 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 421 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT); 341 422 } else { 342 m_IsoThread->DropRealTime(); 423 m_IsoThreadTransmit->DropRealTime(); 424 } 425 } 426 if (m_IsoThreadReceive) { 427 if (m_realtime) { 428 m_IsoThreadReceive->AcquireRealTime(m_priority 429 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 430 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV); 431 } else { 432 m_IsoThreadReceive->DropRealTime(); 343 433 } 344 434 } … … 356 446 } 357 447 358 // create a threadto iterate our ISO handlers359 debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p ...\n", this);360 m_IsoTask = new IsoTask( *this);361 if(!m_IsoTask ) {448 // create threads to iterate our ISO handlers 449 debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p transmit...\n", this); 450 m_IsoTaskTransmit = new IsoTask( *this, IsoHandler::eHT_Transmit ); 451 if(!m_IsoTaskTransmit) { 362 452 debugFatal("No task\n"); 363 453 return false; 364 454 } 365 m_IsoThread = new Util::PosixThread(m_IsoTask, m_realtime, 366 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE, 367 PTHREAD_CANCEL_DEFERRED); 368 369 if(!m_IsoThread) { 455 m_IsoThreadTransmit = new Util::PosixThread(m_IsoTaskTransmit, m_realtime, 456 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 457 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT, 458 PTHREAD_CANCEL_DEFERRED); 459 460 if(!m_IsoThreadTransmit) { 461 debugFatal("No thread\n"); 462 return false; 463 } 464 465 debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p receive...\n", this); 466 m_IsoTaskReceive = new IsoTask( *this, IsoHandler::eHT_Receive ); 467 if(!m_IsoTaskReceive) { 468 debugFatal("No task\n"); 469 return false; 470 } 471 m_IsoThreadReceive = new Util::PosixThread(m_IsoTaskReceive, m_realtime, 472 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE 473 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV, 474 PTHREAD_CANCEL_DEFERRED); 475 476 if(!m_IsoThreadReceive) { 370 477 debugFatal("No thread\n"); 371 478 return false; … … 374 481 Util::Watchdog *watchdog = m_service.getWatchdog(); 375 482 if(watchdog) { 376 if(!watchdog->registerThread(m_IsoThread)) { 377 debugWarning("could not register iso thread with watchdog\n"); 483 if(!watchdog->registerThread(m_IsoThreadTransmit)) { 484 debugWarning("could not register iso transmit thread with watchdog\n"); 485 } 486 if(!watchdog->registerThread(m_IsoThreadReceive)) { 487 debugWarning("could not register iso receive thread with watchdog\n"); 378 488 } 379 489 } else { … … 381 491 } 382 492 383 if (m_IsoThread->Start() != 0) { 384 debugFatal("Could not start ISO thread\n"); 493 if (m_IsoThreadTransmit->Start() != 0) { 494 debugFatal("Could not start ISO Transmit thread\n"); 495 return false; 496 } 497 if (m_IsoThreadReceive->Start() != 0) { 498 debugFatal("Could not start ISO Receive thread\n"); 385 499 return false; 386 500 } … … 401 515 if ((*it) == h) { 402 516 result = h->disable(); 403 result &= m_IsoTask->requestShadowMapUpdate(); 517 if(h->getType() == IsoHandler::eHT_Transmit) { 518 result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 519 } else { 520 result &= m_IsoTaskReceive->requestShadowMapUpdate(); 521 } 404 522 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); 405 523 return result; … … 422 540 if ((*it) == h) { 423 541 result = h->enable(); 424 result &= m_IsoTask->requestShadowMapUpdate(); 542 if(h->getType() == IsoHandler::eHT_Transmit) { 543 result &= m_IsoTaskTransmit->requestShadowMapUpdate(); 544 } else { 545 result &= m_IsoTaskReceive->requestShadowMapUpdate(); 546 } 425 547 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); 426 548 return result; … … 430 552 debugError("Handler not found\n"); 431 553 return false; 554 } 555 556 void 557 IsoHandlerManager::signalActivityTransmit() 558 { 559 assert(m_IsoTaskTransmit); 560 m_IsoTaskTransmit->signalActivity(); 561 } 562 563 void 564 IsoHandlerManager::signalActivityReceive() 565 { 566 assert(m_IsoTaskReceive); 567 m_IsoTaskReceive->signalActivity(); 432 568 } 433 569 … … 438 574 handler->setVerboseLevel(getDebugLevel()); 439 575 m_IsoHandlers.push_back(handler); 440 return m_IsoTask->requestShadowMapUpdate(); 576 requestShadowMapUpdate(); 577 return true; 441 578 } 442 579 … … 452 589 if ( *it == handler ) { 453 590 m_IsoHandlers.erase(it); 454 return m_IsoTask->requestShadowMapUpdate(); 591 requestShadowMapUpdate(); 592 return true; 455 593 } 456 594 } … … 690 828 return false; 691 829 } 692 if(!m_IsoTask->requestShadowMapUpdate()) { 830 bool result; 831 if((*it)->getType() == IsoHandler::eHT_Transmit) { 832 result = m_IsoTaskTransmit->requestShadowMapUpdate(); 833 } else { 834 result = m_IsoTaskReceive->requestShadowMapUpdate(); 835 } 836 if(!result) { 693 837 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 694 838 return false; … … 751 895 return false; 752 896 } 753 if(!m_IsoTask->requestShadowMapUpdate()) { 897 bool result; 898 if((*it)->getType() == IsoHandler::eHT_Transmit) { 899 result = m_IsoTaskTransmit->requestShadowMapUpdate(); 900 } else { 901 result = m_IsoTaskReceive->requestShadowMapUpdate(); 902 } 903 if(!result) { 754 904 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 755 905 return false; … … 782 932 retval=false; 783 933 } 784 if(!m_IsoTask->requestShadowMapUpdate()) { 934 bool result; 935 if((*it)->getType() == IsoHandler::eHT_Transmit) { 936 result = m_IsoTaskTransmit->requestShadowMapUpdate(); 937 } else { 938 result = m_IsoTaskReceive->requestShadowMapUpdate(); 939 } 940 if(!result) { 785 941 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 786 ret val=false;942 return false; 787 943 } 788 944 } … … 816 972 (*it)->setVerboseLevel(i); 817 973 } 818 if(m_IsoThread) m_IsoThread->setVerboseLevel(i); 819 if(m_IsoTask) m_IsoTask->setVerboseLevel(i); 974 if(m_IsoThreadTransmit) m_IsoThreadTransmit->setVerboseLevel(i); 975 if(m_IsoTaskTransmit) m_IsoTaskTransmit->setVerboseLevel(i); 976 if(m_IsoThreadReceive) m_IsoThreadReceive->setVerboseLevel(i); 977 if(m_IsoTaskReceive) m_IsoTaskReceive->setVerboseLevel(i); 820 978 } 821 979