root/branches/streaming-rework/src/libstreaming/StreamProcessor.cpp

Revision 384, 10.6 kB (checked in by pieterpalmers, 16 years ago)

- temporary commit as backup measure
- rewrote synchronisation code
- receive streaming based on SYT works
- transmit streaming synced to received stream sort of works, still

have to iron out some issues.

NOTE: all devices but the bebob's are disabled in this code,

because they still have to be ported to the new sync
mechanism.

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software {} you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation {} either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY {} without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program {} if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "../libutil/Atomic.h"
30
31 #include "StreamProcessor.h"
32 #include "StreamProcessorManager.h"
33 #include <assert.h>
34
35 namespace FreebobStreaming {
36
37 IMPL_DEBUG_MODULE( StreamProcessor, StreamProcessor, DEBUG_LEVEL_NORMAL );
38 IMPL_DEBUG_MODULE( ReceiveStreamProcessor, ReceiveStreamProcessor, DEBUG_LEVEL_NORMAL );
39 IMPL_DEBUG_MODULE( TransmitStreamProcessor, TransmitStreamProcessor, DEBUG_LEVEL_NORMAL );
40
41 StreamProcessor::StreamProcessor(enum IsoStream::EStreamType type, int port, int framerate)
42         : IsoStream(type, port)
43         , m_nb_buffers(0)
44         , m_period(0)
45         , m_xruns(0)
46         , m_framecounter(0)
47         , m_framerate(framerate)
48         , m_manager(NULL)
49         , m_SyncSource(NULL)
50         , m_ticks_per_frame(0)
51         , m_running(false)
52         , m_disabled(true)
53 {
54
55 }
56
57 StreamProcessor::~StreamProcessor() {
58
59 }
60
61 void StreamProcessor::dumpInfo()
62 {
63
64     debugOutputShort( DEBUG_LEVEL_NORMAL, " StreamProcessor information\n");
65     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Iso stream info:\n");
66    
67     IsoStream::dumpInfo();
68     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Frame counter  : %d\n", m_framecounter);
69     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Xruns          : %d\n", m_xruns);
70     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Running        : %d\n", m_running);
71     debugOutputShort( DEBUG_LEVEL_NORMAL, "  Enabled        : %d\n", !m_disabled);
72    
73 //     m_PeriodStat.dumpInfo();
74 //     m_PacketStat.dumpInfo();
75 //     m_WakeupStat.dumpInfo();
76
77 }
78
79 bool StreamProcessor::init()
80 {
81     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
82    
83     pthread_mutex_init(&m_framecounter_lock, NULL);
84
85     return IsoStream::init();
86 }
87
88 /**
89  * Resets the frame counter, the xrun counter, the ports and the iso stream.
90  * @return true if reset succeeded
91  */
92 bool StreamProcessor::reset() {
93
94     debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting...\n");
95
96     resetFrameCounter();
97
98     resetXrunCounter();
99
100     // loop over the ports to reset them
101     if (!PortManager::resetPorts()) {
102         debugFatal("Could not reset ports\n");
103         return false;
104     }
105
106     // reset the iso stream
107     if (!IsoStream::reset()) {
108         debugFatal("Could not reset isostream\n");
109         return false;
110     }
111     return true;
112        
113 }
114
115 bool StreamProcessor::prepare() {
116
117         debugOutput( DEBUG_LEVEL_VERBOSE, "Preparing...\n");
118 // TODO: implement
119
120         // init the ports
121        
122         if(!m_manager) {
123                 debugFatal("Not attached to a manager!\n");
124                 return -1;
125         }
126
127         m_nb_buffers=m_manager->getNbBuffers();
128         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_nb_buffers  : %d\n", m_nb_buffers);
129
130         m_period=m_manager->getPeriodSize();
131         debugOutput( DEBUG_LEVEL_VERBOSE, "Setting m_period      : %d\n", m_period);
132
133         // loop over the ports to reset them
134         PortManager::preparePorts();
135
136         // reset the iso stream
137         IsoStream::prepare();
138        
139         return true;
140
141 }
142
143 /**
144  * @brief Notify the StreamProcessor that frames were written
145  *
146  * This notifies the StreamProcessor of the fact that frames were written to the internal
147  * buffer. This is for framecounter & timestamp bookkeeping.
148  *
149  * @param nbframes the number of frames that are written to the internal buffers
150  * @param ts the new timestamp of the 'tail' of the buffer, i.e. the last sample
151  *           present in the buffer.
152  * @return true if successful
153  */
154 bool StreamProcessor::putFrames(unsigned int nbframes, int64_t ts) {
155
156         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Putting %d frames for %llu into frame buffer...\n", nbframes,ts);
157         incrementFrameCounter(nbframes, ts);
158         return true;
159 }
160
161 /**
162  * @brief Notify the StreamProcessor that frames were read
163  *
164  * This notifies the StreamProcessor of the fact that frames were read from the internal
165  * buffer. This is for framecounter & timestamp bookkeeping.
166  *
167  * @param nbframes the number of frames that are read from the internal buffers
168  * @param ts the new timestamp of the 'head' of the buffer, i.e. the first sample
169  *           present in the buffer.
170  * @return true if successful
171  */
172 bool StreamProcessor::getFrames(unsigned int nbframes, int64_t ts) {
173
174         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Getting %d frames from frame buffer...\n", nbframes);
175         decrementFrameCounter(nbframes, ts);
176         return true;
177 }
178
179 bool StreamProcessor::isRunning() {
180         return m_running;
181 }
182
183 void StreamProcessor::enable()  {
184         if(!m_running) {
185                 debugWarning("The StreamProcessor is not running yet, enable() might not be a good idea.\n");
186         }
187         m_disabled=false;
188 }
189
190 bool StreamProcessor::setSyncSource(StreamProcessor *s) {
191     m_SyncSource=s;
192     return true;
193 }
194
195 /**
196  * Decrements the frame counter, in a atomic way. This
197  * also sets the buffer tail timestamp
198  * is thread safe.
199  */
200 void StreamProcessor::decrementFrameCounter(int nbframes, uint64_t new_timestamp) {
201     pthread_mutex_lock(&m_framecounter_lock);
202     m_framecounter -= nbframes;
203     m_buffer_head_timestamp = new_timestamp;
204     pthread_mutex_unlock(&m_framecounter_lock);
205 }
206
207 /**
208  * Increments the frame counter, in a atomic way.
209  * also sets the buffer head timestamp
210  * This is thread safe.
211  */
212 void StreamProcessor::incrementFrameCounter(int nbframes, uint64_t new_timestamp) {
213     pthread_mutex_lock(&m_framecounter_lock);
214     m_framecounter += nbframes;
215     m_buffer_tail_timestamp = new_timestamp;
216     pthread_mutex_unlock(&m_framecounter_lock);
217    
218 }
219
220 /**
221  * Sets the frame counter, in a atomic way.
222  * also sets the buffer head timestamp
223  * This is thread safe.
224  */
225 void StreamProcessor::setFrameCounter(int new_framecounter, uint64_t new_timestamp) {
226     pthread_mutex_lock(&m_framecounter_lock);
227     m_framecounter = new_framecounter;
228     m_buffer_tail_timestamp = new_timestamp;
229     pthread_mutex_unlock(&m_framecounter_lock);
230 }
231
232 /**
233  * Sets the buffer tail timestamp (in usecs)
234  * This is thread safe.
235  */
236 void StreamProcessor::setBufferTailTimestamp(uint64_t new_timestamp) {
237     pthread_mutex_lock(&m_framecounter_lock);
238     m_buffer_tail_timestamp = new_timestamp;
239     pthread_mutex_unlock(&m_framecounter_lock);
240 }
241
242 /**
243  * Sets the buffer head timestamp (in usecs)
244  * This is thread safe.
245  */
246 void StreamProcessor::setBufferHeadTimestamp(uint64_t new_timestamp) {
247     pthread_mutex_lock(&m_framecounter_lock);
248     m_buffer_head_timestamp = new_timestamp;
249     pthread_mutex_unlock(&m_framecounter_lock);
250 }
251
252 /**
253  * Sets both the buffer head and tail timestamps (in usecs)
254  * (avoids multiple mutex lock/unlock's)
255  * This is thread safe.
256  */
257 void StreamProcessor::setBufferTimestamps(uint64_t new_head, uint64_t new_tail) {
258     pthread_mutex_lock(&m_framecounter_lock);
259     m_buffer_head_timestamp = new_head;
260     m_buffer_tail_timestamp = new_tail;
261     pthread_mutex_unlock(&m_framecounter_lock);
262 }
263 /**
264  * \brief return the timestamp of the first frame in the buffer
265  *
266  * This function returns the timestamp of the very first sample in
267  * the StreamProcessor's buffer. This is useful for slave StreamProcessors
268  * to find out what the base for their timestamp generation should
269  * be. It also returns the framecounter value for which this timestamp
270  * is valid.
271  *
272  * The system is built in such a way that we assume that the processing
273  * of the buffers doesn't take any time. Assume we have a buffer transfer at
274  * time T1, meaning that the last sample of this buffer occurs at T1. As
275  * processing does not take time, we don't have to add anything to T1. When
276  * transferring the processed buffer to the xmit processor, the timestamp
277  * of the last sample is still T1.
278  *
279  * When starting the streams, we don't have any information on this last
280  * timestamp. We prefill the buffer at the xmit side, and we should find
281  * out what the timestamp for the last sample in the buffer is. If we sync
282  * on a receive SP, we know that the last prefilled sample corresponds with
283  * the first sample received - 1 sample duration. This is the same as if the last
284  * transfer from iso to client would have emptied the receive buffer.
285  *
286  *
287  * @param ts address to store the timestamp in
288  * @param fc address to store the associated framecounter in
289  */
290 void StreamProcessor::getBufferHeadTimestamp(uint64_t *ts, uint64_t *fc) {
291     pthread_mutex_lock(&m_framecounter_lock);
292     *fc = m_framecounter;
293     *ts = m_buffer_head_timestamp;
294     pthread_mutex_unlock(&m_framecounter_lock);
295 }
296        
297 /**
298  * \brief return the timestamp of the last frame in the buffer
299  *
300  * This function returns the timestamp of the last frame in
301  * the StreamProcessor's buffer. It also returns the framecounter
302  * value for which this timestamp is valid.
303  *
304  * @param ts address to store the timestamp in
305  * @param fc address to store the associated framecounter in
306  */
307 void StreamProcessor::getBufferTailTimestamp(uint64_t *ts, uint64_t *fc) {
308     pthread_mutex_lock(&m_framecounter_lock);
309     *fc = m_framecounter;
310     *ts = m_buffer_tail_timestamp;
311     pthread_mutex_unlock(&m_framecounter_lock);
312 }
313
314 /**
315  * Resets the frame counter, in a atomic way. This
316  * is thread safe.
317  */
318 void StreamProcessor::resetFrameCounter() {
319     pthread_mutex_lock(&m_framecounter_lock);
320     m_framecounter = 0;
321     pthread_mutex_unlock(&m_framecounter_lock);
322 }
323
324 /**
325  * Resets the xrun counter, in a atomic way. This
326  * is thread safe.
327  */
328 void StreamProcessor::resetXrunCounter() {
329         ZERO_ATOMIC((SInt32 *)&m_xruns);
330 }
331
332 void StreamProcessor::setVerboseLevel(int l) {
333         setDebugLevel(l);
334         IsoStream::setVerboseLevel(l);
335         PortManager::setVerboseLevel(l);
336
337 }
338
339 ReceiveStreamProcessor::ReceiveStreamProcessor(int port, int framerate)
340         : StreamProcessor(IsoStream::EST_Receive, port, framerate) {
341
342 }
343
344 ReceiveStreamProcessor::~ReceiveStreamProcessor() {
345
346 }
347
348 void ReceiveStreamProcessor::setVerboseLevel(int l) {
349         setDebugLevel(l);
350         StreamProcessor::setVerboseLevel(l);
351
352 }
353
354
355 TransmitStreamProcessor::TransmitStreamProcessor( int port, int framerate)
356         : StreamProcessor(IsoStream::EST_Transmit, port, framerate) {
357
358 }
359
360 TransmitStreamProcessor::~TransmitStreamProcessor() {
361
362 }
363
364 void TransmitStreamProcessor::setVerboseLevel(int l) {
365         setDebugLevel(l);
366         StreamProcessor::setVerboseLevel(l);
367
368 }
369
370 }
Note: See TracBrowser for help on using the browser.