root/branches/streaming-rework/src/libstreaming/StreamProcessor.cpp

Revision 386, 14.0 kB (checked in by pieterpalmers, 16 years ago)

- moved files around to the place they belong
- fixed all compile warnings

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "libutil/Atomic.h"
30
31 #include "StreamProcessor.h"
32 #include "StreamProcessorManager.h"
33
34 #include <assert.h>
35
36 namespace FreebobStreaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
39 IMPL_DEBUG_MODULE( ReceiveStreamProcessor, ReceiveStreamProcessor, DEBUG_LEVEL_VERBOSE );
40 IMPL_DEBUG_MODULE( TransmitStreamProcessor, TransmitStreamProcessor, DEBUG_LEVEL_VERBOSE );
41
42 StreamProcessor::StreamProcessor(enum IsoStream::EStreamType type, int port, int framerate)
43         : IsoStream(type, port)
44         , m_nb_buffers(0)
45         , m_period(0)
46         , m_xruns(0)
47         , m_framerate(framerate)
48         , m_manager(NULL)
49         , m_running(false)
50         , m_disabled(true)
51         , m_is_disabled(true)
52         , m_framecounter(0)
53         , m_SyncSource(NULL)
54         , m_ticks_per_frame(0)
55 {
56
57 }
58
59 StreamProcessor::~StreamProcessor() {
60
61 }
62
63 void StreamProcessor::dumpInfo()
64 {
65     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
66
67     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n");
68     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n");
69    
70     IsoStream::dumpInfo();
71     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
72     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Frame counter         : %d\n", m_framecounter);
73     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
74     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
75     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Head - Tail           : %011lld\n",diff);
76     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011u\n",m_handler->getCycleTimerTicks());
77     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %d\n", m_xruns);
78     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Running               : %d\n", m_running);
79     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Enabled               : %s\n", m_disabled ? "No" : "Yes");
80     debugOutputShort( DEBUG_LEVEL_NORMAL, "   enable status        : %s\n", m_is_disabled ? "No" : "Yes");
81    
82 //     m_PeriodStat.dumpInfo();
83 //     m_PacketStat.dumpInfo();
84 //     m_WakeupStat.dumpInfo();
85
86 }
87
88 bool StreamProcessor::init()
89 {
90     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
91    
92     pthread_mutex_init(&m_framecounter_lock, NULL);
93
94     return IsoStream::init();
95 }
96
97 /**
98  * Resets the frame counter, the xrun counter, the ports and the iso stream.
99  * @return true if reset succeeded
100  */
101 bool StreamProcessor::reset() {
102
103     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n");
104
105     resetFrameCounter();
106
107     resetXrunCounter();
108
109     // loop over the ports to reset them
110     if (!PortManager::resetPorts()) {
111         debugFatal("Could not reset ports\n");
112         return false;
113     }
114
115     // reset the iso stream
116     if (!IsoStream::reset()) {
117         debugFatal("Could not reset isostream\n");
118         return false;
119     }
120     return true;
121        
122 }
123    
124 bool StreamProcessor::prepareForEnable() {
125     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
126    
127     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this);
128     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter);
129     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
130     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
131     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff);
132     debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks());
133     return true;
134 }
135
136 bool StreamProcessor::prepareForDisable() {
137     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
138    
139     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this);
140     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter);
141     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
142     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
143     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff);
144     debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks());
145     return true;
146
147 }
148
149 bool StreamProcessor::prepare() {
150
151         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
152 // TODO: implement
153
154         // init the ports
155        
156         if(!m_manager) {
157                 debugFatal("Not attached to a manager!\n");
158                 return -1;
159         }
160
161         m_nb_buffers=m_manager->getNbBuffers();
162         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_nb_buffers  : %d\n", m_nb_buffers);
163
164         m_period=m_manager->getPeriodSize();
165         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_period      : %d\n", m_period);
166
167         // loop over the ports to reset them
168         PortManager::preparePorts();
169
170         // reset the iso stream
171         IsoStream::prepare();
172        
173         return true;
174
175 }
176
177 /**
178  * @brief Notify the StreamProcessor that frames were written
179  *
180  * This notifies the StreamProcessor of the fact that frames were written to the internal
181  * buffer. This is for framecounter & timestamp bookkeeping.
182  *
183  * @param nbframes the number of frames that are written to the internal buffers
184  * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample
185  *           present in the buffer.
186  * @return true if successful
187  */
188 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) {
189
190         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts);
191         incrementFrameCounter(nbframes, ts);
192         return true;
193 }
194
195 /**
196  * @brief Notify the StreamProcessor that frames were read
197  *
198  * This notifies the StreamProcessor of the fact that frames were read from the internal
199  * buffer. This is for framecounter & timestamp bookkeeping.
200  *
201  * @param nbframes the number of frames that are read from the internal buffers
202  * @param ts the new timestamp of the 'head' of the buffer, i.e. the first sample
203  *           present in the buffer.
204  * @return true if successful
205  */
206 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
207
208         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer at (%011lld)...\n", nbframes, ts);
209         decrementFrameCounter(nbframes, ts);
210         return true;
211 }
212
213 bool StreamProcessor::isRunning() {
214         return m_running;
215 }
216
217 bool StreamProcessor::enable()  {
218     int cnt=0;
219    
220     if(!m_running) {
221             debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n");
222     }
223    
224     m_disabled=false;
225    
226     // now wait until it is effectively enabled
227     // time-out at 100ms
228     while(m_is_disabled && cnt++ < 1000) {
229         usleep(100);
230     }
231    
232     // check if the operation timed out
233     if(cnt==1000) {
234         debugWarning("Timeout when enabling StreamProcessor (%p)\n",this);
235         return false;
236     }
237    
238     return true;
239 }
240
241 bool StreamProcessor::disable()  {
242     int cnt=0;
243    
244     m_disabled=true;
245    
246     // now wait until it is effectively disabled
247     // time-out at
248     while(!m_is_disabled && cnt++ < 1000) {
249         usleep(100);
250     }
251    
252     // check if the operation timed out (100ms)
253     if(cnt==1000) {
254         debugWarning("Timeout when disabling StreamProcessor (%p)\n",this);
255         return false;
256     }
257    
258     return true;
259
260 }
261
262 bool StreamProcessor::setSyncSource(StreamProcessor *s) {
263     m_SyncSource=s;
264     return true;
265 }
266
267 /**
268  * Decrements the frame counter, in a atomic way. This
269  * also sets the buffer head timestamp
270  * is thread safe.
271  */
272 void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) {
273     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
274                 this, new_timestamp);
275                
276     pthread_mutex_lock(&m_framecounter_lock);
277     m_framecounter -= nbframes;
278     m_buffer_head_timestamp = new_timestamp;
279     pthread_mutex_unlock(&m_framecounter_lock);
280 }
281
282 /**
283  * Increments the frame counter, in a atomic way.
284  * also sets the buffer tail timestamp
285  * This is thread safe.
286  */
287 void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) {
288     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",
289                 this, new_timestamp);
290    
291     pthread_mutex_lock(&m_framecounter_lock);
292     m_framecounter += nbframes;
293     m_buffer_tail_timestamp = new_timestamp;
294     pthread_mutex_unlock(&m_framecounter_lock);
295    
296 }
297
298 /**
299  * Sets the buffer tail timestamp (in usecs)
300  * This is thread safe.
301  */
302 void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) {
303     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",
304                 this, new_timestamp);
305    
306     pthread_mutex_lock(&m_framecounter_lock);
307     m_buffer_tail_timestamp = new_timestamp;
308     pthread_mutex_unlock(&m_framecounter_lock);
309 }
310
311 /**
312  * Sets the buffer head timestamp (in usecs)
313  * This is thread safe.
314  */
315 void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) {
316     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
317                 this, new_timestamp);
318
319     pthread_mutex_lock(&m_framecounter_lock);
320     m_buffer_head_timestamp = new_timestamp;
321     pthread_mutex_unlock(&m_framecounter_lock);
322 }
323
324 /**
325  * Sets both the buffer head and tail timestamps (in usecs)
326  * (avoids multiple mutex lock/unlock's)
327  * This is thread safe.
328  */
329 void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) {
330     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
331                 this, new_head);
332     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    and buffer tail timestamp for (%p) to %11llu\n",
333                 this, new_tail);
334    
335     pthread_mutex_lock(&m_framecounter_lock);
336     m_buffer_head_timestamp = new_head;
337     m_buffer_tail_timestamp = new_tail;
338     pthread_mutex_unlock(&m_framecounter_lock);
339 }
340 /**
341  * \brief return the timestamp of the first frame in the buffer
342  *
343  * This function returns the timestamp of the very first sample in
344  * the StreamProcessor's buffer. This is useful for slave StreamProcessors
345  * to find out what the base for their timestamp generation should
346  * be. It also returns the framecounter value for which this timestamp
347  * is valid.
348  *
349  * The system is built in such a way that we assume that the processing
350  * of the buffers doesn't take any time. Assume we have a buffer transfer at
351  * time T1, meaning that the last sample of this buffer occurs at T1. As
352  * processing does not take time, we don't have to add anything to T1. When
353  * transferring the processed buffer to the xmit processor, the timestamp
354  * of the last sample is still T1.
355  *
356  * When starting the streams, we don't have any information on this last
357  * timestamp. We prefill the buffer at the xmit side, and we should find
358  * out what the timestamp for the last sample in the buffer is. If we sync
359  * on a receive SP, we know that the last prefilled sample corresponds with
360  * the first sample received - 1 sample duration. This is the same as if the last
361  * transfer from iso to client would have emptied the receive buffer.
362  *
363  *
364  * @param ts address to store the timestamp in
365  * @param fc address to store the associated framecounter in
366  */
367 void StreamProcessor::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) {
368     pthread_mutex_lock(&m_framecounter_lock);
369     *fc = m_framecounter;
370     *ts = m_buffer_head_timestamp;
371     pthread_mutex_unlock(&m_framecounter_lock);
372 }
373        
374 /**
375  * \brief return the timestamp of the last frame in the buffer
376  *
377  * This function returns the timestamp of the last frame in
378  * the StreamProcessor's buffer. It also returns the framecounter
379  * value for which this timestamp is valid.
380  *
381  * @param ts address to store the timestamp in
382  * @param fc address to store the associated framecounter in
383  */
384 void StreamProcessor::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) {
385     pthread_mutex_lock(&m_framecounter_lock);
386     *fc = m_framecounter;
387     *ts = m_buffer_tail_timestamp;
388     pthread_mutex_unlock(&m_framecounter_lock);
389 }
390
391 /**
392  * Resets the frame counter, in a atomic way. This
393  * is thread safe.
394  */
395 void StreamProcessor::resetFrameCounter() {
396     pthread_mutex_lock(&m_framecounter_lock);
397     m_framecounter = 0;
398     pthread_mutex_unlock(&m_framecounter_lock);
399 }
400
401 /**
402  * Resets the xrun counter, in a atomic way. This
403  * is thread safe.
404  */
405 void StreamProcessor::resetXrunCounter() {
406         ZERO_ATOMIC((SInt32 *)&m_xruns);
407 }
408
409 void StreamProcessor::setVerboseLevel(int l) {
410         setDebugLevel(l);
411         IsoStream::setVerboseLevel(l);
412         PortManager::setVerboseLevel(l);
413
414 }
415
416 ReceiveStreamProcessor::ReceiveStreamProcessor(int port, int framerate)
417         : StreamProcessor(IsoStream::EST_Receive, port, framerate) {
418
419 }
420
421 ReceiveStreamProcessor::~ReceiveStreamProcessor() {
422
423 }
424
425 void ReceiveStreamProcessor::setVerboseLevel(int l) {
426         setDebugLevel(l);
427         StreamProcessor::setVerboseLevel(l);
428
429 }
430
431
432 TransmitStreamProcessor::TransmitStreamProcessor( int port, int framerate)
433         : StreamProcessor(IsoStream::EST_Transmit, port, framerate) {
434
435 }
436
437 TransmitStreamProcessor::~TransmitStreamProcessor() {
438
439 }
440
441 void TransmitStreamProcessor::setVerboseLevel(int l) {
442         setDebugLevel(l);
443         StreamProcessor::setVerboseLevel(l);
444
445 }
446
447 }
Note: See TracBrowser for help on using the browser.