root/branches/libfreebob-2.0/src/libstreaming/StreamProcessorManager.cpp

Revision 221, 11.2 kB (checked in by pieterpalmers, 18 years ago)

--

Line 
1 /* $Id$ */
2
3 /*
4  *   FreeBob Streaming API
5  *   FreeBob = Firewire (pro-)audio for linux
6  *
7  *   http://freebob.sf.net
8  *
9  *   Copyright (C) 2005,2006 Pieter Palmers <pieterpalmers@users.sourceforge.net>
10  *
11  *   This program is free software; you can redistribute it and/or modify
12  *   it under the terms of the GNU General Public License as published by
13  *   the Free Software Foundation; either version 2 of the License, or
14  *   (at your option) any later version.
15  *
16  *   This program is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with this program; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *
26  *
27  */
28
29 #include "StreamProcessorManager.h"
30 #include "StreamProcessor.h"
31 #include "Port.h"
32 #include <errno.h>
33 #include <assert.h>
34
35
36 namespace FreebobStreaming {
37
38 IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_NORMAL );
39
40 StreamProcessorManager::StreamProcessorManager(unsigned int period, unsigned int nb_buffers)
41         : m_nb_buffers(nb_buffers), m_period(period), m_xruns(0) {
42
43 }
44
45 StreamProcessorManager::~StreamProcessorManager() {
46
47 }
48
49 int StreamProcessorManager::registerProcessor(StreamProcessor *processor)
50 {
51         debugOutput( DEBUG_LEVEL_VERBOSE, "Registering processor (%p) with manager\n",processor);
52         assert(processor);
53
54         if (processor->getType()==StreamProcessor::E_Receive) {
55                 m_ReceiveProcessors.push_back(processor);
56                 processor->setManager(this);
57                 processor->setVerboseLevel(getDebugLevel());
58                 return 0;
59         }
60        
61         if (processor->getType()==StreamProcessor::E_Transmit) {
62                 m_TransmitProcessors.push_back(processor);
63                 processor->setManager(this);
64                 processor->setVerboseLevel(getDebugLevel());
65                 return 0;
66         }
67
68         return -1;
69 }
70
71 int StreamProcessorManager::unregisterProcessor(StreamProcessor *processor)
72 {
73         debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering processor (%p) with manager\n",processor);
74         assert(processor);
75
76         if (processor->getType()==StreamProcessor::E_Receive) {
77
78                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
79                         it != m_ReceiveProcessors.end();
80                         ++it ) {
81
82                         if ( *it == processor ) {
83                                         m_ReceiveProcessors.erase(it);
84                                         processor->clearManager();
85                                         return 0;
86                                 }
87                 }
88         }
89
90         if (processor->getType()==StreamProcessor::E_Transmit) {
91                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
92                         it != m_TransmitProcessors.end();
93                         ++it ) {
94
95                         if ( *it == processor ) {
96                                         m_TransmitProcessors.erase(it);
97                                         processor->clearManager();
98                                         return 0;
99                                 }
100                 }
101         }
102
103         return -1; //not found
104
105 }
106
107 int StreamProcessorManager::getPortCount(enum Port::E_PortType type, enum Port::E_Direction direction) {
108         int count=0;
109
110         if (direction == Port::E_Capture) {
111                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
112                         it != m_ReceiveProcessors.end();
113                         ++it ) {
114                         count += (*it)->getPortCount(type);
115                 }
116         } else {
117                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
118                         it != m_TransmitProcessors.end();
119                         ++it ) {
120                         count += (*it)->getPortCount(type);
121                 }
122         }
123         return count;
124 }
125
126 int StreamProcessorManager::getPortCount(enum Port::E_Direction direction) {
127         int count=0;
128
129         if (direction == Port::E_Capture) {
130                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
131                         it != m_ReceiveProcessors.end();
132                         ++it ) {
133                         count += (*it)->getPortCount();
134                 }
135         } else {
136                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
137                         it != m_TransmitProcessors.end();
138                         ++it ) {
139                         count += (*it)->getPortCount();
140                 }
141         }
142         return count;
143 }
144
145 Port* StreamProcessorManager::getPortByIndex(int idx, enum Port::E_Direction direction) {
146         int count=0;
147         int prevcount=0;
148
149         if (direction == Port::E_Capture) {
150                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
151                         it != m_ReceiveProcessors.end();
152                         ++it ) {
153                         count += (*it)->getPortCount();
154                         if (count > idx) {
155                                 return (*it)->getPortAtIdx(idx-prevcount);
156                         }
157                         prevcount=count;
158                 }
159         } else {
160                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
161                         it != m_TransmitProcessors.end();
162                         ++it ) {
163                         count += (*it)->getPortCount();
164                         if (count > idx) {
165                                 return (*it)->getPortAtIdx(idx-prevcount);
166                         }
167                         prevcount=count;
168                 }
169         }
170         return NULL;
171 }
172
173 bool StreamProcessorManager::Init()
174 {
175         debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
176         if(sem_init(&m_period_semaphore, 0, 0)) {
177                 debugFatal( "Cannot init packet transfer semaphore\n");
178                 debugFatal( " Error: %s\n",strerror(errno));
179                 return false;
180         } else {
181                 debugOutput( DEBUG_LEVEL_VERBOSE,"FREEBOB: successfull init of packet transfer semaphore\n");
182         }
183
184 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
185         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
186                 it != m_ReceiveProcessors.end();
187                 ++it ) {
188                 if((*it)->init()) {
189                         debugFatal("Could not initialize receive processor\n");
190                         return false;
191                 }
192         }
193
194 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
195         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
196                 it != m_TransmitProcessors.end();
197                 ++it ) {
198                 if((*it)->init()) {
199                         debugFatal("Could not initialize receive processor\n");
200                         return false;
201                 }
202         }
203
204         return true;
205 }
206
207 bool StreamProcessorManager::Execute()
208 {
209         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
210
211         bool period_ready=true;
212         m_xrun_has_occured=false;
213
214 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
215         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
216                 it != m_ReceiveProcessors.end();
217                 ++it ) {
218                 period_ready = period_ready && (*it)->isOnePeriodReady();
219                 m_xrun_has_occured = m_xrun_has_occured || (*it)->xrunOccurred();
220         }
221
222 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
223         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
224                 it != m_TransmitProcessors.end();
225                 ++it ) {
226                 period_ready = period_ready && (*it)->isOnePeriodReady();
227                 m_xrun_has_occured = m_xrun_has_occured || (*it)->xrunOccurred();
228         }
229
230         if(m_xrun_has_occured) {
231                 // do xrun signaling/handling
232                 m_xruns++;
233                 sem_post(&m_period_semaphore);
234                 return false;
235         }
236
237         if(period_ready) {
238                 // signal the waiting thread(s?) that a period is ready
239                 sem_post(&m_period_semaphore);
240                 debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, "Period done...\n");
241
242                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
243                         it != m_ReceiveProcessors.end();
244                         ++it ) {
245                         (*it)->decrementFrameCounter();
246                 }
247        
248                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
249                         it != m_TransmitProcessors.end();
250                         ++it ) {
251                         (*it)->decrementFrameCounter();
252                 }
253         }
254
255         return true;
256
257 }
258
259 int StreamProcessorManager::waitForPeriod() {
260         int ret;
261         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "enter...\n");
262
263         // Wait for packetizer thread to signal a period completion
264         sem_wait(&m_period_semaphore);
265        
266         if(m_xrun_has_occured) {
267                 // notify the driver of the underrun
268                 ret = 0;
269         } else {
270                 ret=m_period;
271         }
272        
273         return ret;
274
275 }
276
277 void StreamProcessorManager::reset() {
278
279         debugOutput( DEBUG_LEVEL_VERBOSE, "Resetting processors...\n");
280
281         /*
282          * Reset means:
283          * Bringing all buffers & connections into a know state
284          *   - Clear all capture buffers
285          *   - Put nb_periods*period_size of null frames into the playback buffers
286          *  => implemented by a reset() call, implementation dependant on the type
287          */
288        
289 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
290         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
291                 it != m_ReceiveProcessors.end();
292                 ++it ) {
293                 (*it)->reset();
294         }
295
296 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
297         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
298                 it != m_TransmitProcessors.end();
299                 ++it ) {
300                 (*it)->reset();
301         }
302 }
303
304 int StreamProcessorManager::transfer() {
305
306         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
307
308         // a static cast could make sure that there is no performance
309         // penalty for the virtual functions (to be checked)
310
311 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
312         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
313                 it != m_ReceiveProcessors.end();
314                 ++it ) {
315                 (*it)->transfer();
316         }
317
318 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
319         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
320                 it != m_TransmitProcessors.end();
321                 ++it ) {
322                 (*it)->transfer();
323         }
324
325         return 0;
326 }
327
328 int StreamProcessorManager::transfer(enum StreamProcessor::EProcessorType t) {
329
330         debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "Transferring period...\n");
331
332         // a static cast could make sure that there is no performance
333         // penalty for the virtual functions (to be checked)
334
335 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Receive processors...\n");
336         if (t==StreamProcessor::E_Receive) {
337                 for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
338                         it != m_ReceiveProcessors.end();
339                         ++it ) {
340                         (*it)->transfer();
341                 }
342         } else {
343 //      debugOutputShort( DEBUG_LEVEL_VERY_VERBOSE, " Transmit processors...\n");
344                 for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
345                         it != m_TransmitProcessors.end();
346                         ++it ) {
347                         (*it)->transfer();
348                 }
349         }
350
351         return 0;
352 }
353
354 void StreamProcessorManager::dumpInfo() {
355         debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping StreamProcessorManager information...\n");
356
357         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
358         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
359                 it != m_ReceiveProcessors.end();
360                 ++it ) {
361                 (*it)->dumpInfo();
362         }
363
364         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
365         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
366                 it != m_TransmitProcessors.end();
367                 ++it ) {
368                 (*it)->dumpInfo();
369         }
370
371 }
372
373 void StreamProcessorManager::setVerboseLevel(int l) {
374         setDebugLevel(l);
375
376         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
377         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
378                 it != m_ReceiveProcessors.end();
379                 ++it ) {
380                 (*it)->setVerboseLevel(l);
381         }
382
383         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
384         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
385                 it != m_TransmitProcessors.end();
386                 ++it ) {
387                 (*it)->setVerboseLevel(l);
388         }
389 }
390
391 bool StreamProcessorManager::prepare() {
392         debugOutputShort( DEBUG_LEVEL_NORMAL, "setting port buffersize...\n");
393         debugOutputShort( DEBUG_LEVEL_NORMAL, " Receive processors...\n");
394         for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
395                 it != m_ReceiveProcessors.end();
396                 ++it ) {
397                 if(!(*it)->setPortBuffersize(m_period)) {
398                         debugOutputShort( DEBUG_LEVEL_NORMAL, " could not set buffer size...\n");
399                         return false;
400                        
401                 }
402         }
403
404         debugOutputShort( DEBUG_LEVEL_NORMAL, " Transmit processors...\n");
405         for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
406                 it != m_TransmitProcessors.end();
407                 ++it ) {
408                 if(!(*it)->setPortBuffersize(m_period)) {
409                         debugOutputShort( DEBUG_LEVEL_NORMAL, " could not set buffer size...\n");
410                         return false;
411                        
412                 }
413         }
414
415         return true;
416 }
417
418
419 } // end of namespace
Note: See TracBrowser for help on using the browser.