Changeset 860
- Timestamp:
- 01/19/08 10:47:41 (16 years ago)
- Files:
-
- trunk/libffado/src/libieee1394/IsoHandler.cpp (modified) (11 diffs)
- trunk/libffado/src/libieee1394/IsoHandler.h (modified) (1 diff)
- trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp (modified) (37 diffs)
- trunk/libffado/src/libstreaming/generic/StreamProcessor.h (modified) (3 diffs)
- trunk/libffado/src/libstreaming/StreamProcessorManager.cpp (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/libffado/src/libieee1394/IsoHandler.cpp
r841 r860 174 174 IsoHandler::waitForClient() 175 175 { 176 //debugOutput(DEBUG_LEVEL_VERBOSE, "waiting...\n");176 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 177 177 if(m_Client) { 178 bool result = m_Client->waitForSignal(); 179 //debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); 178 bool result; 179 if (m_type == eHT_Receive) { 180 result = m_Client->waitForProducePacket(); 181 } else { 182 result = m_Client->waitForConsumePacket(); 183 } 184 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 180 185 return result; 181 186 } else { … … 188 193 IsoHandler::tryWaitForClient() 189 194 { 190 //debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n");195 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "waiting...\n"); 191 196 if(m_Client) { 192 bool result = m_Client->tryWaitForSignal(); 193 //debugOutput(DEBUG_LEVEL_VERBOSE, " returns %d\n", result); 197 bool result; 198 if (m_type == eHT_Receive) { 199 result = m_Client->canProducePacket(); 200 } else { 201 result = m_Client->canConsumePacket(); 202 } 203 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " returns %d\n", result); 194 204 return result; 195 205 } else { … … 214 224 // wait for the availability of frames in the client 215 225 // (blocking for transmit handlers) 216 #if 0 //#ifdef DEBUG 217 if (getType() == eHT_Transmit) { 218 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) Waiting for Client to signal frame availability...\n", this); 219 } 220 #endif 221 if (getType() == eHT_Receive || waitForClient()) { 222 226 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Waiting for Client activity...\n", this, getTypeString()); 227 if (waitForClient()) { 223 228 #if ISOHANDLER_USE_POLL 224 229 bool result = true; 225 while(result && m_Client && m_Client->canProcessPackets()) {230 while(result && m_Client && tryWaitForClient()) { 226 231 int err = poll(&m_poll_fd, 1, m_poll_timeout); 227 232 if (err == -1) { … … 255 260 // so poll'ing is not really necessary 256 261 bool result = true; 257 while(result && m_Client && m_Client->canProcessPackets()) {262 while(result && m_Client && tryWaitForClient()) { 258 263 result = iterate(); 259 264 // if (getType() == eHT_Receive) { … … 272 277 bool 273 278 IsoHandler::iterate() { 274 // if(m_type==eHT_Receive) { 275 // debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Iterating ISO handler\n", 276 // this, (m_type==eHT_Receive?"Receive":"Transmit")); 277 // } 279 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) Iterating ISO handler...\n", 280 this, getTypeString()); 278 281 if(m_State == E_Running) { 279 282 #if ISOHANDLER_FLUSH_BEFORE_ITERATE … … 285 288 return false; 286 289 } 287 // debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) done iterating ISO handler\n", 288 // this, (m_type==eHT_Receive?"Receive":"Transmit"));290 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) done interating ISO handler...\n", 291 this, getTypeString()); 289 292 return true; 290 293 } else { 291 294 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) Not iterating a non-running handler...\n", 292 this, (m_type==eHT_Receive?"Receive":"Transmit"));295 this, getTypeString()); 293 296 return false; 294 297 } … … 415 418 416 419 debugOutputShort( DEBUG_LEVEL_NORMAL, " Handler type................: %s\n", 417 (this->getType() == eHT_Receive ? "Receive" : "Transmit"));420 getTypeString()); 418 421 debugOutputShort( DEBUG_LEVEL_NORMAL, " Port, Channel...............: %2d, %2d\n", 419 422 m_manager.get1394Service().getPort(), channel); … … 475 478 unsigned int cycle, unsigned int dropped) { 476 479 477 debugOutput( DEBUG_LEVEL_VERY_VERBOSE,480 /* debugOutput( DEBUG_LEVEL_VERY_VERBOSE, 478 481 "received packet: length=%d, channel=%d, cycle=%d\n", 479 length, channel, cycle ); 482 length, channel, cycle );*/ 480 483 m_packetcount++; 481 484 m_dropped += dropped; … … 494 497 int cycle, unsigned int dropped) { 495 498 496 debugOutput( DEBUG_LEVEL_ULTRA_VERBOSE,499 /* debugOutput( DEBUG_LEVEL_ULTRA_VERBOSE, 497 500 "sending packet: length=%d, cycle=%d\n", 498 *length, cycle ); 501 *length, cycle );*/ 499 502 m_packetcount++; 500 503 m_dropped += dropped; … … 503 506 return m_Client->getPacket(data, length, tag, sy, cycle, dropped, m_max_packet_size); 504 507 } 508 *tag = 0; 509 *sy = 0; 510 *length = 0; 505 511 return RAW1394_ISO_OK; 506 512 } … … 594 600 return true; 595 601 } 602 603 /** 604 * @brief convert a EHandlerType to a string 605 * @param t the type 606 * @return a char * describing the state 607 */ 608 const char * 609 IsoHandler::eHTToString(enum EHandlerType t) { 610 switch (t) { 611 case eHT_Receive: return "Receive"; 612 case eHT_Transmit: return "Transmit"; 613 default: return "error: unknown type"; 614 } 615 } trunk/libffado/src/libieee1394/IsoHandler.h
r803 r860 100 100 void flush(); 101 101 enum EHandlerType getType() {return m_type;}; 102 const char *getTypeString() {return eHTToString(m_type); }; 103 104 // pretty printing 105 const char *eHTToString(enum EHandlerType); 102 106 103 107 bool isEnabled() trunk/libffado/src/libstreaming/generic/StreamProcessor.cpp
r857 r860 40 40 #include <math.h> 41 41 42 /* 43 #define POST_SEMAPHORE { \ 44 int tmp; \ 45 sem_getvalue(&m_signal_semaphore, &tmp); \ 46 debugWarning("posting semaphore from value %d\n", tmp); \ 47 sem_post(&m_signal_semaphore); \ 48 } 49 */ 50 51 #define POST_SEMAPHORE { \ 52 sem_post(&m_signal_semaphore); \ 42 #define SIGNAL_ACTIVITY { \ 43 pthread_mutex_lock(&m_activity_cond_lock); \ 44 pthread_cond_broadcast(&m_activity_cond); \ 45 pthread_mutex_unlock(&m_activity_cond_lock); \ 53 46 } 54 47 … … 78 71 , m_sync_delay( 0 ) 79 72 , m_in_xrun( false ) 80 , m_signal_period( 0 )81 , m_signal_offset( 0 )82 73 { 83 74 // create the timestamped buffer and register ourselves as its client 84 75 m_data_buffer = new Util::TimestampedBuffer(this); 76 pthread_mutex_init(&m_activity_cond_lock, NULL); 77 pthread_cond_init(&m_activity_cond, NULL); 85 78 } 86 79 … … 90 83 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not unregister stream processor with the Iso manager\n"); 91 84 } 92 // make the threads leave the wait condition 93 POST_SEMAPHORE; 94 sem_destroy(&m_signal_semaphore); 95 85 86 // lock the condition mutex to keep threads from blocking on 87 // the condition var while we destroy it 88 pthread_mutex_lock(&m_activity_cond_lock); 89 // now signal activity, releasing threads that 90 // are already blocking on the condition variable 91 pthread_cond_broadcast(&m_activity_cond); 92 // then destroy it 93 pthread_cond_destroy(&m_activity_cond); 94 pthread_mutex_unlock(&m_activity_cond_lock); 95 96 // destroy the mutexes 97 pthread_mutex_destroy(&m_activity_cond_lock); 96 98 if (m_data_buffer) delete m_data_buffer; 97 99 if (m_scratch_buffer) delete[] m_scratch_buffer; … … 316 318 if (m_state == ePS_Invalid) { 317 319 debugError("Should not have state %s\n", ePSToString(m_state) ); 318 POST_SEMAPHORE;319 320 return RAW1394_ISO_ERROR; 320 321 } … … 338 339 if (!updateState()) { // we are allowed to change the state directly 339 340 debugError("Could not update state!\n"); 340 POST_SEMAPHORE;341 341 return RAW1394_ISO_ERROR; 342 342 } … … 358 358 if (!updateState()) { // we are allowed to change the state directly 359 359 debugError("Could not update state!\n"); 360 POST_SEMAPHORE;361 360 return RAW1394_ISO_ERROR; 362 361 } … … 383 382 if (!updateState()) { // we are allowed to change the state directly 384 383 debugError("Could not update state!\n"); 385 POST_SEMAPHORE;386 384 return RAW1394_ISO_ERROR; 387 385 } … … 417 415 if (!updateState()) { // we are allowed to change the state directly 418 416 debugError("Could not update state!\n"); 419 POST_SEMAPHORE;420 417 return RAW1394_ISO_ERROR; 421 418 } … … 435 432 if (!updateState()) { // we are allowed to change the state directly 436 433 debugError("Could not update state!\n"); 437 POST_SEMAPHORE;438 434 return RAW1394_ISO_ERROR; 439 435 } … … 455 451 if (!updateState()) { // we are allowed to change the state directly 456 452 debugError("Could not update state!\n"); 457 POST_SEMAPHORE;458 453 return RAW1394_ISO_ERROR; 459 454 } 460 POST_SEMAPHORE;461 455 return RAW1394_ISO_DEFER; 462 456 } else if(result2 == eCRV_OK) { 463 457 // no problem here 464 // if we have enough samples, we can post the semaphore and 465 // defer further processing until later. this will allow us to 466 // run the client and process the frames such that we can put them 467 // into the xmit buffers ASAP 468 if (m_state == ePS_Running) { 469 unsigned int bufferfill = m_data_buffer->getBufferFill(); 470 if(bufferfill >= m_signal_period + m_signal_offset) { 471 // this to avoid multiple signals for the same period 472 int semval; 473 sem_getvalue(&m_signal_semaphore, &semval); 474 // NOTE: this can cause 2 posts to be done when the receiving thread 475 // decreases the semaphore but hasn't processed the frames yet 476 unsigned int signal_period = m_signal_period * (semval + 1) + m_signal_offset; 477 if(bufferfill >= signal_period) { 478 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) buffer fill (%d) > signal period (%d), sem_val=%d\n", 479 this, m_data_buffer->getBufferFill(), signal_period, semval); 480 POST_SEMAPHORE; 481 } 482 // the process thread should have higher prio such that we are blocked until 483 // the samples are processed. 484 return RAW1394_ISO_DEFER; 485 } 486 } 458 SIGNAL_ACTIVITY; 487 459 return RAW1394_ISO_OK; 488 460 } else { 489 461 debugError("Invalid response\n"); 490 POST_SEMAPHORE;491 462 return RAW1394_ISO_ERROR; 492 463 } … … 496 467 } else { 497 468 debugError("Invalid response\n"); 498 POST_SEMAPHORE;499 469 return RAW1394_ISO_ERROR; 500 470 } 501 471 debugError("reached the unreachable\n"); 502 POST_SEMAPHORE;503 472 return RAW1394_ISO_ERROR; 504 473 } … … 633 602 634 603 // assumed not to xrun 635 enum eChildReturnValue result2 =generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length);604 generateSilentPacketData(data, length, tag, sy, cycle, dropped_cycles, max_length); 636 605 return RAW1394_ISO_OK; 637 606 } else { … … 789 758 */ 790 759 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) { 760 bool result; 791 761 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.getFrames(%d, %11llu)", nbframes, ts); 792 762 assert( getType() == ePT_Receive ); 793 if(isDryRunning()) return getFramesDry(nbframes, ts); 794 else return getFramesWet(nbframes, ts); 763 if(isDryRunning()) result = getFramesDry(nbframes, ts); 764 else result = getFramesWet(nbframes, ts); 765 SIGNAL_ACTIVITY; 766 return result; 795 767 } 796 768 … … 847 819 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts) 848 820 { 821 bool result; 849 822 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); 850 return m_data_buffer->dropFrames(nbframes); 823 result = m_data_buffer->dropFrames(nbframes); 824 SIGNAL_ACTIVITY; 825 return result; 851 826 } 852 827 853 828 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) 854 829 { 830 bool result; 855 831 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); 856 832 assert( getType() == ePT_Transmit ); 857 858 if(isDryRunning()) return putFramesDry(nbframes, ts); 859 else return putFramesWet(nbframes, ts); 833 if(isDryRunning()) result = putFramesDry(nbframes, ts); 834 else result = putFramesWet(nbframes, ts); 835 SIGNAL_ACTIVITY; 836 return result; 860 837 } 861 838 … … 867 844 m_data_buffer->blockProcessWriteFrames(nbframes, ts); 868 845 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, " New timestamp: %llu\n", ts); 869 870 unsigned int bufferfill = m_data_buffer->getBufferFill();871 if (bufferfill >= m_signal_period + m_signal_offset) {872 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n",873 this, bufferfill, m_signal_period + m_signal_offset);874 POST_SEMAPHORE;875 } else {876 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n",877 this, bufferfill, m_signal_period + m_signal_offset);878 }879 846 return true; // FIXME: what about failure? 880 847 } … … 911 878 } 912 879 913 unsigned int bufferfill = m_data_buffer->getBufferFill(); 914 if (bufferfill >= m_signal_period + m_signal_offset) { 915 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) sufficient frames in buffer (%d / %d), posting semaphore\n", 916 this, bufferfill, m_signal_period + m_signal_offset); 917 POST_SEMAPHORE; 918 } else { 919 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) insufficient frames in buffer (%d / %d), not posting semaphore\n", 920 this, bufferfill, m_signal_period + m_signal_offset); 921 } 922 880 SIGNAL_ACTIVITY; 923 881 return true; 924 }925 926 bool927 StreamProcessor::waitForSignal()928 {929 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) wait ...\n", this, getTypeString());930 int result;931 if(m_state == ePS_Running && m_next_state == ePS_Running) {932 // check whether we already fullfil the criterion933 unsigned int bufferfill = m_data_buffer->getBufferFill();934 if(bufferfill >= m_signal_period + m_signal_offset) {935 return true;936 }937 938 result = sem_wait(&m_signal_semaphore);939 #ifdef DEBUG940 int tmp;941 sem_getvalue(&m_signal_semaphore, &tmp);942 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp);943 #endif944 return result == 0;945 } else {946 // when we're not running, we can always provide frames947 // when we're in a state transition, keep iterating too948 debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n");949 return true;950 }951 }952 953 bool954 StreamProcessor::tryWaitForSignal()955 {956 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p, %s) trywait ...\n", this, getTypeString());957 int result;958 if(m_state == ePS_Running && m_next_state == ePS_Running) {959 // check whether we already fullfil the criterion960 unsigned int bufferfill = m_data_buffer->getBufferFill();961 if(bufferfill >= m_signal_period + m_signal_offset) {962 return true;963 }964 965 result = sem_trywait(&m_signal_semaphore);966 #ifdef DEBUG967 int tmp;968 sem_getvalue(&m_signal_semaphore, &tmp);969 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " sem_wait returns: %d, sem_value: %d\n", result, tmp);970 #endif971 return result == 0;972 } else {973 // when we're not running, we can always provide frames974 // when we're in a state transition, keep iterating too975 debugOutput(DEBUG_LEVEL_VERBOSE, "Not running...\n");976 return true;977 }978 }979 980 bool981 StreamProcessor::canProcessPackets()982 {983 if(m_state != ePS_Running || m_next_state != ePS_Running) return true;984 bool result;985 unsigned int bufferfill;986 if(getType() == ePT_Receive) {987 bufferfill = m_data_buffer->getBufferSpace();988 } else {989 bufferfill = m_data_buffer->getBufferFill();990 }991 result = bufferfill > getNominalFramesPerPacket();992 // debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) for a bufferfill of %d, we return %d\n",993 // this, ePTToString(getType()), bufferfill, result);994 return result;995 882 } 996 883 … … 998 885 StreamProcessor::shiftStream(int nbframes) 999 886 { 887 bool result; 1000 888 if(nbframes == 0) return true; 1001 889 if(nbframes > 0) { 1002 return m_data_buffer->dropFrames(nbframes); 890 result = m_data_buffer->dropFrames(nbframes); 891 SIGNAL_ACTIVITY; 892 return result; 1003 893 } else { 1004 boolresult = true;894 result = true; 1005 895 while(nbframes++) { 1006 896 result &= m_data_buffer->writeDummyFrame(); 1007 897 } 898 SIGNAL_ACTIVITY; 1008 899 return result; 1009 900 } … … 1089 980 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "init...\n"); 1090 981 1091 if (sem_init(&m_signal_semaphore, 0, 0) == -1) {1092 debugError("Could not init signal semaphore");1093 return false;1094 }1095 1096 982 if(!m_IsoHandlerManager.registerStream(this)) { 1097 983 debugOutput(DEBUG_LEVEL_VERBOSE,"Could not register stream processor with the Iso manager\n"); … … 1170 1056 m_cycle_to_switch_state = TICKS_TO_CYCLES(time_instant); 1171 1057 m_next_state = state; 1172 POST_SEMAPHORE; // needed to ensure that things don't get deadlocked 1058 // wake up any threads that might be waiting on data in the buffers 1059 // since a state transition can cause data to become available 1060 SIGNAL_ACTIVITY; 1173 1061 return true; 1174 1062 } … … 1417 1305 } 1418 1306 #endif 1307 SIGNAL_ACTIVITY; 1419 1308 return result; 1420 1309 } … … 1450 1339 } 1451 1340 #endif 1341 SIGNAL_ACTIVITY; 1452 1342 return true; 1453 1343 } … … 1499 1389 } 1500 1390 #endif 1391 SIGNAL_ACTIVITY; 1501 1392 return result; 1502 1393 } … … 1521 1412 // this basically means nothing, the state change will 1522 1413 // be picked up by the packet iterator 1523 1524 sem_init(&m_signal_semaphore, 0, 0);1525 m_signal_period = m_StreamProcessorManager.getPeriodSize();1526 m_signal_offset = 0; // FIXME: we have to ensure that everyone is ready1527 1414 1528 1415 if(!m_data_buffer->clearBuffer()) { … … 1538 1425 return false; 1539 1426 } 1540 if (m_data_buffer->getBufferFill() >= m_signal_period + m_signal_offset) { 1541 POST_SEMAPHORE; 1542 } 1543 } 1544 1427 } 1545 1428 break; 1546 1429 default: … … 1555 1438 } 1556 1439 #endif 1440 SIGNAL_ACTIVITY; 1557 1441 return true; 1558 1442 } … … 1592 1476 } 1593 1477 #endif 1478 SIGNAL_ACTIVITY; 1594 1479 return result; 1595 1480 } … … 1611 1496 case ePS_Running: 1612 1497 // the thread will do the transition 1613 1614 // we have to wake the iterator if it's asleep1615 POST_SEMAPHORE;1616 1498 break; 1617 1499 default: … … 1626 1508 } 1627 1509 #endif 1510 SIGNAL_ACTIVITY; 1628 1511 return true; 1629 1512 } … … 1648 1531 return true; 1649 1532 } 1650 1651 1533 // after creation, only initialization is allowed 1652 1534 if (m_state == ePS_Created) { … … 1656 1538 // do init here 1657 1539 result = doStop(); 1658 if (result) { POST_SEMAPHORE;return true;}1540 if (result) {return true;} 1659 1541 else goto updateState_exit_change_failed; 1660 1542 } … … 1666 1548 } 1667 1549 result = doWaitForRunningStream(); 1668 if (result) { POST_SEMAPHORE;return true;}1550 if (result) {return true;} 1669 1551 else goto updateState_exit_change_failed; 1670 1552 } … … 1677 1559 } 1678 1560 result = doDryRunning(); 1679 if (result) { POST_SEMAPHORE;return true;}1561 if (result) {return true;} 1680 1562 else goto updateState_exit_change_failed; 1681 1563 } … … 1694 1576 result = doWaitForStreamEnable(); 1695 1577 } 1696 if (result) { POST_SEMAPHORE;return true;}1578 if (result) {return true;} 1697 1579 else goto updateState_exit_change_failed; 1698 1580 } … … 1711 1593 result = doRunning(); 1712 1594 } 1713 if (result) { POST_SEMAPHORE;return true;}1595 if (result) {return true;} 1714 1596 else goto updateState_exit_change_failed; 1715 1597 } … … 1721 1603 } 1722 1604 result = doWaitForStreamDisable(); 1723 if (result) { POST_SEMAPHORE;return true;}1605 if (result) {return true;} 1724 1606 else goto updateState_exit_change_failed; 1725 1607 } … … 1731 1613 } 1732 1614 result = doDryRunning(); 1733 if (result) { POST_SEMAPHORE;return true;}1615 if (result) {return true;} 1734 1616 else goto updateState_exit_change_failed; 1735 1617 } … … 1739 1621 debugError("Invalid state transition: %s => %s\n", 1740 1622 ePSToString(m_state), ePSToString(next_state)); 1741 POST_SEMAPHORE;1623 SIGNAL_ACTIVITY; 1742 1624 return false; 1743 1625 updateState_exit_change_failed: 1744 1626 debugError("State transition failed: %s => %s\n", 1745 1627 ePSToString(m_state), ePSToString(next_state)); 1746 POST_SEMAPHORE;1628 SIGNAL_ACTIVITY; 1747 1629 return false; 1630 } 1631 1632 bool StreamProcessor::waitForProducePacket() 1633 { 1634 return waitForProduce(getNominalFramesPerPacket()); 1635 } 1636 bool StreamProcessor::waitForProducePeriod() 1637 { 1638 return waitForProduce(m_StreamProcessorManager.getPeriodSize()); 1639 } 1640 bool StreamProcessor::waitForProduce(unsigned int nframes) 1641 { 1642 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this); 1643 struct timespec ts; 1644 int result; 1645 1646 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 1647 debugError("clock_gettime failed\n"); 1648 return false; 1649 } 1650 1651 // FIXME: hardcoded timeout of 0.1 sec 1652 ts.tv_nsec += 100 * 1000000LL; 1653 if (ts.tv_nsec > 1000000000LL) { 1654 ts.tv_sec += 1; 1655 ts.tv_nsec -= 1000000000LL; 1656 } 1657 1658 pthread_mutex_lock(&m_activity_cond_lock); 1659 while(!canProduce(nframes)) { 1660 result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); 1661 1662 if (result == -1) { 1663 if (errno == ETIMEDOUT) { 1664 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this); 1665 pthread_mutex_unlock(&m_activity_cond_lock); 1666 return false; 1667 } else { 1668 debugError("(%p) pthread_cond_timedwait error\n", this); 1669 pthread_mutex_unlock(&m_activity_cond_lock); 1670 return false; 1671 } 1672 } 1673 } 1674 pthread_mutex_unlock(&m_activity_cond_lock); 1675 return true; 1676 } 1677 1678 bool StreamProcessor::waitForConsumePacket() 1679 { 1680 return waitForConsume(getNominalFramesPerPacket()); 1681 } 1682 bool StreamProcessor::waitForConsumePeriod() 1683 { 1684 return waitForConsume(m_StreamProcessorManager.getPeriodSize()); 1685 } 1686 bool StreamProcessor::waitForConsume(unsigned int nframes) 1687 { 1688 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "(%p) wait ...\n", this); 1689 struct timespec ts; 1690 int result; 1691 1692 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { 1693 debugError("clock_gettime failed\n"); 1694 return false; 1695 } 1696 1697 // FIXME: hardcoded timeout of 0.1 sec 1698 ts.tv_nsec += 100 * 1000000LL; 1699 if (ts.tv_nsec > 1000000000LL) { 1700 ts.tv_sec += 1; 1701 ts.tv_nsec -= 1000000000LL; 1702 } 1703 1704 pthread_mutex_lock(&m_activity_cond_lock); 1705 while(!canConsume(nframes)) { 1706 result = pthread_cond_timedwait(&m_activity_cond, &m_activity_cond_lock, &ts); 1707 1708 if (result == -1) { 1709 if (errno == ETIMEDOUT) { 1710 debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) pthread_cond_timedwait() timed out\n", this); 1711 pthread_mutex_unlock(&m_activity_cond_lock); 1712 return false; 1713 } else { 1714 debugError("(%p) pthread_cond_timedwait error\n", this); 1715 pthread_mutex_unlock(&m_activity_cond_lock); 1716 return false; 1717 } 1718 } 1719 } 1720 pthread_mutex_unlock(&m_activity_cond_lock); 1721 return true; 1722 } 1723 1724 bool StreamProcessor::canProducePacket() 1725 { 1726 return canProduce(getNominalFramesPerPacket()); 1727 } 1728 bool StreamProcessor::canProducePeriod() 1729 { 1730 return canProduce(m_StreamProcessorManager.getPeriodSize()); 1731 } 1732 bool StreamProcessor::canProduce(unsigned int nframes) 1733 { 1734 if(m_state == ePS_Running && m_next_state == ePS_Running) { 1735 // check whether we already fullfil the criterion 1736 unsigned int bufferspace = m_data_buffer->getBufferSpace(); 1737 if(bufferspace >= nframes) { 1738 // debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough space (%u)...\n", bufferspace); 1739 return true; 1740 } else return false; 1741 } else { 1742 if(getType() == ePT_Transmit) { 1743 // if we are an xmit SP, we cannot accept frames 1744 // when not running 1745 return false; 1746 } else { 1747 // if we are a receive SP, we can always accept frames 1748 // when not running 1749 return true; 1750 } 1751 } 1752 } 1753 1754 bool StreamProcessor::canConsumePacket() 1755 { 1756 return canConsume(getNominalFramesPerPacket()); 1757 } 1758 bool StreamProcessor::canConsumePeriod() 1759 { 1760 return canConsume(m_StreamProcessorManager.getPeriodSize()); 1761 } 1762 bool StreamProcessor::canConsume(unsigned int nframes) 1763 { 1764 if(m_state == ePS_Running && m_next_state == ePS_Running) { 1765 // check whether we already fullfil the criterion 1766 unsigned int bufferfill = m_data_buffer->getBufferFill(); 1767 if(bufferfill >= nframes) { 1768 // debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "enough items (%u)...\n", bufferfill); 1769 return true; 1770 } else return false; 1771 } else { 1772 if(getType() == ePT_Transmit) { 1773 // if we are an xmit SP, and we're not running, 1774 // we can always provide frames 1775 return true; 1776 } else { 1777 // if we are a receive SP, we can't provide frames 1778 // when not running 1779 return false; 1780 } 1781 } 1748 1782 } 1749 1783 trunk/libffado/src/libstreaming/generic/StreamProcessor.h
r857 r860 34 34 35 35 #include "debugmodule/debugmodule.h" 36 #include <semaphore.h> 36 37 #include <pthread.h> 37 38 38 39 class Ieee1394Service; … … 161 162 bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 162 163 163 unsigned int getSignalPeriod() {return m_signal_period;}; 164 bool setSignalPeriod(unsigned int p) {m_signal_period=p; return true;}; 165 /** 166 * @brief waits for a 'signal' (blocking) 167 * 168 * a 'signal' is: 169 * when type==Receive: 170 * - one signal_period of frames is present in the buffer 171 * (received by the iso side) 172 * - an error has occurred (xrun, iso error, ...) 173 * when type==Transmit: 174 * - at least one signal_period of frames are present in the buffer 175 * (have been written into it by the client) 176 * - an error occurred 177 * 178 * @return true if the 'signal' is available, false if error 179 */ 180 bool waitForSignal(); 181 182 /** 183 * @brief checks for a 'signal' (non-blocking) 184 * 185 * a 'signal' is: 186 * when type==Receive: 187 * - one signal_period of frames is present in the buffer 188 * (received by the iso side) 189 * - an error has occurred (xrun, iso error, ...) 190 * when type==Transmit: 191 * - at least one signal_period of frames are present in the buffer 192 * (have been written into it by the client) 193 * - an error occurred 194 * 195 * @return true if the 'signal' is available, false if not (or error) 196 */ 197 bool tryWaitForSignal(); 198 199 /** 200 * @brief can a SP process (queue, dequeue) packets at this moment? 201 * 202 * 203 * @return true if packet processing makes sense 204 */ 205 bool canProcessPackets(); 206 164 //FIXME: document wait functions 165 bool waitForProducePacket(); 166 bool waitForProducePeriod(); 167 bool waitForProduce(unsigned int nframes); 168 169 bool waitForConsumePacket(); 170 bool waitForConsumePeriod(); 171 bool waitForConsume(unsigned int nframes); 172 173 bool canProducePacket(); 174 bool canProducePeriod(); 175 bool canProduce(unsigned int nframes); 176 177 bool canConsumePacket(); 178 bool canConsumePeriod(); 179 bool canConsume(unsigned int nframes); 180 181 public: 207 182 /** 208 183 * @brief drop nframes from the internal buffer as if they were transferred to the client side … … 503 478 private: 504 479 bool m_in_xrun; 505 sem_t m_signal_semaphore; 506 unsigned int m_signal_period; 507 unsigned int m_signal_offset; 480 pthread_mutex_t m_activity_cond_lock; 481 pthread_cond_t m_activity_cond; 508 482 509 483 public: trunk/libffado/src/libstreaming/StreamProcessorManager.cpp
r857 r860 712 712 while(period_not_ready) { 713 713 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for period (%d frames in buffer)...\n", m_SyncSource->getBufferFill()); 714 if(!m_SyncSource->waitForSignal()) { 715 debugError("Error waiting for signal\n"); 716 return false; 717 } 714 bool result; 715 if(m_SyncSource->getType() == StreamProcessor::ePT_Receive) { 716 result = m_SyncSource->waitForConsumePeriod(); 717 } else { 718 result = m_SyncSource->waitForProducePeriod(); 719 } 720 // if(!result) { 721 // debugError("Error waiting for signal\n"); 722 // return false; 723 // } 718 724 719 725 // HACK: this should be solved more elegantly … … 722 728 it != m_ReceiveProcessors.end(); 723 729 ++it ) { 724 bool this_sp_period_ready = (*it)->canClientTransferFrames(m_period); 730 bool this_sp_period_ready = (*it)->canConsumePeriod(); 731 if (!this_sp_period_ready) { 732 period_not_ready = true; 733 } 734 } 735 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 736 it != m_TransmitProcessors.end(); 737 ++it ) { 738 bool this_sp_period_ready = (*it)->canProducePeriod(); 725 739 if (!this_sp_period_ready) { 726 740 period_not_ready = true;