Changeset 807
- Timestamp:
- 01/05/08 06:07:23 (15 years ago)
- Files:
-
- trunk/libffado/config.h.in (modified) (1 diff)
- trunk/libffado/src/devicemanager.cpp (modified) (1 diff)
- trunk/libffado/src/ffado.cpp (modified) (1 diff)
- trunk/libffado/src/libieee1394/CycleTimerHelper.cpp (modified) (7 diffs)
- trunk/libffado/src/libieee1394/ieee1394service.cpp (modified) (7 diffs)
- trunk/libffado/src/libieee1394/IsoHandler.cpp (modified) (6 diffs)
- trunk/libffado/src/libieee1394/IsoHandlerManager.cpp (modified) (2 diffs)
- trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (modified) (21 diffs)
- trunk/libffado/src/libstreaming/generic/StreamProcessor.h (modified) (3 diffs)
- trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (modified) (7 diffs)
- trunk/libffado/src/libutil/TimestampedBuffer.cpp (modified) (9 diffs)
- trunk/libffado/src/libutil/TimestampedBuffer.h (modified) (3 diffs)
- trunk/libffado/tests/streaming/teststreaming3.cpp (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/libffado/config.h.in
r803 r807 38 38 #define SHAREDIR "$sharedir" 39 39 40 #define MINIMUM_INTERRUPTS_PER_PERIOD 2U41 #define MAX_ISO_XMIT_BUFFER_FILL_PCT 5040 #define IEEE1394SERVICE_USE_CYCLETIMER_DLL 0 41 #define IEEE1394SERVICE_MAX_FIREWIRE_PORTS 16 42 42 43 #define ISOHANDLER_PER_HANDLER_THREAD 1 44 #define ISOHANDLER_USE_POLL 0 43 #define IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE 0 44 #define IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE 5 45 46 #define THREAD_MAX_RTPRIO 98 47 #define THREAD_MIN_RTPRIO 0 48 49 #define MINIMUM_INTERRUPTS_PER_PERIOD 2U 50 #define MAX_ISO_XMIT_BUFFER_FILL_PCT 50 51 52 #define ISOHANDLER_PER_HANDLER_THREAD 1 53 #define ISOHANDLER_USE_POLL 0 54 #define ISOHANDLER_FLUSH_BEFORE_ITERATE 0 45 55 46 56 #define ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT 16 trunk/libffado/src/devicemanager.cpp
r784 r807 560 560 debugWarning("XRUN detected\n"); 561 561 // do xrun recovery 562 m_processorManager->handleXrun(); 563 return false; 562 if(m_processorManager->handleXrun()) { 563 return false; 564 } else { 565 debugError("Could not handle XRUN\n"); 566 return false; 567 } 564 568 } 565 569 } trunk/libffado/src/ffado.cpp
r795 r807 245 245 return dev->options.period_size; 246 246 } else { 247 debugWarning("XRUN ");247 debugWarning("XRUN\n"); 248 248 xruns++; 249 249 return -1; trunk/libffado/src/libieee1394/CycleTimerHelper.cpp
r788 r807 21 21 * 22 22 */ 23 24 #include "config.h" 23 25 24 26 #include "CycleTimerHelper.h" … … 47 49 pthread_mutex_unlock(&m_compute_vars_lock); \ 48 50 } 49 50 #define OLD_STYLE51 51 52 52 IMPL_DEBUG_MODULE( CycleTimerHelper, CycleTimerHelper, DEBUG_LEVEL_NORMAL ); … … 100 100 { 101 101 debugOutput( DEBUG_LEVEL_VERBOSE, "Start %p...\n", this); 102 #if ndef OLD_STYLE102 #if IEEE1394SERVICE_USE_CYCLETIMER_DLL 103 103 m_Thread = new Util::PosixThread(this, m_realtime, m_priority, 104 104 PTHREAD_CANCEL_DEFERRED); … … 126 126 CycleTimerHelper::setThreadParameters(bool rt, int priority) { 127 127 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority); 128 if (priority > 98) priority = 98; // cap the priority128 if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority 129 129 m_realtime = rt; 130 130 m_priority = priority; 131 131 132 #if ndef OLD_STYLE132 #if IEEE1394SERVICE_USE_CYCLETIMER_DLL 133 133 if (m_Thread) { 134 134 if (m_realtime) { … … 139 139 } 140 140 #endif 141 141 142 return true; 142 143 } … … 157 158 } 158 159 159 #ifdef OLD_STYLE 160 161 bool 162 CycleTimerHelper::Execute() 163 { 164 usleep(1000*1000); 165 return true; 166 } 167 uint32_t 168 CycleTimerHelper::getCycleTimerTicks() 169 { 170 uint32_t cycle_timer; 171 uint64_t local_time; 172 if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 173 debugError("Could not read cycle timer register\n"); 174 return 0; 175 } 176 return CYCLE_TIMER_TO_TICKS(cycle_timer); 177 } 178 179 uint32_t 180 CycleTimerHelper::getCycleTimerTicks(uint64_t now) 181 { 182 return getCycleTimerTicks(); 183 } 184 185 uint32_t 186 CycleTimerHelper::getCycleTimer() 187 { 188 uint32_t cycle_timer; 189 uint64_t local_time; 190 if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 191 debugError("Could not read cycle timer register\n"); 192 return 0; 193 } 194 return cycle_timer; 195 } 196 197 uint32_t 198 CycleTimerHelper::getCycleTimer(uint64_t now) 199 { 200 return getCycleTimer(); 201 } 202 #else 160 #if IEEE1394SERVICE_USE_CYCLETIMER_DLL 203 161 204 162 bool … … 327 285 return TICKS_TO_CYCLE_TIMER(getCycleTimerTicks(now)); 328 286 } 287 288 #else 289 290 bool 291 CycleTimerHelper::Execute() 292 { 293 usleep(1000*1000); 294 return true; 295 } 296 uint32_t 297 CycleTimerHelper::getCycleTimerTicks() 298 { 299 uint32_t cycle_timer; 300 uint64_t local_time; 301 if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 302 debugError("Could not read cycle timer register\n"); 303 return 0; 304 } 305 return CYCLE_TIMER_TO_TICKS(cycle_timer); 306 } 307 308 uint32_t 309 CycleTimerHelper::getCycleTimerTicks(uint64_t now) 310 { 311 return getCycleTimerTicks(); 312 } 313 314 uint32_t 315 CycleTimerHelper::getCycleTimer() 316 { 317 uint32_t cycle_timer; 318 uint64_t local_time; 319 if(!m_Parent.readCycleTimerReg(&cycle_timer, &local_time)) { 320 debugError("Could not read cycle timer register\n"); 321 return 0; 322 } 323 return cycle_timer; 324 } 325 326 uint32_t 327 CycleTimerHelper::getCycleTimer(uint64_t now) 328 { 329 return getCycleTimer(); 330 } 331 329 332 #endif 330 333 trunk/libffado/src/libieee1394/ieee1394service.cpp
r782 r807 23 23 */ 24 24 25 #include "config.h" 25 26 #include "ieee1394service.h" 26 27 #include "ARMHandler.h" … … 42 43 #include <iostream> 43 44 #include <iomanip> 44 45 #define FFADO_MAX_FIREWIRE_PORTS 1646 47 #define ISOMANAGER_PRIO_INCREASE 1148 #define CYCLETIMER_HELPER_PRIO_INCREASE 1049 45 50 46 IMPL_DEBUG_MODULE( Ieee1394Service, Ieee1394Service, DEBUG_LEVEL_NORMAL ); … … 80 76 , m_realtime ( rt ) 81 77 , m_base_priority ( prio ) 82 , m_pIsoManager( new IsoHandlerManager( *this, rt, prio + I SOMANAGER_PRIO_INCREASE ) )83 , m_pCTRHelper ( new CycleTimerHelper( *this, 1000, rt, prio + CYCLETIMER_HELPER_PRIO_INCREASE ) )78 , m_pIsoManager( new IsoHandlerManager( *this, rt, prio + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE ) ) 79 , m_pCTRHelper ( new CycleTimerHelper( *this, 1000, rt, prio + IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE ) ) 84 80 , m_have_new_ctr_read ( false ) 85 81 , m_pTimeSource ( new Util::SystemTimeSource() ) … … 135 131 return 0; 136 132 } 137 struct raw1394_portinfo pinf[ FFADO_MAX_FIREWIRE_PORTS];138 int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, FFADO_MAX_FIREWIRE_PORTS);133 struct raw1394_portinfo pinf[IEEE1394SERVICE_MAX_FIREWIRE_PORTS]; 134 int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, IEEE1394SERVICE_MAX_FIREWIRE_PORTS); 139 135 raw1394_destroy_handle(tmp_handle); 140 136 … … 215 211 return false; 216 212 } 217 struct raw1394_portinfo pinf[ FFADO_MAX_FIREWIRE_PORTS];218 int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, FFADO_MAX_FIREWIRE_PORTS);213 struct raw1394_portinfo pinf[IEEE1394SERVICE_MAX_FIREWIRE_PORTS]; 214 int nb_detected_ports = raw1394_get_port_info(tmp_handle, pinf, IEEE1394SERVICE_MAX_FIREWIRE_PORTS); 219 215 raw1394_destroy_handle(tmp_handle); 220 216 … … 224 220 } 225 221 226 if(nb_detected_ports && port < FFADO_MAX_FIREWIRE_PORTS) {222 if(nb_detected_ports && port < IEEE1394SERVICE_MAX_FIREWIRE_PORTS) { 227 223 m_portName = pinf[port].name; 228 224 } else { … … 275 271 Ieee1394Service::setThreadParameters(bool rt, int priority) { 276 272 bool result = true; 277 if (priority > 98) priority = 98;273 if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; 278 274 m_base_priority = priority; 279 275 m_realtime = rt; 280 276 if (m_pIsoManager) { 281 277 debugOutput(DEBUG_LEVEL_VERBOSE, "Switching IsoManager to (rt=%d, prio=%d)\n", 282 rt, priority + I SOMANAGER_PRIO_INCREASE);283 result &= m_pIsoManager->setThreadParameters(rt, priority + I SOMANAGER_PRIO_INCREASE);278 rt, priority + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE); 279 result &= m_pIsoManager->setThreadParameters(rt, priority + IEEE1394SERVICE_ISOMANAGER_PRIO_INCREASE); 284 280 } 285 281 if (m_pCTRHelper) { 286 282 debugOutput(DEBUG_LEVEL_VERBOSE, "Switching CycleTimerHelper to (rt=%d, prio=%d)\n", 287 rt, priority + CYCLETIMER_HELPER_PRIO_INCREASE);288 result &= m_pCTRHelper->setThreadParameters(rt, priority + CYCLETIMER_HELPER_PRIO_INCREASE);283 rt, priority + IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE); 284 result &= m_pCTRHelper->setThreadParameters(rt, priority + IEEE1394SERVICE_CYCLETIMER_HELPER_PRIO_INCREASE); 289 285 } 290 286 return result; trunk/libffado/src/libieee1394/IsoHandler.cpp
r803 r807 174 174 IsoHandler::waitForClient() 175 175 { 176 debugOutput(DEBUG_LEVEL_VERBOSE, "waiting...\n"); 177 if(m_Client) { 178 bool result = m_Client->waitForSignal(); 179 debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); 180 return result; 181 } else { 182 debugOutput(DEBUG_LEVEL_VERBOSE, " no client\n"); 183 } 184 return false; 185 } 186 187 bool 188 IsoHandler::tryWaitForClient() 189 { 176 190 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 177 191 if(m_Client) { 178 bool result = m_Client-> waitForFrames();192 bool result = m_Client->tryWaitForSignal(); 179 193 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 180 194 return result; 181 } 182 return false; 183 } 184 185 bool 186 IsoHandler::tryWaitForClient() 187 { 188 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 189 if(m_Client) { 190 bool result = m_Client->tryWaitForFrames(); 191 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 192 return result; 195 } else { 196 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " no client\n"); 193 197 } 194 198 return false; … … 209 213 210 214 // wait for the availability of frames in the client 211 // (blocking) 212 if (getType()==eHT_Receive || waitForClient()) { 215 // (blocking for transmit handlers) 216 #ifdef DEBUG 217 if (getType() == eHT_Transmit) { 218 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) Waiting for Client to signal frame availability...\n", this); 219 } 220 #endif 221 if (getType() == eHT_Receive || waitForClient()) { 213 222 214 223 #if ISOHANDLER_USE_POLL … … 249 258 // iterate blocks if no 1394 data is available 250 259 // so poll'ing is not really necessary 251 bool result = iterate(); 252 //usleep(125); 260 261 bool result = true; 262 while(result && m_Client->canProcessPackets()) { 263 result = iterate(); 264 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Iterate returned: %d\n", 265 this, (m_type==eHT_Receive?"Receive":"Transmit"), result); 266 } 253 267 return result; 254 268 #endif … … 261 275 bool 262 276 IsoHandler::iterate() { 263 //flush(); 264 if(raw1394_loop_iterate(m_handle)) { 265 debugOutput( DEBUG_LEVEL_VERBOSE, 266 "IsoHandler (%p): Failed to iterate handler: %s\n", 267 this,strerror(errno)); 268 return false; 269 } 270 return true; 277 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Iterating ISO handler\n", 278 this, (m_type==eHT_Receive?"Receive":"Transmit")); 279 if(m_State == E_Running) { 280 #if ISOHANDLER_FLUSH_BEFORE_ITERATE 281 flush(); 282 #endif 283 if(raw1394_loop_iterate(m_handle)) { 284 debugOutput( DEBUG_LEVEL_VERBOSE, 285 "IsoHandler (%p): Failed to iterate handler: %s\n", 286 this, strerror(errno)); 287 return false; 288 } 289 return true; 290 } else { 291 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Not iterating a non-running handler...\n", 292 this, (m_type==eHT_Receive?"Receive":"Transmit")); 293 return false; 294 } 271 295 } 272 296 … … 274 298 IsoHandler::setThreadParameters(bool rt, int priority) { 275 299 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority); 276 if (priority > 98) priority = 98; // cap the priority300 if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority 277 301 m_realtime = rt; 278 302 m_priority = priority; … … 343 367 bool IsoHandler::disable() 344 368 { 345 debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n"); 369 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p, %s) enter...\n", 370 this, (m_type==eHT_Receive?"Receive":"Transmit")); 346 371 347 372 // check state trunk/libffado/src/libieee1394/IsoHandlerManager.cpp
r803 r807 68 68 IsoHandlerManager::setThreadParameters(bool rt, int priority) { 69 69 debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority); 70 if (priority > 98) priority = 98; // cap the priority70 if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority 71 71 m_realtime = rt; 72 72 m_priority = priority; … … 409 409 410 410 // set the handler's thread parameters 411 if(!h->setThreadParameters(m_realtime, m_priority)) { 411 // receive handlers have lower priority than the client thread 412 // since they have ISO side buffering 413 // xmit handlers have higher priority since we want client side 414 // frames to be put into the ISO buffers ASAP 415 int thread_prio; 416 if (stream->getType()==StreamProcessor::ePT_Receive) { 417 thread_prio = m_priority - 1; 418 if (thread_prio < THREAD_MIN_RTPRIO) thread_prio = THREAD_MIN_RTPRIO; 419 } else { 420 thread_prio = m_priority + 1; 421 if (thread_prio > THREAD_MAX_RTPRIO) thread_prio = THREAD_MAX_RTPRIO; 422 } 423 424 if(!h->setThreadParameters(m_realtime, thread_prio)) { 412 425 debugFatal("Could not set handler thread parameters\n"); 413 426 return false; trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp
r803 r807 39 39 #include <assert.h> 40 40 #include <math.h> 41 42 /* 43 #define POST_SEMAPHORE { \ 44 int tmp; \ 45 sem_getvalue(&m_signal_semaphore, &tmp); \ 46 debugWarning("posting semaphore from value %d\n", tmp); \ 47 sem_post(&m_signal_semaphore); \ 48 } 49 */ 50 51 #define POST_SEMAPHORE { \ 52 sem_post(&m_signal_semaphore); \ 53 } 41 54 42 55 namespace Streaming { … … 63 76 , m_sync_delay( 0 ) 64 77 , m_in_xrun( false ) 78 , m_signal_period( 0 ) 79 , m_signal_offset( 0 ) 65 80 { 66 81 // create the timestamped buffer and register ourselves as its client … … 76 91 if (m_data_buffer) delete m_data_buffer; 77 92 if (m_scratch_buffer) delete[] m_scratch_buffer; 93 sem_destroy(&m_signal_semaphore); 78 94 } 79 95 … … 111 127 { 112 128 #if ISOHANDLER_PER_HANDLER_THREAD 113 // if we use one thread per packet, we can put every frame into the ISO buffer129 // if we use one thread per packet, we can put every frame directly into the ISO buffer 114 130 // the waitForClient in IsoHandler will take care of the fact that the frames are 115 131 // not present in time 116 unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers() -1));132 unsigned int packets_to_prebuffer = (getPacketsPerPeriod() * (m_StreamProcessorManager.getNbBuffers())); 117 133 debugOutput(DEBUG_LEVEL_VERBOSE, "Nominal prebuffer: %u\n", packets_to_prebuffer); 118 134 return packets_to_prebuffer; … … 286 302 m_dropped += dropped_cycles; 287 303 m_in_xrun = true; 304 m_last_cycle = cycle; 305 POST_SEMAPHORE; 306 return RAW1394_ISO_DEFER; 288 307 //flushDebugOutput(); 289 308 //assert(0); … … 295 314 if (m_state == ePS_Invalid) { 296 315 debugError("Should not have state %s\n", ePSToString(m_state) ); 316 POST_SEMAPHORE; 297 317 return RAW1394_ISO_ERROR; 298 318 } … … 316 336 if (!updateState()) { // we are allowed to change the state directly 317 337 debugError("Could not update state!\n"); 338 POST_SEMAPHORE; 318 339 return RAW1394_ISO_ERROR; 319 340 } … … 334 355 if (!updateState()) { // we are allowed to change the state directly 335 356 debugError("Could not update state!\n"); 357 POST_SEMAPHORE; 336 358 return RAW1394_ISO_ERROR; 337 359 } … … 364 386 if (!updateState()) { // we are allowed to change the state directly 365 387 debugError("Could not update state!\n"); 388 POST_SEMAPHORE; 366 389 return RAW1394_ISO_ERROR; 367 390 } … … 381 404 if (!updateState()) { // we are allowed to change the state directly 382 405 debugError("Could not update state!\n"); 406 POST_SEMAPHORE; 383 407 return RAW1394_ISO_ERROR; 384 408 } … … 400 424 if (!updateState()) { // we are allowed to change the state directly 401 425 debugError("Could not update state!\n"); 426 POST_SEMAPHORE; 402 427 return RAW1394_ISO_ERROR; 403 428 } 429 POST_SEMAPHORE; 404 430 return RAW1394_ISO_DEFER; 405 431 } … … 421 447 if (!updateState()) { // we are allowed to change the state directly 422 448 debugError("Could not update state!\n"); 449 POST_SEMAPHORE; 423 450 return RAW1394_ISO_ERROR; 424 451 } 452 POST_SEMAPHORE; 425 453 return RAW1394_ISO_DEFER; 426 454 } else if(result2 == eCRV_OK) { 427 455 // no problem here 456 // if we have enough samples, we can post the semaphore and 457 // defer further processing until later. this will allow us to 458 // run the client and process the frames such that we can put them 459 // into the xmit buffers ASAP 460 if (m_state == ePS_Running) { 461 unsigned int bufferfill = m_data_buffer->getBufferFill(); 462 if(bufferfill >= m_signal_period + m_signal_offset) { 463 // this to avoid multiple signals for the same period 464 int semval; 465 sem_getvalue(&m_signal_semaphore, &semval); 466 unsigned int signal_period = m_signal_period * (semval + 1) + m_signal_offset; 467 if(bufferfill >= signal_period) { 468 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) buffer fill (%d) > signal period (%d), sem_val=%d\n", 469 this, m_data_buffer->getBufferFill(), signal_period, semval); 470 POST_SEMAPHORE; 471 } 472 // the process thread should have higher prio such that we are blocked until 473 // the samples are processed. 474 } 475 } 428 476 return RAW1394_ISO_OK; 429 477 } else { 430 478 debugError("Invalid response\n"); 479 POST_SEMAPHORE; 431 480 return RAW1394_ISO_ERROR; 432 481 } … … 436 485 } else { 437 486 debugError("Invalid response\n"); 487 POST_SEMAPHORE; 438 488 return RAW1394_ISO_ERROR; 439 489 } 440 490 debugError("reached the unreachable\n"); 491 POST_SEMAPHORE; 441 492 return RAW1394_ISO_ERROR; 442 493 } … … 783 834 m_data_buffer->blockProcessWriteFrames(nbframes, ts); 784 835 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); 836 837 unsigned int bufferfill = m_data_buffer->getBufferFill(); 838 if (bufferfill >= m_signal_period + m_signal_offset) { 839 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", 840 this, bufferfill, m_signal_period + m_signal_offset); 841 POST_SEMAPHORE; 842 } else { 843 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", 844 this, bufferfill, m_signal_period + m_signal_offset); 845 } 785 846 return true; // FIXME: what about failure? 786 847 } … … 816 877 return false; 817 878 } 879 880 unsigned int bufferfill = m_data_buffer->getBufferFill(); 881 if (bufferfill >= m_signal_period + m_signal_offset) { 882 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", 883 this, bufferfill, m_signal_period + m_signal_offset); 884 POST_SEMAPHORE; 885 } else { 886 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", 887 this, bufferfill, m_signal_period + m_signal_offset); 888 } 889 818 890 return true; 819 891 } 820 892 821 893 bool 822 StreamProcessor::waitForFrames() 823 { 894 StreamProcessor::waitForSignal() 895 { 896 int result; 824 897 if(m_state == ePS_Running) { 825 assert(m_data_buffer); 826 if(getType() == ePT_Receive) { 827 return m_data_buffer->waitForFrames(m_StreamProcessorManager.getPeriodSize()); 828 } else { 829 return m_data_buffer->waitForFrames(getNominalFramesPerPacket()); 830 } 898 result = sem_wait(&m_signal_semaphore); 899 #ifdef DEBUG 900 int tmp; 901 sem_getvalue(&m_signal_semaphore, &tmp); 902 debugOutput(DEBUG_LEVEL_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp); 903 #endif 904 return result == 0; 905 } else { 906 // when we're not running, we can always provide frames 907 debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n"); 908 return true; 909 } 910 } 911 912 bool 913 StreamProcessor::tryWaitForSignal() 914 { 915 if(m_state == ePS_Running) { 916 return sem_trywait(&m_signal_semaphore) == 0; 831 917 } else { 832 918 // when we're not running, we can always provide frames … … 837 923 838 924 bool 839 StreamProcessor::tryWaitForFrames() 840 { 841 if(m_state == ePS_Running) { 842 assert(m_data_buffer); 843 if(getType() == ePT_Receive) { 844 return m_data_buffer->tryWaitForFrames(m_StreamProcessorManager.getPeriodSize()); 845 } else { 846 return m_data_buffer->tryWaitForFrames(getNominalFramesPerPacket()); 847 } 925 StreamProcessor::canProcessPackets() 926 { 927 if(m_state != ePS_Running) return true; 928 bool result; 929 int bufferfill; 930 if(getType() == ePT_Receive) { 931 bufferfill = m_data_buffer->getBufferSpace(); 848 932 } else { 849 // when we're not running, we can always provide frames 850 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Not running...\n"); 851 return true; 852 } 933 bufferfill = m_data_buffer->getBufferFill(); 934 } 935 result = bufferfill > getNominalFramesPerPacket(); 936 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) for a bufferfill of %d, we return %d\n", 937 this, ePTToString(getType()), bufferfill, result); 938 return result; 853 939 } 854 940 … … 942 1028 { 943 1029 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); 1030 1031 if (sem_init(&m_signal_semaphore, 0, 0) == -1) { 1032 debugError("Could not init signal semaphore"); 1033 return false; 1034 } 944 1035 945 1036 if(!m_IsoHandlerManager.registerStream(this)) { … … 1408 1499 // be picked up by the packet iterator 1409 1500 1410 if(!m_data_buffer->clearBuffer()) { // FIXME: don't like the reset() name 1501 sem_init(&m_signal_semaphore, 0, 0); 1502 m_signal_period = m_StreamProcessorManager.getPeriodSize(); 1503 m_signal_offset = 0; // FIXME: we have to ensure that everyone is ready 1504 1505 if(!m_data_buffer->clearBuffer()) { 1411 1506 debugError("Could not reset data buffer\n"); 1412 1507 return false; … … 1419 1514 debugFatal("Could not prefill transmit stream\n"); 1420 1515 return false; 1516 } 1517 if (m_data_buffer->getBufferFill() >= m_signal_period + m_signal_offset) { 1518 POST_SEMAPHORE; 1421 1519 } 1422 1520 } … … 1489 1587 case ePS_Running: 1490 1588 // the thread will do the transition 1589 1590 // we have to wake the iterator if it's asleep 1591 POST_SEMAPHORE; 1491 1592 break; 1492 1593 default: … … 1691 1792 StreamProcessor::dumpInfo() 1692 1793 { 1693 debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p :\n", this);1794 debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor %p, %s:\n", this, ePTToString(m_processor_type)); 1694 1795 debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel : %d, %d\n", m_1394service.getPort(), m_channel); 1695 1796 uint64_t now = m_1394service.getCycleTimerTicks(); trunk/libffado/src/libstreaming/generic/StreamProcessor.h
r803 r807 34 34 35 35 #include "debugmodule/debugmodule.h" 36 #include <semaphore.h> 36 37 37 38 class Ieee1394Service; … … 157 158 bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 158 159 159 /** 160 * @brief waits for the availability of frames (blocking) 161 * @param nframes number of frames 162 * 163 * @return true if frames are available, false if not (e.g. signal occurred) 164 */ 165 bool waitForFrames(); 166 167 /** 168 * @brief waits for the availability of frames (non-blocking) 169 * @param nframes number of frames 170 * 171 * @return true if frames are available, false if not 172 */ 173 bool tryWaitForFrames(); 160 unsigned int getSignalPeriod() {return m_signal_period;}; 161 bool setSignalPeriod(unsigned int p) {m_signal_period=p; return true;}; 162 /** 163 * @brief waits for a 'signal' (blocking) 164 * 165 * a 'signal' is: 166 * when type==Receive: 167 * - one signal_period of frames is present in the buffer 168 * (received by the iso side) 169 * - an error has occurred (xrun, iso error, ...) 170 * when type==Transmit: 171 * - at least one signal_period of frames are present in the buffer 172 * (have been written into it by the client) 173 * - an error occurred 174 * 175 * @return true if the 'signal' is available, false if error 176 */ 177 bool waitForSignal(); 178 179 /** 180 * @brief checks for a 'signal' (non-blocking) 181 * 182 * a 'signal' is: 183 * when type==Receive: 184 * - one signal_period of frames is present in the buffer 185 * (received by the iso side) 186 * - an error has occurred (xrun, iso error, ...) 187 * when type==Transmit: 188 * - at least one signal_period of frames are present in the buffer 189 * (have been written into it by the client) 190 * - an error occurred 191 * 192 * @return true if the 'signal' is available, false if not (or error) 193 */ 194 bool tryWaitForSignal(); 195 196 /** 197 * @brief can a SP process (queue, dequeue) packets at this moment? 198 * 199 * 200 * @return true if packet processing makes sense 201 */ 202 bool canProcessPackets(); 174 203 175 204 /** … … 460 489 private: 461 490 bool m_in_xrun; 491 sem_t m_signal_semaphore; 492 unsigned int m_signal_period; 493 unsigned int m_signal_offset; 494 462 495 public: 463 496 // debug stuff trunk/libffado/src/libstreaming/StreamProcessorManager.cpp
r803 r807 111 111 if ( *it == processor ) { 112 112 if (*it == m_SyncSource) { 113 debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source ");113 debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n"); 114 114 m_SyncSource = NULL; 115 115 } … … 127 127 if ( *it == processor ) { 128 128 if (*it == m_SyncSource) { 129 debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source ");129 debugOutput(DEBUG_LEVEL_VERBOSE, "unregistering sync source\n"); 130 130 m_SyncSource = NULL; 131 131 } … … 546 546 } 547 547 // wait for the SP's to get into the dry-running state 548 int cnt = 200 ;548 int cnt = 2000; 549 549 bool ready = false; 550 550 while (!ready && cnt) { … … 565 565 if(cnt==0) { 566 566 debugOutput(DEBUG_LEVEL_VERBOSE, " Timeout waiting for the SP's to start dry-running\n"); 567 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 568 it != m_ReceiveProcessors.end(); 569 ++it ) { 570 (*it)->dumpInfo(); 571 } 572 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 573 it != m_TransmitProcessors.end(); 574 ++it ) { 575 (*it)->dumpInfo(); 576 } 567 577 return false; 568 578 } … … 661 671 bool StreamProcessorManager::waitForPeriod() { 662 672 if(m_SyncSource == NULL) return false; 663 int time_till_next_period;664 673 bool xrun_occurred = false; 665 666 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n"); 667 668 time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 669 670 while(time_till_next_period > 0) { 671 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 672 673 // wait for the period 674 SleepRelativeUsec(time_till_next_period); 674 bool period_not_ready = true; 675 676 while(period_not_ready) { 677 debugOutput( DEBUG_LEVEL_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill()); 678 if(!m_SyncSource->waitForSignal()) { 679 debugError("Error waiting for signal\n"); 680 return false; 681 } 682 683 unsigned int bufferfill = m_SyncSource->getBufferFill(); 684 period_not_ready = bufferfill < m_period; 685 686 #ifdef DEBUG 687 if(period_not_ready) { 688 debugOutput(DEBUG_LEVEL_VERBOSE, "period is not ready (bufferfill: %u)\n", bufferfill); 689 } else { 690 debugOutput(DEBUG_LEVEL_VERBOSE, "period is ready (bufferfill: %u)\n", bufferfill); 691 } 692 #endif 675 693 676 694 // check for underruns on the ISO side, … … 689 707 } 690 708 if(xrun_occurred) break; 691 692 // check if we were waked up too soon 693 time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 709 // FIXME: make sure we also exit this loop when something else happens (e.g. signal, iso error) 694 710 } 695 711 … … 701 717 // and the receive processors should have done their transfer. 702 718 m_time_of_transfer = m_SyncSource->getTimeAtPeriod(); 703 debugOutput( DEBUG_LEVEL_VER Y_VERBOSE, "transfer at %llu ticks...\n",719 debugOutput( DEBUG_LEVEL_VERBOSE, "transfer at %llu ticks...\n", 704 720 m_time_of_transfer); 705 706 xrun_occurred = false;707 708 #if STREAMPROCESSORMANAGER_DYNAMIC_SYNC_DELAY709 // normally we can transfer frames at this time, but in some cases this is not true710 // e.g. when there are not enough frames in the receive buffer.711 // however this doesn't have to be a problem, since we can wait some more until we712 // have enough frames. There is only a problem once the ISO xmit doesn't have packets713 // to transmit, or if the receive buffer overflows. These conditions are signaled by714 // the iso threads715 // check if xruns occurred on the Iso side.716 // also check if xruns will occur should we transfer() now717 #ifdef DEBUG718 int waited = 0;719 #endif720 721 bool ready_for_transfer = false;722 bool ready;723 while (!ready_for_transfer && !xrun_occurred) {724 // FIXME: can deadlock when the iso handlers die (e.g. unplug the device)725 ready_for_transfer = true;726 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();727 it != m_ReceiveProcessors.end();728 ++it ) {729 ready = ((*it)->canClientTransferFrames(m_period));730 ready_for_transfer &= ready;731 xrun_occurred |= (*it)->xrunOccurred();732 }733 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();734 it != m_TransmitProcessors.end();735 ++it ) {736 ready = ((*it)->canClientTransferFrames(m_period));737 //ready_for_transfer &= ready;738 xrun_occurred |= (*it)->xrunOccurred();739 }740 if(!ready_for_transfer) {741 debugWarning("xrun_occurred = %d\n", xrun_occurred);742 }743 if (!ready_for_transfer) {744 745 SleepRelativeUsec(125); // MAGIC: one cycle sleep...746 747 // in order to avoid this in the future, we increase the sync delay of the sync source SP748 int d = m_SyncSource->getSyncDelay() + TICKS_PER_CYCLE;749 m_SyncSource->setSyncDelay(d);750 d = m_SyncSource->getSyncDelay();751 debugOutput(DEBUG_LEVEL_VERBOSE, "Increased the Sync delay to: %d ticks (%f frames, %f cy)\n",752 d, ((float)d)/m_SyncSource->getTicksPerFrame(),753 ((float)d)/((float)TICKS_PER_CYCLE));754 755 #ifdef DEBUG756 waited++;757 #endif758 }759 } // we are either ready or an xrun occurred760 761 // in order to avoid a runaway value of the sync delay, we gradually decrease762 // it. It will be increased by a 'too early' event (cfr some lines higher)763 // hence we'll be at a good point on average.764 int d = m_SyncSource->getSyncDelay() - 1;765 if (d >= 0) m_SyncSource->setSyncDelay(d);766 767 768 #ifdef DEBUG769 if(waited > 0) {770 debugOutput(DEBUG_LEVEL_VERBOSE, "Waited %d x 125us due to SP not ready for transfer\n", waited);771 }772 #endif773 #endif774 721 775 722 // this is to notify the client of the delay that we introduced by waiting trunk/libffado/src/libutil/TimestampedBuffer.cpp
r803 r807 52 52 pthread_mutex_unlock(&m_framecounter_lock); \ 53 53 } 54 /* 55 #define POST_SEMAPHORE { \ 56 int tmp; \ 57 sem_getvalue(&m_frame_semaphore, &tmp); \ 58 debugWarning("posting semaphore from value %d\n", tmp); \ 59 sem_post(&m_frame_semaphore); \ 60 } 61 */ 62 63 //HACK 64 #define POST_SEMAPHORE { \ 65 if(m_update_period > 8) { \ 66 sem_post(&m_frame_semaphore); \ 67 } \ 68 } 54 69 55 namespace Util { 70 56 … … 85 71 { 86 72 pthread_mutex_init(&m_framecounter_lock, NULL); 87 88 73 } 89 74 … … 91 76 ffado_ringbuffer_free(m_event_buffer); 92 77 free(m_cluster_buffer); 93 sem_destroy(&m_frame_semaphore);94 78 } 95 79 … … 281 265 282 266 /** 267 * \brief Returns the current write space in the buffer 268 * 269 * This returns the buffer free space of the internal ringbuffer. This 270 * can only be used as an indication because it's state is not 271 * guaranteed to be consistent at all times due to threading issues. 272 * 273 * @return the internal buffer fill in frames 274 */ 275 unsigned int TimestampedBuffer::getBufferSpace() { 276 return ffado_ringbuffer_write_space(m_event_buffer)/(m_bytes_per_frame); 277 } 278 279 /** 283 280 * \brief Initializes the TimestampedBuffer 284 281 * … … 289 286 */ 290 287 bool TimestampedBuffer::init() { 291 if (sem_init(&m_frame_semaphore, 0, 0) == -1) {292 debugError("Could not init frame semaphore");293 return false;294 }295 288 return true; 296 289 } … … 371 364 } 372 365 373 bool374 TimestampedBuffer::waitForFrames(unsigned int nframes)375 {376 int result;377 do {378 unsigned int bufferfill = getBufferFill();379 if(bufferfill >= nframes) {380 // first make the semaphore 0381 while((result=sem_trywait(&m_frame_semaphore)) == 0) {};382 return true;383 } else {384 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,385 "only %d frames in buffer, waiting for more (%d)\n",386 bufferfill, nframes);387 }388 } while((result=sem_wait(&m_frame_semaphore)) == 0);389 debugOutput(DEBUG_LEVEL_VERBOSE,390 "sem_wait returns: %d\n",391 result);392 return false;393 }394 395 bool396 TimestampedBuffer::tryWaitForFrames(unsigned int nframes)397 {398 int result;399 do {400 unsigned int bufferfill = getBufferFill();401 if(bufferfill >= nframes) {402 // first make the semaphore 0403 while((result=sem_trywait(&m_frame_semaphore)) == 0) {};404 return true;405 } else {406 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,407 "only %d frames in buffer, waiting for more (%d)\n",408 bufferfill, nframes);409 }410 } while((result=sem_trywait(&m_frame_semaphore)) == 0);411 debugOutput(DEBUG_LEVEL_VERY_VERBOSE,412 "sem_trywait returns: %d\n",413 result);414 return false;415 }416 417 bool418 TimestampedBuffer::waitForFrames()419 {420 return waitForFrames(m_update_period);421 }422 423 bool424 TimestampedBuffer::tryWaitForFrames()425 {426 return tryWaitForFrames(m_update_period);427 }428 429 430 366 /** 431 367 * @brief Insert a dummy frame to the head buffer … … 459 395 m_framecounter++; 460 396 EXIT_CRITICAL_SECTION; 461 462 POST_SEMAPHORE;463 397 return true; 464 398 } … … 537 471 setBufferTailTimestamp(ts); 538 472 } 539 540 POST_SEMAPHORE;541 473 return true; 542 474 } … … 1310 1242 // ts(x) = m_buffer_tail_timestamp + 1311 1243 // (m_buffer_next_tail_timestamp - m_buffer_tail_timestamp)/(samples_between_updates)*x 1312 POST_SEMAPHORE;1313 1244 } 1314 1245 trunk/libffado/src/libutil/TimestampedBuffer.h
r803 r807 27 27 #include "../debugmodule/debugmodule.h" 28 28 #include "libutil/ringbuffer.h" 29 #include <semaphore.h>30 29 31 30 //typedef float ffado_timestamp_t; … … 156 155 157 156 unsigned int getBufferFill(); 157 unsigned int getBufferSpace(); 158 158 159 159 // timestamp stuff … … 244 244 float m_current_rate; 245 245 unsigned int m_update_period; 246 247 sem_t m_frame_semaphore;248 246 }; 249 247 trunk/libffado/tests/streaming/teststreaming3.cpp
r794 r807 292 292 293 293 dev_options.realtime = (arguments.rtprio != 0); 294 dev_options.packetizer_priority = arguments.rtprio + 1;294 dev_options.packetizer_priority = arguments.rtprio; 295 295 296 296 dev_options.verbose = arguments.verbose;