root/branches/streaming-rework/src/libstreaming/StreamProcessor.cpp

Revision 390, 14.0 kB (checked in by pieterpalmers, 17 years ago)

* working version of SYT based AMDTP receive and transmit.

Still has to be tuned to work with low buffer sizes.

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "libutil/Atomic.h"
30
31 #include "StreamProcessor.h"
32 #include "StreamProcessorManager.h"
33 #include "cycletimer.h"
34
35 #include <assert.h>
36
37 namespace FreebobStreaming {
38
// One debug module per class implemented in this file, each created with
// DEBUG_LEVEL_VERBOSE. IMPL_DEBUG_MODULE is defined elsewhere; presumably it
// provides the module that debugOutput()/setDebugLevel() act on -- TODO confirm.
39 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
40 IMPL_DEBUG_MODULE( ReceiveStreamProcessor, ReceiveStreamProcessor, DEBUG_LEVEL_VERBOSE );
41 IMPL_DEBUG_MODULE( TransmitStreamProcessor, TransmitStreamProcessor, DEBUG_LEVEL_VERBOSE );
42
43 StreamProcessor::StreamProcessor(enum IsoStream::EStreamType type, int port, int framerate)
44         : IsoStream(type, port)
45         , m_nb_buffers(0)
46         , m_period(0)
47         , m_xruns(0)
48         , m_framerate(framerate)
49         , m_manager(NULL)
50         , m_running(false)
51         , m_disabled(true)
52         , m_is_disabled(true)
53         , m_cycle_to_enable_at(0)
54         , m_framecounter(0)
55         , m_SyncSource(NULL)
56         , m_ticks_per_frame(0)
57 {
58
59 }
60
61 StreamProcessor::~StreamProcessor() {
62
63 }
64
65 void StreamProcessor::dumpInfo()
66 {
67     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
68
69     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n");
70     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n");
71    
72     IsoStream::dumpInfo();
73     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
74     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Frame counter         : %d\n", m_framecounter);
75     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
76     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
77     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Head - Tail           : %011lld\n",diff);
78     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011u\n",m_handler->getCycleTimerTicks());
79     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %d\n", m_xruns);
80     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Running               : %d\n", m_running);
81     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Enabled               : %s\n", m_disabled ? "No" : "Yes");
82     debugOutputShort( DEBUG_LEVEL_NORMAL, "   enable status        : %s\n", m_is_disabled ? "No" : "Yes");
83    
84 //     m_PeriodStat.dumpInfo();
85 //     m_PacketStat.dumpInfo();
86 //     m_WakeupStat.dumpInfo();
87
88 }
89
90 bool StreamProcessor::init()
91 {
92     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
93    
94     pthread_mutex_init(&m_framecounter_lock, NULL);
95
96     return IsoStream::init();
97 }
98
99 /**
100  * Resets the frame counter, the xrun counter, the ports and the iso stream.
101  * @return true if reset succeeded
102  */
103 bool StreamProcessor::reset() {
104
105     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n");
106
107     resetFrameCounter();
108
109     resetXrunCounter();
110
111     // loop over the ports to reset them
112     if (!PortManager::resetPorts()) {
113         debugFatal("Could not reset ports\n");
114         return false;
115     }
116
117     // reset the iso stream
118     if (!IsoStream::reset()) {
119         debugFatal("Could not reset isostream\n");
120         return false;
121     }
122     return true;
123        
124 }
125    
126 bool StreamProcessor::prepareForEnable() {
127     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
128    
129     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this);
130     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter);
131     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
132     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
133     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff);
134     debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks());
135     return true;
136 }
137
138 bool StreamProcessor::prepareForDisable() {
139     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
140    
141     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this);
142     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter);
143     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
144     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
145     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff);
146     debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks());
147     return true;
148
149 }
150
151 bool StreamProcessor::prepare() {
152
153         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
154 // TODO: implement
155
156         // init the ports
157        
158         if(!m_manager) {
159                 debugFatal("Not attached to a manager!\n");
160                 return -1;
161         }
162
163         m_nb_buffers=m_manager->getNbBuffers();
164         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_nb_buffers  : %d\n", m_nb_buffers);
165
166         m_period=m_manager->getPeriodSize();
167         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_period      : %d\n", m_period);
168
169         // loop over the ports to reset them
170         PortManager::preparePorts();
171
172         // reset the iso stream
173         IsoStream::prepare();
174        
175         return true;
176
177 }
178
179 /**
180  * @brief Notify the StreamProcessor that frames were written
181  *
182  * This notifies the StreamProcessor of the fact that frames were written to the internal
183  * buffer. This is for framecounter & timestamp bookkeeping.
184  *
185  * @param nbframes the number of frames that are written to the internal buffers
186  * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample
187  *           present in the buffer.
188  * @return true if successful
189  */
190 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) {
191
192         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts);
193         incrementFrameCounter(nbframes, ts);
194         return true;
195 }
196
197 /**
198  * @brief Notify the StreamProcessor that frames were read
199  *
200  * This notifies the StreamProcessor of the fact that frames were read from the internal
201  * buffer. This is for framecounter & timestamp bookkeeping.
202  *
203  * @param nbframes the number of frames that are read from the internal buffers
204  * @param ts the new timestamp of the 'head' of the buffer, i.e. the first sample
205  *           present in the buffer.
206  * @return true if successful
207  */
208 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
209
210         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer at (%011lld)...\n", nbframes, ts);
211         decrementFrameCounter(nbframes, ts);
212         return true;
213 }
214
215 uint64_t StreamProcessor::getTimeNow() {
216     return m_handler->getCycleTimerTicks();
217 }
218
219
220 bool StreamProcessor::isRunning() {
221         return m_running;
222 }
223
224 bool StreamProcessor::enable(uint64_t time_to_enable_at)  {
225     // FIXME: time_to_enable_at will be in 'time' not cycles
226     m_cycle_to_enable_at=time_to_enable_at;
227    
228     if(!m_running) {
229             debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n");
230     }
231
232 #ifdef DEBUG
233     uint64_t now_cycles=TICKS_TO_CYCLES(m_handler->getCycleTimerTicks());
234     const int64_t max=(int64_t)(TICKS_PER_SECOND/2);
235    
236     int64_t diff=m_cycle_to_enable_at-now_cycles;
237    
238     if (diff > max) {
239         diff-=TICKS_PER_SECOND;
240     } else if (diff < -max) {
241         diff+=TICKS_PER_SECOND;
242     }
243    
244     if (diff<0) {
245         debugWarning("Request to enable streamprocessor %d cycles ago.\n",diff);
246     }
247 #endif
248
249     m_disabled=false;
250    
251     return true;
252 }
253
254 bool StreamProcessor::disable()  {
255    
256     m_disabled=true;
257
258     return true;
259
260 }
261
262 bool StreamProcessor::setSyncSource(StreamProcessor *s) {
263     m_SyncSource=s;
264     return true;
265 }
266
267 /**
268  * Decrements the frame counter, in a atomic way. This
269  * also sets the buffer head timestamp
270  * is thread safe.
271  */
272 void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) {
273     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
274                 this, new_timestamp);
275                
276     pthread_mutex_lock(&m_framecounter_lock);
277     m_framecounter -= nbframes;
278     m_buffer_head_timestamp = new_timestamp;
279     pthread_mutex_unlock(&m_framecounter_lock);
280 }
281
282 /**
283  * Increments the frame counter, in a atomic way.
284  * also sets the buffer tail timestamp
285  * This is thread safe.
286  */
287 void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) {
288     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",
289                 this, new_timestamp);
290    
291     pthread_mutex_lock(&m_framecounter_lock);
292     m_framecounter += nbframes;
293     m_buffer_tail_timestamp = new_timestamp;
294     pthread_mutex_unlock(&m_framecounter_lock);
295    
296 }
297
298 /**
299  * Sets the buffer tail timestamp (in usecs)
300  * This is thread safe.
301  */
302 void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) {
303     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",
304                 this, new_timestamp);
305    
306     pthread_mutex_lock(&m_framecounter_lock);
307     m_buffer_tail_timestamp = new_timestamp;
308     pthread_mutex_unlock(&m_framecounter_lock);
309 }
310
311 /**
312  * Sets the buffer head timestamp (in usecs)
313  * This is thread safe.
314  */
315 void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) {
316     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
317                 this, new_timestamp);
318
319     pthread_mutex_lock(&m_framecounter_lock);
320     m_buffer_head_timestamp = new_timestamp;
321     pthread_mutex_unlock(&m_framecounter_lock);
322 }
323
324 /**
325  * Sets both the buffer head and tail timestamps (in usecs)
326  * (avoids multiple mutex lock/unlock's)
327  * This is thread safe.
328  */
329 void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) {
330     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
331                 this, new_head);
332     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    and buffer tail timestamp for (%p) to %11llu\n",
333                 this, new_tail);
334    
335     pthread_mutex_lock(&m_framecounter_lock);
336     m_buffer_head_timestamp = new_head;
337     m_buffer_tail_timestamp = new_tail;
338     pthread_mutex_unlock(&m_framecounter_lock);
339 }
340 /**
341  * \brief return the timestamp of the first frame in the buffer
342  *
343  * This function returns the timestamp of the very first sample in
344  * the StreamProcessor's buffer. This is useful for slave StreamProcessors
345  * to find out what the base for their timestamp generation should
346  * be. It also returns the framecounter value for which this timestamp
347  * is valid.
348  *
349  * The system is built in such a way that we assume that the processing
350  * of the buffers doesn't take any time. Assume we have a buffer transfer at
351  * time T1, meaning that the last sample of this buffer occurs at T1. As
352  * processing does not take time, we don't have to add anything to T1. When
353  * transferring the processed buffer to the xmit processor, the timestamp
354  * of the last sample is still T1.
355  *
356  * When starting the streams, we don't have any information on this last
357  * timestamp. We prefill the buffer at the xmit side, and we should find
358  * out what the timestamp for the last sample in the buffer is. If we sync
359  * on a receive SP, we know that the last prefilled sample corresponds with
360  * the first sample received - 1 sample duration. This is the same as if the last
361  * transfer from iso to client would have emptied the receive buffer.
362  *
363  *
364  * @param ts address to store the timestamp in
365  * @param fc address to store the associated framecounter in
366  */
367 void StreamProcessor::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) {
368     pthread_mutex_lock(&m_framecounter_lock);
369     *fc = m_framecounter;
370     *ts = m_buffer_head_timestamp;
371     pthread_mutex_unlock(&m_framecounter_lock);
372 }
373        
374 /**
375  * \brief return the timestamp of the last frame in the buffer
376  *
377  * This function returns the timestamp of the last frame in
378  * the StreamProcessor's buffer. It also returns the framecounter
379  * value for which this timestamp is valid.
380  *
381  * @param ts address to store the timestamp in
382  * @param fc address to store the associated framecounter in
383  */
384 void StreamProcessor::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) {
385     pthread_mutex_lock(&m_framecounter_lock);
386     *fc = m_framecounter;
387     *ts = m_buffer_tail_timestamp;
388     pthread_mutex_unlock(&m_framecounter_lock);
389 }
390
391 /**
392  * Resets the frame counter, in a atomic way. This
393  * is thread safe.
394  */
395 void StreamProcessor::resetFrameCounter() {
396     pthread_mutex_lock(&m_framecounter_lock);
397     m_framecounter = 0;
398     pthread_mutex_unlock(&m_framecounter_lock);
399 }
400
401 /**
402  * Resets the xrun counter, in a atomic way. This
403  * is thread safe.
404  */
405 void StreamProcessor::resetXrunCounter() {
406         ZERO_ATOMIC((SInt32 *)&m_xruns);
407 }
408
409 void StreamProcessor::setVerboseLevel(int l) {
410         setDebugLevel(l);
411         IsoStream::setVerboseLevel(l);
412         PortManager::setVerboseLevel(l);
413
414 }
415
416 ReceiveStreamProcessor::ReceiveStreamProcessor(int port, int framerate)
417         : StreamProcessor(IsoStream::EST_Receive, port, framerate) {
418
419 }
420
421 ReceiveStreamProcessor::~ReceiveStreamProcessor() {
422
423 }
424
425 void ReceiveStreamProcessor::setVerboseLevel(int l) {
426         setDebugLevel(l);
427         StreamProcessor::setVerboseLevel(l);
428
429 }
430
431
432 TransmitStreamProcessor::TransmitStreamProcessor( int port, int framerate)
433         : StreamProcessor(IsoStream::EST_Transmit, port, framerate) {
434
435 }
436
437 TransmitStreamProcessor::~TransmitStreamProcessor() {
438
439 }
440
441 void TransmitStreamProcessor::setVerboseLevel(int l) {
442         setDebugLevel(l);
443         StreamProcessor::setVerboseLevel(l);
444
445 }
446
447 }
Note: See TracBrowser for help on using the browser.