Changeset 938
- Timestamp:
- 03/12/08 04:48:37 (13 years ago)
- Files:
-
- trunk/libffado/config.h.in (modified) (1 diff)
- trunk/libffado/src/libieee1394/IsoHandlerManager.cpp (modified) (22 diffs)
- trunk/libffado/src/libieee1394/IsoHandlerManager.h (modified) (4 diffs)
- trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/libffado/config.h.in
r936 r938 67 67 #define ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT 16 68 68 #define ISOHANDLERMANAGER_MAX_STREAMS_PER_ISOTHREAD 16 69 // the transmit thread should run at elevated priority since it 70 // should push packets into the kernel ASAP 71 #define ISOHANDLERMANAGER_TRANSMIT_PRIO_INCREASE 1 72 // the receive thread should have lower priority than the transmit one, since 73 // it should be interrupted once it has flagged the semaphore that indicates 74 // one period of frames. 75 #define ISOHANDLERMANAGER_RECEIVE_PRIO_INCREASE -6 69 70 // Ideally the audio processing will be driven by this thread 71 #define ISOHANDLERMANAGER_ISO_PRIO_INCREASE 1 76 72 77 73 // allows to add some processing margin. This shifts the time trunk/libffado/src/libieee1394/IsoHandlerManager.cpp
r937 r938 43 43 // --- ISO Thread --- // 44 44 45 IsoTask::IsoTask(IsoHandlerManager& manager , enum IsoTask::eTaskType t)45 IsoTask::IsoTask(IsoHandlerManager& manager) 46 46 : m_manager( manager ) 47 , m_ type( t)47 , m_SyncIsoHandler ( NULL ) 48 48 { 49 49 } … … 66 66 IsoTask::requestShadowMapUpdate() 67 67 { 68 debugOutput(DEBUG_LEVEL_VERBOSE, " enter\n");68 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) enter\n", this); 69 69 INC_ATOMIC(&request_update); 70 70 return true; … … 73 73 // updates the internal stream map 74 74 // note that this should be executed with the guarantee that 75 // nobody will modify 75 // nobody will modify the parent data structures 76 76 void 77 77 IsoTask::updateShadowMapHelper() 78 78 { 79 debugOutput( DEBUG_LEVEL_VERBOSE, " updating shadow vars...\n");79 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updating shadow vars...\n", this); 80 80 unsigned int i, cnt, max; 81 81 max = m_manager.m_IsoHandlers.size(); 82 m_SyncIsoHandler = NULL; 82 83 for (i = 0, cnt = 0; i < max; i++) { 83 84 IsoHandler *h = m_manager.m_IsoHandlers.at(i); 84 85 assert(h); 85 86 // skip handlers of the wrong type87 if (h->getType() == IsoHandler::eHT_Receive && m_type == eTT_Transmit) continue;88 if (h->getType() == IsoHandler::eHT_Transmit && m_type == eTT_Receive) continue;89 86 90 87 if (h->isEnabled()) { … … 94 91 m_poll_fds_shadow[cnt].events = POLLIN; 95 92 cnt++; 96 debugOutput( DEBUG_LEVEL_VERBOSE, "%s handler %p added\n", h->getTypeString(), h); 93 // FIXME: need a more generic approach here 94 if( m_SyncIsoHandler == NULL 95 && h->getType() == IsoHandler::eHT_Transmit) { 96 m_SyncIsoHandler = h; 97 } 98 99 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p added\n", 100 this, h->getTypeString(), h); 97 101 } else { 98 debugOutput( DEBUG_LEVEL_VERBOSE, "%s handler %p skipped (disabled)\n", h->getTypeString(), h); 102 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p skipped (disabled)\n", 103 this, h->getTypeString(), h); 99 104 } 100 105 if(cnt > ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT) { … … 103 108 } 104 109 } 110 111 // FIXME: need a more generic approach here 112 // if there are no active transmit handlers, 113 // use the first receive handler 114 if( m_SyncIsoHandler == NULL 115 && m_poll_nfds_shadow) { 116 m_SyncIsoHandler = m_IsoHandler_map_shadow[0]; 117 } 105 118 m_poll_nfds_shadow = cnt; 106 debugOutput( DEBUG_LEVEL_VERBOSE, " updated shadow vars...\n");119 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updated shadow vars...\n", this); 107 120 } 108 121 … … 111 124 { 112 125 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 113 "(%p, %s) Execute\n", 114 this, m_type==eTT_Transmit?"Transmit":"Receive"); 126 "(%p) Execute\n", this); 115 127 int err; 116 128 unsigned int i; … … 126 138 if (m_poll_nfds_shadow == 0) { 127 139 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 128 "(%p , %8s) bypass iterate since no handlers to poll\n",129 this , m_type==eTT_Transmit?"Transmit":"Receive");140 "(%p) bypass iterate since no handlers to poll\n", 141 this); 130 142 usleep(m_poll_timeout * 1000); 131 143 return true; … … 139 151 bool no_one_to_poll = true; 140 152 while(no_one_to_poll) { 141 if (m_type==eTT_Transmit) { 142 // if we are a transmit thread, we should only poll on 143 // those handlers that have a client that is ready to send 144 // something. poll'ing the others will only cause busy-wait 145 // looping. 146 for (i = 0; i < m_poll_nfds_shadow; i++) { 147 short events = 0; 148 if (m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 153 for (i = 0; i < m_poll_nfds_shadow; i++) { 154 short events = 0; 155 IsoHandler *h = m_IsoHandler_map_shadow[i]; 156 if(h->getType() == IsoHandler::eHT_Transmit) { 157 // we should only poll on a transmit handler 158 // that has a client that is ready to send 159 // something. Otherwise it will end up in 160 // busy wait looping since the packet function 161 // will defer processing (also avoids the 162 // AGAIN problem) 163 if (h->tryWaitForClient()) { 149 164 events = POLLIN | POLLPRI; 150 165 no_one_to_poll = false; 151 166 } 152 m_poll_fds_shadow[i].events = events; 153 } 154 } else { 155 // for receive handlers, we can do the same. we might not have to though 156 for (i = 0; i < m_poll_nfds_shadow; i++) { 157 short events = 0; 158 if (m_IsoHandler_map_shadow[i]->tryWaitForClient()) { 159 events = POLLIN | POLLERR | POLLHUP; 167 } else { 168 // a receive handler should only be polled if 169 // it's client doesn't already have enough data 170 // and if it can still accept data. 171 if (h->tryWaitForClient()) { // FIXME 172 events = POLLIN | POLLPRI; 160 173 no_one_to_poll = false; 161 174 } 162 m_poll_fds_shadow[i].events = events; 163 } 164 } 175 } 176 m_poll_fds_shadow[i].events = events; 177 } 178 165 179 if(no_one_to_poll) { 166 180 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 167 "(%p, %8s) No one to poll, waiting on the first handler to become ready\n", 168 this, m_type==eTT_Transmit?"Transmit":"Receive"); 169 170 m_IsoHandler_map_shadow[0]->waitForClient(); 181 "(%p) No one to poll, waiting on the sync handler to become ready\n", 182 this); 183 184 if(!m_SyncIsoHandler->waitForClient()) { 185 debugError("Failed to wait for client\n"); 186 return false; 187 } 188 189 #ifdef DEBUG 190 // if this happens we end up in a deadlock! 191 if(!m_SyncIsoHandler->tryWaitForClient()) { 192 debugFatal("inconsistency in wait functions!\n"); 193 return false; 194 } 195 #endif 171 196 172 197 debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE, 173 "(%p, %8s) handler ready\n", 174 this, m_type==eTT_Transmit?"Transmit":"Receive"); 175 } 176 } 177 178 // Use a shadow map of the fd's such that the poll call is not in a critical section 198 "(%p) sync handler ready\n", 199 this); 200 } 201 } 202 203 // Use a shadow map of the fd's such that we don't have to update 204 // the fd map everytime we run poll(). It doesn't change that much 205 // anyway 179 206 err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout); 180 207 … … 201 228 // if we get here, it means two things: 202 229 // 1) the kernel can accept or provide packets (poll returned POLLIN) 203 // 2) the client can provide or accept packets ( we enabled polling)230 // 2) the client can provide or accept packets (since we enabled polling) 204 231 if(m_poll_fds_shadow[i].revents & (POLLIN)) { 205 232 m_IsoHandler_map_shadow[i]->iterate(); … … 238 265 , m_service( service ) 239 266 , m_realtime(false), m_priority(0) 240 , m_ReceiveThread ( NULL ) 241 , m_TransmitThread ( NULL ) 242 , m_ReceiveTask ( NULL ) 243 , m_TransmitTask ( NULL ) 267 , m_IsoThread ( NULL ) 268 , m_IsoTask ( NULL ) 244 269 {} 245 270 … … 248 273 , m_service( service ) 249 274 , m_realtime(run_rt), m_priority(rt_prio) 250 , m_ReceiveThread ( NULL ) 251 , m_TransmitThread ( NULL ) 252 , m_ReceiveTask ( NULL ) 253 , m_TransmitTask ( NULL ) 275 , m_IsoThread ( NULL ) 276 , m_IsoTask ( NULL ) 254 277 {} 255 278 … … 261 284 debugError("Still some handlers in use\n"); 262 285 } 263 if (m_ReceiveThread) { 264 m_ReceiveThread->Stop(); 265 delete m_ReceiveThread; 266 } 267 if (m_ReceiveTask) { 268 delete m_ReceiveTask; 269 } 270 271 if (m_TransmitThread) { 272 m_TransmitThread->Stop(); 273 delete m_TransmitThread; 274 } 275 if (m_TransmitTask) { 276 delete m_TransmitTask; 286 if (m_IsoThread) { 287 m_IsoThread->Stop(); 288 delete m_IsoThread; 289 } 290 if (m_IsoTask) { 291 delete m_IsoTask; 277 292 } 278 293 } … … 284 299 m_realtime = rt; 285 300 m_priority = priority; 286 bool result = true; 287 288 if (m_ReceiveThread) { 301 302 if (m_IsoThread) { 289 303 if (m_realtime) { 290 m_ ReceiveThread->AcquireRealTime(m_priority);304 m_IsoThread->AcquireRealTime(m_priority); 291 305 } else { 292 m_ReceiveThread->DropRealTime(); 293 } 294 } 295 if (m_TransmitThread) { 296 if (m_realtime) { 297 m_TransmitThread->AcquireRealTime(m_priority); 298 } else { 299 m_TransmitThread->DropRealTime(); 300 } 301 } 302 303 return result; 306 m_IsoThread->DropRealTime(); 307 } 308 } 309 310 return true; 304 311 } 305 312 … … 313 320 } 314 321 315 // create a thread to iterate our receivehandlers316 debugOutput( DEBUG_LEVEL_VERBOSE, "Create receivethread for %p...\n", this);317 m_ ReceiveTask = new IsoTask( *this, IsoTask::eTT_Receive);318 if(!m_ ReceiveTask) {322 // create a thread to iterate our ISO handlers 323 debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p...\n", this); 324 m_IsoTask = new IsoTask( *this ); 325 if(!m_IsoTask) { 319 326 debugFatal("No task\n"); 320 327 return false; 321 328 } 322 m_ ReceiveThread = new Util::PosixThread(m_ReceiveTask, m_realtime,323 m_priority + ISOHANDLERMANAGER_RECEIVE_PRIO_INCREASE,324 325 326 if(!m_ ReceiveThread) {329 m_IsoThread = new Util::PosixThread(m_IsoTask, m_realtime, 330 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE, 331 PTHREAD_CANCEL_DEFERRED); 332 333 if(!m_IsoThread) { 327 334 debugFatal("No thread\n"); 328 335 return false; 329 336 } 330 if (m_ReceiveThread->Start() != 0) { 331 debugFatal("Could not start receive thread\n"); 332 return false; 333 } 334 335 // create a thread to iterate our transmit handlers 336 debugOutput( DEBUG_LEVEL_VERBOSE, "Create transmit thread for %p...\n", this); 337 m_TransmitTask = new IsoTask( *this, IsoTask::eTT_Transmit ); 338 if(!m_TransmitTask) { 339 debugFatal("No task\n"); 340 return false; 341 } 342 m_TransmitThread = new Util::PosixThread(m_TransmitTask, m_realtime, 343 m_priority + ISOHANDLERMANAGER_TRANSMIT_PRIO_INCREASE, 344 PTHREAD_CANCEL_DEFERRED); 345 if(!m_TransmitThread) { 346 debugFatal("No thread\n"); 347 return false; 348 } 349 if (m_TransmitThread->Start() != 0) { 350 debugFatal("Could not start transmit thread\n"); 337 if (m_IsoThread->Start() != 0) { 338 debugFatal("Could not start ISO thread\n"); 351 339 return false; 352 340 } 353 341 354 342 m_State=E_Running; 355 return true;356 }357 358 359 bool360 IsoHandlerManager::updateShadowMapFor(IsoHandler *h)361 {362 // update the shadow map363 if(h->getType() == IsoHandler::eHT_Receive) {364 if(!m_ReceiveTask->requestShadowMapUpdate()) {365 debugError("failed to update shadow map\n");366 return false;367 }368 } else {369 if(!m_TransmitTask->requestShadowMapUpdate()) {370 debugError("failed to update shadow map\n");371 return false;372 }373 }374 343 return true; 375 344 } … … 386 355 if ((*it) == h) { 387 356 result = h->disable(); 388 result &= updateShadowMapFor(h);357 result &= m_IsoTask->requestShadowMapUpdate(); 389 358 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n"); 390 359 return result; … … 407 376 if ((*it) == h) { 408 377 result = h->enable(); 409 result &= updateShadowMapFor(h);378 result &= m_IsoTask->requestShadowMapUpdate(); 410 379 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n"); 411 380 return result; … … 423 392 handler->setVerboseLevel(getDebugLevel()); 424 393 m_IsoHandlers.push_back(handler); 425 return updateShadowMapFor(handler);394 return m_IsoTask->requestShadowMapUpdate(); 426 395 } 427 396 … … 437 406 if ( *it == handler ) { 438 407 m_IsoHandlers.erase(it); 439 return updateShadowMapFor(handler);408 return m_IsoTask->requestShadowMapUpdate(); 440 409 } 441 410 } … … 675 644 return false; 676 645 } 677 if(! updateShadowMapFor(*it)) {646 if(!m_IsoTask->requestShadowMapUpdate()) { 678 647 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 679 648 return false; … … 736 705 return false; 737 706 } 738 if(! updateShadowMapFor(*it)) {707 if(!m_IsoTask->requestShadowMapUpdate()) { 739 708 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 740 709 return false; … … 767 736 retval=false; 768 737 } 769 if(! updateShadowMapFor(*it)) {738 if(!m_IsoTask->requestShadowMapUpdate()) { 770 739 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it); 771 740 retval=false; … … 801 770 (*it)->setVerboseLevel(i); 802 771 } 803 if(m_ReceiveThread) m_ReceiveThread->setVerboseLevel(i); 804 if(m_ReceiveTask) m_ReceiveTask->setVerboseLevel(i); 805 if(m_TransmitThread) m_TransmitThread->setVerboseLevel(i); 806 if(m_TransmitTask) m_TransmitTask->setVerboseLevel(i); 772 if(m_IsoThread) m_IsoThread->setVerboseLevel(i); 773 if(m_IsoTask) m_IsoTask->setVerboseLevel(i); 807 774 } 808 775 trunk/libffado/src/libieee1394/IsoHandlerManager.h
r904 r938 57 57 { 58 58 public: 59 enum eTaskType { 60 eTT_Receive, 61 eTT_Transmit, 62 }; 63 IsoTask(IsoHandlerManager& manager, enum IsoTask::eTaskType t); 59 IsoTask(IsoHandlerManager& manager); 64 60 virtual ~IsoTask() {}; 65 61 … … 76 72 protected: 77 73 IsoHandlerManager& m_manager; 78 enum eTaskType m_type;79 74 80 75 // the event request structure … … 84 79 // this is the map used by the actual thread 85 80 // it is a shadow of the m_StreamProcessors vector 86 struct pollfd m_poll_fds_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 87 IsoHandler *m_IsoHandler_map_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 88 unsigned int m_poll_nfds_shadow; 81 struct pollfd m_poll_fds_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 82 IsoHandler * m_IsoHandler_map_shadow[ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT]; 83 unsigned int m_poll_nfds_shadow; 84 IsoHandler * m_SyncIsoHandler; 89 85 90 86 // updates the streams map … … 187 183 Streaming::StreamProcessorVector m_StreamProcessors; 188 184 189 // thread params for the handler threads 190 bool m_realtime; 191 int m_priority; 192 // handler threads 193 Util::Thread * m_ReceiveThread; 194 Util::Thread * m_TransmitThread; 195 196 // actual tasks 197 IsoTask * m_ReceiveTask; 198 IsoTask * m_TransmitTask; 199 200 bool updateShadowMapFor(IsoHandler *h); 185 // handler thread/task 186 bool m_realtime; 187 int m_priority; 188 Util::Thread * m_IsoThread; 189 IsoTask * m_IsoTask; 201 190 202 191 // debug stuff trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp
r937 r938 1769 1769 if(m_in_xrun) return true; 1770 1770 if(m_state == ePS_Running && m_next_state == ePS_Running) { 1771 // check whether we already fullfil the criterion 1772 unsigned int bufferspace = m_data_buffer->getBufferSpace(); 1773 if(bufferspace >= nframes) { 1774 return true; 1775 } else return false; 1771 1772 if(getType() == ePT_Transmit) { 1773 // can we put a certain amount of frames into the buffer? 1774 unsigned int bufferspace = m_data_buffer->getBufferSpace(); 1775 if(bufferspace >= nframes) { 1776 return true; 1777 } else return false; 1778 } else { 1779 // do we still have to put frames in the buffer? 1780 unsigned int bufferfill = m_data_buffer->getBufferFill(); 1781 unsigned int periodsize = m_StreamProcessorManager.getPeriodSize(); 1782 if (bufferfill > periodsize) return false; 1783 else return true; 1784 } 1785 1786 1776 1787 } else { 1777 1788 if(getType() == ePT_Transmit) {