Changeset 727 for branches/ppalmers-streaming
- Timestamp:
- 11/25/07 12:57:43 (16 years ago)
- Files:
-
- branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp (modified) (3 diffs)
- branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp (modified) (9 diffs)
- branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h (modified) (3 diffs)
- branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp (modified) (6 diffs)
- branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h (modified) (2 diffs)
- branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/ppalmers-streaming/src/libstreaming/amdtp/AmdtpTransmitStreamProcessor.cpp
r722 r727 438 438 unsigned int nevents, unsigned int offset ) 439 439 { 440 bool no_problem =true;440 bool no_problem = true; 441 441 442 442 for ( PortVectorIterator it = m_PeriodPorts.begin(); 443 it != m_PeriodPorts.end(); 444 ++it ) 445 { 446 447 if ( ( *it )->isDisabled() ) {continue;}; 443 it != m_PeriodPorts.end(); 444 ++it ) 445 { 446 if ( (*it)->isDisabled() ) { continue; }; 448 447 449 448 //FIXME: make this into a static_cast when not DEBUG? 450 451 AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *> ( *it ); 449 AmdtpPortInfo *pinfo = dynamic_cast<AmdtpPortInfo *> ( *it ); 452 450 assert ( pinfo ); // this should not fail!! 453 451 454 switch 452 switch( pinfo->getFormat() ) 455 453 { 456 454 case AmdtpPortInfo::E_MBLA: 457 if ( encodePortToMBLAEvents ( static_cast<AmdtpAudioPort *> ( *it ), ( quadlet_t * ) data, offset, nevents) )455 if( encodePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents) ) 458 456 { 459 debugWarning ( "Could not encode port %s to MBLA events", ( *it)->getName().c_str() );460 no_problem =false;457 debugWarning ( "Could not encode port %s to MBLA events", (*it)->getName().c_str() ); 458 no_problem = false; 461 459 } 462 460 break; … … 470 468 } 471 469 472 bool AmdtpTransmitStreamProcessor::transmitSilenceBlock ( char *data, 473 unsigned int nevents, unsigned int offset ) 474 { 475 bool problem = false; 476 for ( PortVectorIterator it = m_PeriodPorts.begin(); 477 it != m_PeriodPorts.end(); 478 ++it ) 470 bool 471 AmdtpTransmitStreamProcessor::transmitSilenceBlock( 472 char *data, unsigned int nevents, unsigned int offset) 473 { 474 bool no_problem = true; 475 for(PortVectorIterator it = m_PeriodPorts.begin(); 476 it != m_PeriodPorts.end(); 477 ++it ) 479 478 { 480 479 //FIXME: make this into a static_cast when not DEBUG? 481 AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *> ( *it);482 assert ( pinfo); // this should not fail!!483 484 switch 480 AmdtpPortInfo *pinfo=dynamic_cast<AmdtpPortInfo *>(*it); 481 assert(pinfo); // this should not fail!! 482 483 switch( pinfo->getFormat() ) 485 484 { 486 485 case AmdtpPortInfo::E_MBLA: 487 if ( encodeSilencePortToMBLAEvents ( static_cast<AmdtpAudioPort *> ( *it ), ( quadlet_t * ) data, offset, nevents) )486 if ( encodeSilencePortToMBLAEvents(static_cast<AmdtpAudioPort *>(*it), (quadlet_t *)data, offset, nevents) ) 488 487 { 489 debugWarning ( "Could not encode port %s to MBLA events", ( *it )->getName().c_str());490 problem = true;488 debugWarning("Could not encode port %s to MBLA events", (*it)->getName().c_str()); 489 no_problem = false; 491 490 } 492 491 break; … … 497 496 } 498 497 } 499 return problem;498 return no_problem; 500 499 } 501 500 branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.cpp
r723 r727 41 41 , m_next_state( ePS_Invalid ) 42 42 , m_cycle_to_switch_state( 0 ) 43 , m_scratch_buffer( NULL ) 44 , m_scratch_buffer_size_bytes( 0 ) 43 45 , m_manager( NULL ) 44 46 , m_ticks_per_frame( 0 ) … … 51 53 { 52 54 // create the timestamped buffer and register ourselves as its client 53 m_data_buffer =new Util::TimestampedBuffer(this);55 m_data_buffer = new Util::TimestampedBuffer(this); 54 56 } 55 57 56 58 StreamProcessor::~StreamProcessor() { 57 59 if (m_data_buffer) delete m_data_buffer; 60 if (m_scratch_buffer) delete[] m_scratch_buffer; 58 61 } 59 62 … … 75 78 int StreamProcessor::getBufferFill() { 76 79 return m_data_buffer->getBufferFill(); 77 }78 79 bool80 StreamProcessor::dropFrames(unsigned int nbframes)81 {82 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d)\n", nbframes);83 return m_data_buffer->dropFrames(nbframes);84 80 } 85 81 … … 377 373 if (dropped_cycles < 0) debugWarning("(%p) dropped < 1 (%d)\n", this, dropped_cycles); 378 374 if (dropped_cycles > 0) { 379 debugWarning("(%p) dropped %d packets on cycle %u \n", this, dropped_cycles, cycle);375 debugWarning("(%p) dropped %d packets on cycle %u (last_cycle=%u, dropped=%d)\n", this, dropped_cycles, cycle, m_last_cycle, dropped); 380 376 m_dropped += dropped_cycles; 381 377 } 382 378 } 383 if (cycle > 0) {379 if (cycle >= 0) { 384 380 m_last_cycle = cycle; 385 381 } … … 623 619 } 624 620 625 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) { 621 bool StreamProcessor::getFramesDry(unsigned int nbframes, int64_t ts) 622 { 626 623 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "stream (%p): dry run %d frames (@ ts=%lld)\n", 627 624 this, nbframes, ts); 628 629 625 // dry run on this side means that we put silence in all enabled ports 630 626 // since there is do data put into the ringbuffer in the dry-running state … … 632 628 } 633 629 634 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) { 630 bool 631 StreamProcessor::dropFrames(unsigned int nbframes, int64_t ts) 632 { 633 debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "StreamProcessor::dropFrames(%d, %lld)\n", nbframes, ts); 634 return m_data_buffer->dropFrames(nbframes); 635 } 636 637 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) 638 { 635 639 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "%p.putFrames(%d, %11llu)", nbframes, ts); 636 640 assert( getType() == ePT_Transmit ); … … 640 644 641 645 bool 642 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) { 646 StreamProcessor::putFramesWet(unsigned int nbframes, int64_t ts) 647 { 643 648 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesWet(%d, %llu)\n", nbframes, ts); 644 649 // transfer the data … … 649 654 650 655 bool 651 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) { 656 StreamProcessor::putFramesDry(unsigned int nbframes, int64_t ts) 657 { 652 658 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putFramesDry(%d, %llu)\n", nbframes, ts); 653 659 // do nothing 654 660 return true; 661 } 662 663 bool 664 StreamProcessor::putSilenceFrames(unsigned int nbframes, int64_t ts) 665 { 666 debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE, "StreamProcessor::putSilenceFrames(%d, %llu)\n", nbframes, ts); 667 668 size_t bytes_per_frame = getEventSize() * getEventsPerFrame(); 669 unsigned int scratch_buffer_size_frames = m_scratch_buffer_size_bytes / bytes_per_frame; 670 671 if (nbframes > scratch_buffer_size_frames) { 672 debugError("nframes (%u) > scratch_buffer_size_frames (%u)\n", 673 nbframes, scratch_buffer_size_frames); 674 } 675 676 assert(m_scratch_buffer); 677 if(!transmitSilenceBlock((char *)m_scratch_buffer, nbframes, 0)) { 678 debugError("Could not prepare silent block\n"); 679 return false; 680 } 681 if(!m_data_buffer->writeFrames(nbframes, (char *)m_scratch_buffer, ts)) { 682 debugError("Could not write silent block\n"); 683 return false; 684 } 685 return true; 686 } 687 688 bool 689 StreamProcessor::shiftStream(int nbframes) 690 { 691 if(nbframes == 0) return true; 692 if(nbframes > 0) { 693 return m_data_buffer->dropFrames(nbframes); 694 } else { 695 bool result = true; 696 while(nbframes--) { 697 result &= m_data_buffer->writeDummyFrame(); 698 } 699 return result; 700 } 655 701 } 656 702 … … 670 716 bool StreamProcessor::prepare() 671 717 { 672 debugOutput( DEBUG_LEVEL_VER Y_VERBOSE, "prepare...\n");718 debugOutput( DEBUG_LEVEL_VERBOSE, "Prepare SP (%p)...\n", this); 673 719 if(!m_manager) { 674 720 debugFatal("Not attached to a manager!\n"); 721 return false; 722 } 723 724 // make the scratch buffer one period of frames long 725 m_scratch_buffer_size_bytes = m_manager->getPeriodSize() * getEventsPerFrame() * getEventSize(); 726 debugOutput( DEBUG_LEVEL_VERBOSE, " Allocate scratch buffer of %d quadlets\n"); 727 if(m_scratch_buffer) delete[] m_scratch_buffer; 728 m_scratch_buffer = new byte_t[m_scratch_buffer_size_bytes]; 729 if(m_scratch_buffer == NULL) { 730 debugFatal("Could not allocate scratch buffer\n"); 675 731 return false; 676 732 } branches/ppalmers-streaming/src/libstreaming/generic/StreamProcessor.h
r723 r727 154 154 bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer 155 155 156 /** 157 * @brief drop nframes from the internal buffer as if they were transferred to the client side 158 * 159 * Gets nframes of frames from the buffer as done by getFrames(), but does not transfer them 160 * to the client side. Instead they are discarded. 161 * 162 * @param nframes number of frames 163 * @return true if the operation was successful 164 */ 165 bool dropFrames(unsigned int nframes, int64_t ts); 166 167 /** 168 * @brief put silence frames into the internal buffer 169 * 170 * Puts nframes of frames into the buffer as done by putFrames(), but does not transfer them 171 * from the client side. Instead, silent frames are used. 172 * 173 * @param nframes number of frames 174 * @return true if the operation was successful 175 */ 176 bool putSilenceFrames(unsigned int nbframes, int64_t ts); 177 178 /** 179 * @brief Shifts the stream with the specified number of frames 180 * 181 * Used to align several streams to each other. It comes down to 182 * making sure the head timestamp corresponds to the timestamp of 183 * one master stream 184 * 185 * @param nframes the number of frames to shift 186 * @return true if successful 187 */ 188 bool shiftStream(int nframes); 156 189 protected: // the helper receive/transmit functions 157 190 enum eChildReturnValue { … … 238 271 protected: 239 272 Util::TimestampedBuffer *m_data_buffer; 240 273 // the scratch buffer is temporary buffer space that can be 274 // used by any function. It's pre-allocated when the SP is created. 275 // the purpose is to avoid allocation of memory (or heap/stack) in 276 // an RT context 277 byte_t* m_scratch_buffer; 278 size_t m_scratch_buffer_size_bytes; 241 279 protected: 242 280 StreamProcessorManager *m_manager; … … 255 293 */ 256 294 bool canClientTransferFrames(unsigned int nframes); 257 258 /**259 * @brief drop nframes from the internal buffer260 *261 * this function drops nframes from the internal buffers, without any262 * specification on what frames are dropped. Timestamps are not updated.263 *264 * @param nframes number of frames265 * @return true if the operation was successful266 */267 bool dropFrames(unsigned int nframes);268 295 269 296 /** branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.cpp
r723 r727 29 29 #include <errno.h> 30 30 #include <assert.h> 31 #include <math.h> 31 32 32 33 #define RUNNING_TIMEOUT_MSEC 4000 … … 350 351 int64_t time_till_next_period; 351 352 while(nb_sync_runs--) { // or while not sync-ed? 352 // check if we were w akedup too soon353 time_till_next_period =m_SyncSource->getTimeUntilNextPeriodSignalUsecs();353 // check if we were woken up too soon 354 time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs(); 354 355 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period); 355 356 if(time_till_next_period > 0) { … … 444 445 445 446 // now align the received streams 446 debugOutput( DEBUG_LEVEL_VERBOSE, " Aligning incoming streams...\n"); 447 448 447 if(!alignReceivedStreams()) { 448 debugError("Could not align streams\n"); 449 return false; 450 } 449 451 debugOutput( DEBUG_LEVEL_VERBOSE, " StreamProcessor streams running...\n"); 452 return true; 453 } 454 455 bool 456 StreamProcessorManager::alignReceivedStreams() 457 { 458 #define NB_PERIODS_FOR_ALIGN_AVERAGE 20 459 #define NB_ALIGN_TRIES 20 460 debugOutput( DEBUG_LEVEL_VERBOSE, "Aligning received streams...\n"); 461 unsigned int nb_sync_runs; 462 unsigned int nb_rcv_sp = m_ReceiveProcessors.size(); 463 int64_t diff_between_streams[nb_rcv_sp]; 464 int64_t diff; 465 466 unsigned int i; 467 468 bool aligned = false; 469 int cnt = NB_ALIGN_TRIES; 470 while (!aligned && cnt--) { 471 nb_sync_runs = NB_PERIODS_FOR_ALIGN_AVERAGE; 472 while(nb_sync_runs) { 473 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " check (%d)...\n", nb_sync_runs); 474 waitForPeriod(); 475 476 i = 0; 477 for ( i = 0; i < nb_rcv_sp; i++) { 478 StreamProcessor *s = m_ReceiveProcessors.at(i); 479 diff = diffTicks(m_SyncSource->getTimeAtPeriod(), s->getTimeAtPeriod()); 480 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " offset between SyncSP %p and SP %p is %lld ticks...\n", 481 m_SyncSource, s, diff); 482 if ( nb_sync_runs == NB_PERIODS_FOR_ALIGN_AVERAGE ) { 483 diff_between_streams[i] = diff; 484 } else { 485 diff_between_streams[i] += diff; 486 } 487 } 488 if(!transferSilence()) { 489 debugError("Could not transfer silence\n"); 490 return false; 491 } 492 nb_sync_runs--; 493 } 494 // calculate the average offsets 495 debugOutput( DEBUG_LEVEL_VERBOSE, " Average offsets:\n"); 496 int diff_between_streams_frames[nb_rcv_sp]; 497 aligned = true; 498 for ( i = 0; i < nb_rcv_sp; i++) { 499 StreamProcessor *s = m_ReceiveProcessors.at(i); 500 501 diff_between_streams[i] /= NB_PERIODS_FOR_ALIGN_AVERAGE; 502 diff_between_streams_frames[i] = roundf(diff_between_streams[i] / s->getTicksPerFrame()); 503 debugOutput( DEBUG_LEVEL_VERBOSE, " avg offset between SyncSP %p and SP %p is %lld ticks, %d frames...\n", 504 m_SyncSource, s, diff_between_streams[i], diff_between_streams_frames[i]); 505 506 aligned &= (diff_between_streams_frames[i] == 0); 507 508 // position the stream 509 if(!s->shiftStream(diff_between_streams_frames[i])) { 510 debugError("Could not shift SP %p %d frames\n", s, diff_between_streams_frames[i]); 511 return false; 512 } 513 } 514 if (!aligned) { 515 debugOutput(DEBUG_LEVEL_VERBOSE, "Streams not aligned, doing new round...\n"); 516 } 517 } 518 if (cnt == 0) { 519 debugError("Align failed\n"); 520 return false; 521 } 450 522 return true; 451 523 } … … 817 889 */ 818 890 bool StreamProcessorManager::transfer() { 819 820 891 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n"); 821 892 bool retval=true; … … 833 904 * @return true if successful, false otherwise (indicates xrun). 834 905 */ 835 836 906 bool StreamProcessorManager::transfer(enum StreamProcessor::eProcessorType t) { 837 907 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transfer(%d) at TS=%011llu (%03us %04uc %04ut)...\n", … … 879 949 } 880 950 951 /** 952 * @brief Transfer one period of silence for both receive and transmit StreamProcessors 953 * 954 * Transfers one period of silence to the Iso side for transmit SP's 955 * or dump one period of frames for receive SP's 956 * 957 * @return true if successful, false otherwise (indicates xrun). 958 */ 959 bool StreamProcessorManager::transferSilence() { 960 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring silent period...\n"); 961 bool retval=true; 962 retval &= transferSilence(StreamProcessor::ePT_Receive); 963 retval &= transferSilence(StreamProcessor::ePT_Transmit); 964 return retval; 965 } 966 967 /** 968 * @brief Transfer one period of silence for either the receive or transmit StreamProcessors 969 * 970 * Transfers one period of silence to the Iso side for transmit SP's 971 * or dump one period of frames for receive SP's 972 * 973 * @param t The processor type to tranfer for (receive or transmit) 974 * @return true if successful, false otherwise (indicates xrun). 975 */ 976 bool StreamProcessorManager::transferSilence(enum StreamProcessor::eProcessorType t) { 977 debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "transferSilence(%d) at TS=%011llu (%03us %04uc %04ut)...\n", 978 t, m_time_of_transfer, 979 (unsigned int)TICKS_TO_SECS(m_time_of_transfer), 980 (unsigned int)TICKS_TO_CYCLES(m_time_of_transfer), 981 (unsigned int)TICKS_TO_OFFSET(m_time_of_transfer)); 982 983 bool retval = true; 984 // a static cast could make sure that there is no performance 985 // penalty for the virtual functions (to be checked) 986 if (t==StreamProcessor::ePT_Receive) { 987 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin(); 988 it != m_ReceiveProcessors.end(); 989 ++it ) { 990 if(!(*it)->dropFrames(m_period, m_time_of_transfer)) { 991 debugWarning("could not dropFrames(%u, %11llu) from stream processor (%p)\n", 992 m_period, m_time_of_transfer,*it); 993 retval &= false; // buffer underrun 994 } 995 } 996 } else { 997 // FIXME: in the SPM it would be nice to have system time instead of 998 // 1394 time 999 float rate = m_SyncSource->getTicksPerFrame(); 1000 int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate); 1001 1002 // the data we are putting into the buffer is intended to be transmitted 1003 // one ringbuffer size after it has been received 1004 int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks); 1005 1006 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin(); 1007 it != m_TransmitProcessors.end(); 1008 ++it ) { 1009 // FIXME: in the SPM it would be nice to have system time instead of 1010 // 1394 time 1011 if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) { 1012 debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n", 1013 m_period, transmit_timestamp, *it); 1014 retval &= false; // buffer underrun 1015 } 1016 } 1017 } 1018 return retval; 1019 } 1020 881 1021 void StreamProcessorManager::dumpInfo() { 882 1022 debugOutputShort( DEBUG_LEVEL_NORMAL, "----------------------------------------------------\n"); branches/ppalmers-streaming/src/libstreaming/StreamProcessorManager.h
r720 r727 71 71 void setPeriodSize(unsigned int period); 72 72 void setPeriodSize(unsigned int period, unsigned int nb_buffers); 73 int getPeriodSize() {return m_period;};73 unsigned int getPeriodSize() {return m_period;}; 74 74 75 75 void setNbBuffers(unsigned int nb_buffers); … … 81 81 82 82 // the client-side functions 83 bool waitForPeriod(); 84 bool transfer(); 85 bool transfer(enum StreamProcessor::eProcessorType); 86 private: 87 bool transferSilence(); 88 bool transferSilence(enum StreamProcessor::eProcessorType); 83 89 84 bool waitForPeriod(); ///< wait for the next period 85 bool transfer(); ///< transfer the buffer contents from/to client 86 bool transfer(enum StreamProcessor::eProcessorType); ///< transfer the buffer contents from/to client (single processor type) 87 90 bool alignReceivedStreams(); 91 public: 88 92 int getDelayedUsecs() {return m_delayed_usecs;}; 89 93 bool xrunOccurred(); branches/ppalmers-streaming/src/libutil/TimestampedBuffer.cpp
r720 r727 448 448 * @return true if successful 449 449 */ 450 bool TimestampedBuffer::dropFrames(unsigned int nframes) { 451 452 unsigned int read_size=nframes*m_event_size*m_events_per_frame; 453 450 bool 451 TimestampedBuffer::dropFrames(unsigned int nframes) { 452 unsigned int read_size = nframes * m_event_size * m_events_per_frame; 454 453 ffado_ringbuffer_read_advance(m_event_buffer, read_size); 455 454 decrementFrameCounter(nframes);