root/branches/streaming-rework/src/libstreaming/StreamProcessor.cpp

Revision 385, 14.0 kB (checked in by pieterpalmers, 16 years ago)

- fixed issues with SYT timestamp processing
- SYT based sync works if syncing to the received stream

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "../libutil/Atomic.h"
30
31 #include "StreamProcessor.h"
32 #include "StreamProcessorManager.h"
33 #include <assert.h>
34
35 namespace FreebobStreaming {
36
37 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_VERBOSE );
38 IMPL_DEBUG_MODULE( ReceiveStreamProcessor, ReceiveStreamProcessor, DEBUG_LEVEL_VERBOSE );
39 IMPL_DEBUG_MODULE( TransmitStreamProcessor, TransmitStreamProcessor, DEBUG_LEVEL_VERBOSE );
40
41 StreamProcessor::StreamProcessor(enum IsoStream::EStreamType type, int port, int framerate)
42         : IsoStream(type, port)
43         , m_nb_buffers(0)
44         , m_period(0)
45         , m_xruns(0)
46         , m_framecounter(0)
47         , m_framerate(framerate)
48         , m_manager(NULL)
49         , m_SyncSource(NULL)
50         , m_ticks_per_frame(0)
51         , m_running(false)
52         , m_disabled(true)
53         , m_is_disabled(true)
54 {
55
56 }
57
58 StreamProcessor::~StreamProcessor() {
59
60 }
61
62 void StreamProcessor::dumpInfo()
63 {
64     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
65
66     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n");
67     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n");
68    
69     IsoStream::dumpInfo();
70     debugOutputShort( DEBUG_LEVEL_NORMAL, "  StreamProcessor info:\n");
71     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Frame counter         : %d\n", m_framecounter);
72     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
73     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
74     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Head - Tail           : %011lld\n",diff);
75     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Now                   : %011u\n",m_handler->getCycleTimerTicks());
76     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns                 : %d\n", m_xruns);
77     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Running               : %d\n", m_running);
78     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Enabled               : %s\n", m_disabled ? "No" : "Yes");
79     debugOutputShort( DEBUG_LEVEL_NORMAL, "   enable status        : %s\n", m_is_disabled ? "No" : "Yes");
80    
81 //     m_PeriodStat.dumpInfo();
82 //     m_PacketStat.dumpInfo();
83 //     m_WakeupStat.dumpInfo();
84
85 }
86
87 bool StreamProcessor::init()
88 {
89     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
90    
91     pthread_mutex_init(&m_framecounter_lock, NULL);
92
93     return IsoStream::init();
94 }
95
96 /**
97  * Resets the frame counter, the xrun counter, the ports and the iso stream.
98  * @return true if reset succeeded
99  */
100 bool StreamProcessor::reset() {
101
102     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n");
103
104     resetFrameCounter();
105
106     resetXrunCounter();
107
108     // loop over the ports to reset them
109     if (!PortManager::resetPorts()) {
110         debugFatal("Could not reset ports\n");
111         return false;
112     }
113
114     // reset the iso stream
115     if (!IsoStream::reset()) {
116         debugFatal("Could not reset isostream\n");
117         return false;
118     }
119     return true;
120        
121 }
122    
123 bool StreamProcessor::prepareForEnable() {
124     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
125    
126     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForEnable for (%p)\n",this);
127     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter);
128     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
129     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
130     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff);
131     debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks());
132     return true;
133 }
134
135 bool StreamProcessor::prepareForDisable() {
136     int64_t diff=(int64_t)m_buffer_head_timestamp - (int64_t)m_buffer_tail_timestamp;
137    
138     debugOutput(DEBUG_LEVEL_VERBOSE," StreamProcessor::prepareForDisable for (%p)\n",this);
139     debugOutput(DEBUG_LEVEL_VERBOSE," Frame Counter         : %05d\n",m_framecounter);
140     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer head timestamp : %011llu\n",m_buffer_head_timestamp);
141     debugOutput(DEBUG_LEVEL_VERBOSE," Buffer tail timestamp : %011llu\n",m_buffer_tail_timestamp);
142     debugOutput(DEBUG_LEVEL_VERBOSE," Head - Tail           : %011lld\n",diff);
143     debugOutput(DEBUG_LEVEL_VERBOSE," Now                   : %011u\n",m_handler->getCycleTimerTicks());
144     return true;
145
146 }
147
148 bool StreamProcessor::prepare() {
149
150         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
151 // TODO: implement
152
153         // init the ports
154        
155         if(!m_manager) {
156                 debugFatal("Not attached to a manager!\n");
157                 return -1;
158         }
159
160         m_nb_buffers=m_manager->getNbBuffers();
161         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_nb_buffers  : %d\n", m_nb_buffers);
162
163         m_period=m_manager->getPeriodSize();
164         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_period      : %d\n", m_period);
165
166         // loop over the ports to reset them
167         PortManager::preparePorts();
168
169         // reset the iso stream
170         IsoStream::prepare();
171        
172         return true;
173
174 }
175
176 /**
177  * @brief Notify the StreamProcessor that frames were written
178  *
179  * This notifies the StreamProcessor of the fact that frames were written to the internal
180  * buffer. This is for framecounter & timestamp bookkeeping.
181  *
182  * @param nbframes the number of frames that are written to the internal buffers
183  * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample
184  *           present in the buffer.
185  * @return true if successful
186  */
187 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) {
188
189         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts);
190         incrementFrameCounter(nbframes, ts);
191         return true;
192 }
193
194 /**
195  * @brief Notify the StreamProcessor that frames were read
196  *
197  * This notifies the StreamProcessor of the fact that frames were read from the internal
198  * buffer. This is for framecounter & timestamp bookkeeping.
199  *
200  * @param nbframes the number of frames that are read from the internal buffers
201  * @param ts the new timestamp of the 'head' of the buffer, i.e. the first sample
202  *           present in the buffer.
203  * @return true if successful
204  */
205 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
206
207         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer at (%011lld)...\n", nbframes, ts);
208         decrementFrameCounter(nbframes, ts);
209         return true;
210 }
211
212 bool StreamProcessor::isRunning() {
213         return m_running;
214 }
215
216 bool StreamProcessor::enable()  {
217     int cnt=0;
218    
219     if(!m_running) {
220             debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n");
221     }
222    
223     m_disabled=false;
224    
225     // now wait until it is effectively enabled
226     // time-out at 100ms
227     while(m_is_disabled && cnt++ < 1000) {
228         usleep(100);
229     }
230    
231     // check if the operation timed out
232     if(cnt==1000) {
233         debugWarning("Timeout when enabling StreamProcessor (%p)\n",this);
234         return false;
235     }
236    
237     return true;
238 }
239
240 bool StreamProcessor::disable()  {
241     int cnt=0;
242    
243     m_disabled=true;
244    
245     // now wait until it is effectively disabled
246     // time-out at
247     while(!m_is_disabled && cnt++ < 1000) {
248         usleep(100);
249     }
250    
251     // check if the operation timed out (100ms)
252     if(cnt==1000) {
253         debugWarning("Timeout when disabling StreamProcessor (%p)\n",this);
254         return false;
255     }
256    
257     return true;
258
259 }
260
261 bool StreamProcessor::setSyncSource(StreamProcessor *s) {
262     m_SyncSource=s;
263     return true;
264 }
265
266 /**
267  * Decrements the frame counter, in a atomic way. This
268  * also sets the buffer head timestamp
269  * is thread safe.
270  */
271 void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) {
272     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
273                 this, new_timestamp);
274                
275     pthread_mutex_lock(&m_framecounter_lock);
276     m_framecounter -= nbframes;
277     m_buffer_head_timestamp = new_timestamp;
278     pthread_mutex_unlock(&m_framecounter_lock);
279 }
280
281 /**
282  * Increments the frame counter, in a atomic way.
283  * also sets the buffer tail timestamp
284  * This is thread safe.
285  */
286 void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) {
287     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",
288                 this, new_timestamp);
289    
290     pthread_mutex_lock(&m_framecounter_lock);
291     m_framecounter += nbframes;
292     m_buffer_tail_timestamp = new_timestamp;
293     pthread_mutex_unlock(&m_framecounter_lock);
294    
295 }
296
297 /**
298  * Sets the buffer tail timestamp (in usecs)
299  * This is thread safe.
300  */
301 void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) {
302     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer tail timestamp for (%p) to %11llu\n",
303                 this, new_timestamp);
304    
305     pthread_mutex_lock(&m_framecounter_lock);
306     m_buffer_tail_timestamp = new_timestamp;
307     pthread_mutex_unlock(&m_framecounter_lock);
308 }
309
310 /**
311  * Sets the buffer head timestamp (in usecs)
312  * This is thread safe.
313  */
314 void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) {
315     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
316                 this, new_timestamp);
317
318     pthread_mutex_lock(&m_framecounter_lock);
319     m_buffer_head_timestamp = new_timestamp;
320     pthread_mutex_unlock(&m_framecounter_lock);
321 }
322
323 /**
324  * Sets both the buffer head and tail timestamps (in usecs)
325  * (avoids multiple mutex lock/unlock's)
326  * This is thread safe.
327  */
328 void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) {
329     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Setting buffer head timestamp for (%p) to %11llu\n",
330                 this, new_head);
331     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "    and buffer tail timestamp for (%p) to %11llu\n",
332                 this, new_tail);
333    
334     pthread_mutex_lock(&m_framecounter_lock);
335     m_buffer_head_timestamp = new_head;
336     m_buffer_tail_timestamp = new_tail;
337     pthread_mutex_unlock(&m_framecounter_lock);
338 }
339 /**
340  * \brief return the timestamp of the first frame in the buffer
341  *
342  * This function returns the timestamp of the very first sample in
343  * the StreamProcessor's buffer. This is useful for slave StreamProcessors
344  * to find out what the base for their timestamp generation should
345  * be. It also returns the framecounter value for which this timestamp
346  * is valid.
347  *
348  * The system is built in such a way that we assume that the processing
349  * of the buffers doesn't take any time. Assume we have a buffer transfer at
350  * time T1, meaning that the last sample of this buffer occurs at T1. As
351  * processing does not take time, we don't have to add anything to T1. When
352  * transferring the processed buffer to the xmit processor, the timestamp
353  * of the last sample is still T1.
354  *
355  * When starting the streams, we don't have any information on this last
356  * timestamp. We prefill the buffer at the xmit side, and we should find
357  * out what the timestamp for the last sample in the buffer is. If we sync
358  * on a receive SP, we know that the last prefilled sample corresponds with
359  * the first sample received - 1 sample duration. This is the same as if the last
360  * transfer from iso to client would have emptied the receive buffer.
361  *
362  *
363  * @param ts address to store the timestamp in
364  * @param fc address to store the associated framecounter in
365  */
366 void StreamProcessor::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) {
367     pthread_mutex_lock(&m_framecounter_lock);
368     *fc = m_framecounter;
369     *ts = m_buffer_head_timestamp;
370     pthread_mutex_unlock(&m_framecounter_lock);
371 }
372        
373 /**
374  * \brief return the timestamp of the last frame in the buffer
375  *
376  * This function returns the timestamp of the last frame in
377  * the StreamProcessor's buffer. It also returns the framecounter
378  * value for which this timestamp is valid.
379  *
380  * @param ts address to store the timestamp in
381  * @param fc address to store the associated framecounter in
382  */
383 void StreamProcessor::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) {
384     pthread_mutex_lock(&m_framecounter_lock);
385     *fc = m_framecounter;
386     *ts = m_buffer_tail_timestamp;
387     pthread_mutex_unlock(&m_framecounter_lock);
388 }
389
390 /**
391  * Resets the frame counter, in a atomic way. This
392  * is thread safe.
393  */
394 void StreamProcessor::resetFrameCounter() {
395     pthread_mutex_lock(&m_framecounter_lock);
396     m_framecounter = 0;
397     pthread_mutex_unlock(&m_framecounter_lock);
398 }
399
400 /**
401  * Resets the xrun counter, in a atomic way. This
402  * is thread safe.
403  */
404 void StreamProcessor::resetXrunCounter() {
405         ZERO_ATOMIC((SInt32 *)&m_xruns);
406 }
407
408 void StreamProcessor::setVerboseLevel(int l) {
409         setDebugLevel(l);
410         IsoStream::setVerboseLevel(l);
411         PortManager::setVerboseLevel(l);
412
413 }
414
415 ReceiveStreamProcessor::ReceiveStreamProcessor(int port, int framerate)
416         : StreamProcessor(IsoStream::EST_Receive, port, framerate) {
417
418 }
419
420 ReceiveStreamProcessor::~ReceiveStreamProcessor() {
421
422 }
423
424 void ReceiveStreamProcessor::setVerboseLevel(int l) {
425         setDebugLevel(l);
426         StreamProcessor::setVerboseLevel(l);
427
428 }
429
430
431 TransmitStreamProcessor::TransmitStreamProcessor( int port, int framerate)
432         : StreamProcessor(IsoStream::EST_Transmit, port, framerate) {
433
434 }
435
436 TransmitStreamProcessor::~TransmitStreamProcessor() {
437
438 }
439
440 void TransmitStreamProcessor::setVerboseLevel(int l) {
441         setDebugLevel(l);
442         StreamProcessor::setVerboseLevel(l);
443
444 }
445
446 }
Note: See TracBrowser for help on using the browser.