root/trunk/libffado/src/libutil/PosixMessageQueue.cpp

Revision 1172, 11.1 kB (checked in by ppalmers, 15 years ago)

lay down the foundations for easy ALSA/Pulse support

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "PosixMessageQueue.h"
25
26 #include "Functors.h"
27 #include "PosixMutex.h"
28
29 #include <errno.h>
30 #include <string.h>
31
32 #define MQ_INVALID_ID ((mqd_t) -1)
33 // one second
34 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC   10
35 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC  0
36
37 #define POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE      1024
38 // note 10 is the default hard limit
39 #define POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES       10
40
41 namespace Util
42 {
43
44 IMPL_DEBUG_MODULE( PosixMessageQueue, PosixMessageQueue, DEBUG_LEVEL_NORMAL );
45
46 PosixMessageQueue::PosixMessageQueue(std::string name)
47 : m_name( "/" + name )
48 , m_blocking( eB_Blocking )
49 , m_direction( eD_None )
50 , m_owner( false )
51 , m_handle( MQ_INVALID_ID )
52 , m_tmp_buffer( NULL )
53 , m_notifyHandler( NULL )
54 , m_notifyHandlerLock( *(new PosixMutex()) )
55 {
56     m_timeout.tv_sec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC;
57     m_timeout.tv_nsec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC;
58
59     memset(&m_attr, 0, sizeof(m_attr));
60     m_attr.mq_maxmsg = POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES;
61     m_attr.mq_msgsize = POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE;
62     m_tmp_buffer = new char[m_attr.mq_msgsize];
63 }
64
65 PosixMessageQueue::~PosixMessageQueue()
66 {
67     debugOutput(DEBUG_LEVEL_VERBOSE,
68                 "(%p, %s) PosixMessageQueue destroy\n",
69                 this, m_name.c_str());
70     Close();
71     if(m_owner) {
72         debugOutput(DEBUG_LEVEL_VERBOSE,
73                     "(%p, %s) unlink\n",
74                     this, m_name.c_str());
75
76         if(mq_unlink(m_name.c_str()) == MQ_INVALID_ID) {
77             debugError("(%p, %s) could not unlink message queue: %s\n",
78                     this, m_name.c_str(), strerror(errno));
79         }
80     }
81     delete[] m_tmp_buffer;
82 }
83
84 bool
85 PosixMessageQueue::doOpen(enum eDirection t, int flags, enum eBlocking b)
86 {
87
88     if(m_handle != MQ_INVALID_ID) {
89         debugError("(%p, %s) already open\n",
90                    this, m_name.c_str());
91         return false;
92     }
93
94     switch(t) {
95         case eD_ReadOnly: flags |= O_RDONLY; break;
96         case eD_WriteOnly: flags |= O_WRONLY; break;
97         case eD_ReadWrite: flags |= O_RDWR; break;
98         default:
99             debugError("bad direction\n");
100             return false;
101     }
102
103     if(b == eB_NonBlocking) {
104         flags |= O_NONBLOCK;
105     }
106
107     if(flags & O_CREAT) {
108         // only user has permissions
109         m_handle = mq_open(m_name.c_str(), flags, S_IRWXU, &m_attr);
110     } else {
111
112         m_handle = mq_open(m_name.c_str(), flags);
113     }
114     if(m_handle == MQ_INVALID_ID) {
115         debugError("(%p, %s) could not open: %s\n",
116                    this, m_name.c_str(), strerror(errno));
117         return false;
118     }
119     if(flags & O_CREAT) {
120         m_owner = true;
121     }
122     if(mq_getattr(m_handle, &m_attr) == MQ_INVALID_ID) {
123         debugError("(%p, %s) could get attr: %s\n",
124                    this, m_name.c_str(), strerror(errno));
125         return false;
126     }
127     m_blocking = b;
128     return true;
129 }
130
131 bool
132 PosixMessageQueue::Open(enum eDirection t, enum eBlocking b)
133 {
134     debugOutput(DEBUG_LEVEL_VERBOSE,
135                 "(%p, %s) open\n",
136                 this, m_name.c_str());
137
138     if(m_handle != MQ_INVALID_ID) {
139         debugError("(%p, %s) already open\n",
140                    this, m_name.c_str());
141         return false;
142     }
143     return doOpen(t, 0, b);
144 }
145
146 bool
147 PosixMessageQueue::Create(enum eDirection t, enum eBlocking b)
148 {
149     debugOutput(DEBUG_LEVEL_VERBOSE,
150                 "(%p, %s) create\n",
151                 this, m_name.c_str());
152
153     if(m_handle != MQ_INVALID_ID) {
154         debugError("(%p, %s) already open\n",
155                    this, m_name.c_str());
156         return false;
157     }
158     return doOpen(t, O_CREAT | O_EXCL, b);
159 }
160
161 bool
162 PosixMessageQueue::Close()
163 {
164     debugOutput(DEBUG_LEVEL_VERBOSE,
165                 "(%p, %s) close\n",
166                 this, m_name.c_str());
167
168     if(m_handle == MQ_INVALID_ID) {
169         debugOutput(DEBUG_LEVEL_VERBOSE,
170                     "(%p, %s) not open\n",
171                     this, m_name.c_str());
172         return true;
173     }
174     if(mq_close(m_handle)) {
175         debugError("(%p, %s) could not close: %s\n",
176                    this, m_name.c_str(), strerror(errno));
177         return false;
178     }
179     m_handle = MQ_INVALID_ID;
180     return true;
181 }
182
183 enum PosixMessageQueue::eResult
184 PosixMessageQueue::Send(PosixMessageQueue::Message &m)
185 {
186     debugOutput(DEBUG_LEVEL_VERBOSE,
187                 "(%p, %s) send\n",
188                 this, m_name.c_str());
189     if(m_direction == eD_ReadOnly) {
190         debugError("Cannot write to read-only queue\n");
191         return eR_Error;
192     }
193
194     int len = m.getLength();
195     if (len > m_attr.mq_msgsize) {
196         debugError("Message too long\n");
197         return eR_Error;
198     }
199
200     struct timespec timeout;
201     clock_gettime(CLOCK_REALTIME, &timeout);
202     timeout.tv_sec += m_timeout.tv_sec;
203     timeout.tv_nsec += m_timeout.tv_nsec;
204     if(timeout.tv_nsec >= 1000000000LL) {
205         timeout.tv_sec++;
206         timeout.tv_nsec -= 1000000000LL;
207     }
208
209     if(!m.serialize(m_tmp_buffer)) {
210         debugError("Could not serialize\n");
211         return eR_Error;
212     }
213
214     if(mq_timedsend(m_handle, m_tmp_buffer, len, m.getPriority(), &timeout) == MQ_INVALID_ID) {
215         switch(errno) {
216             case EAGAIN:
217                 debugOutput(DEBUG_LEVEL_VERBOSE,
218                             "(%p, %s) full\n",
219                             this, m_name.c_str());
220                 return eR_Again;
221             case ETIMEDOUT:
222                 debugOutput(DEBUG_LEVEL_VERBOSE,
223                             "(%p, %s) read timed out\n",
224                             this, m_name.c_str());
225                 return eR_Timeout;
226             default:
227                 debugError("(%p, %s) could not send: %s\n",
228                            this, m_name.c_str(), strerror(errno));
229                 return eR_Error;
230         }
231     }
232     return eR_OK;
233 }
234
235 enum PosixMessageQueue::eResult
236 PosixMessageQueue::Receive(PosixMessageQueue::Message &m)
237 {
238     debugOutput(DEBUG_LEVEL_VERBOSE,
239                 "(%p, %s) receive\n",
240                 this, m_name.c_str());
241     if(m_direction == eD_WriteOnly) {
242         debugError("Cannot read from write-only queue\n");
243         return eR_Error;
244     }
245
246     struct timespec timeout;
247     clock_gettime(CLOCK_REALTIME, &timeout);
248     timeout.tv_sec += m_timeout.tv_sec;
249     timeout.tv_nsec += m_timeout.tv_nsec;
250     if(timeout.tv_nsec >= 1000000000LL) {
251         timeout.tv_sec++;
252         timeout.tv_nsec -= 1000000000LL;
253     }
254
255     signed int len;
256     unsigned prio;
257     if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) {
258         switch(errno) {
259             case EAGAIN:
260                 debugOutput(DEBUG_LEVEL_VERBOSE,
261                             "(%p, %s) empty\n",
262                             this, m_name.c_str());
263                 return eR_Again;
264             case ETIMEDOUT:
265                 debugOutput(DEBUG_LEVEL_VERBOSE,
266                             "(%p, %s) read timed out\n",
267                             this, m_name.c_str());
268                 return eR_Timeout;
269             default:
270                 debugError("(%p, %s) could not receive: %s\n",
271                            this, m_name.c_str(), strerror(errno));
272                 return eR_Error;
273         }
274     }
275
276     if(!m.deserialize(m_tmp_buffer, len, prio)) {
277         debugError("Could not parse message\n");
278         return eR_Error;
279     }
280     return eR_OK;
281 }
282
283
284 int
285 PosixMessageQueue::countMessages()
286 {
287     if(m_handle == MQ_INVALID_ID) {
288         debugOutput(DEBUG_LEVEL_VERBOSE,
289                     "(%p, %s) invalid handle\n",
290                     this, m_name.c_str());
291         return -1;
292     }
293     struct mq_attr attr;
294     if(mq_getattr(m_handle, &attr) == MQ_INVALID_ID) {
295         debugError("(%p, %s) could get attr: %s\n",
296                    this, m_name.c_str(), strerror(errno));
297         return -1;
298     }
299     return attr.mq_curmsgs;
300 }
301
302 bool
303 PosixMessageQueue::canSend()
304 {
305     return countMessages() < m_attr.mq_maxmsg;
306 }
307
308 bool
309 PosixMessageQueue::canReceive()
310 {
311     return countMessages() > 0;
312 }
313
314 bool
315 PosixMessageQueue::setNotificationHandler(Util::Functor *f)
316 {
317     debugOutput(DEBUG_LEVEL_VERBOSE,
318                 "(%p, %s) setting handler to %p\n",
319                 this, m_name.c_str(), f);
320
321     // ensure we don't change the notifier while
322     // it's used
323     MutexLockHelper lock(m_notifyHandlerLock);
324     if(m_notifyHandler == NULL) {
325         m_notifyHandler = f;
326         return true;
327     } else {
328         debugError("handler already present\n");
329         return false;
330     }
331 }
332
333 bool
334 PosixMessageQueue::unsetNotificationHandler()
335 {
336     debugOutput(DEBUG_LEVEL_VERBOSE,
337                 "(%p, %s) unsetting handler\n",
338                 this, m_name.c_str());
339
340     // ensure we don't change the notifier while
341     // it's used
342     MutexLockHelper lock(m_notifyHandlerLock);
343     if(m_notifyHandler != NULL) {
344         m_notifyHandler = NULL;
345         return true;
346     } else {
347         debugWarning("no handler present\n");
348         return true; // not considered an error
349     }
350 }
351
352 void
353 PosixMessageQueue::notifyCallback()
354 {
355     debugOutput(DEBUG_LEVEL_VERBOSE,
356                 "(%p, %s) Notified\n",
357                 this, m_name.c_str());
358     // make sure the handler is not changed
359     MutexLockHelper lock(m_notifyHandlerLock);
360     if(m_notifyHandler) {
361         (*m_notifyHandler)();
362     }
363 }
364
365 bool
366 PosixMessageQueue::enableNotification()
367 {
368     debugOutput(DEBUG_LEVEL_VERBOSE,
369                 "(%p, %s) set\n",
370                 this, m_name.c_str());
371
372     sigevent evp;
373     memset (&evp, 0, sizeof(evp));
374     evp.sigev_notify = SIGEV_THREAD;
375     evp.sigev_value.sival_ptr = (void *)this;
376     evp.sigev_notify_function = &notifyCallbackStatic;
377
378     if(mq_notify(m_handle, &evp) == MQ_INVALID_ID) {
379         debugError("(%p, %s) could set notifier: %s\n",
380                    this, m_name.c_str(), strerror(errno));
381         return false;
382     }
383     return true;
384 }
385
386 bool
387 PosixMessageQueue::disableNotification()
388 {
389     debugOutput(DEBUG_LEVEL_VERBOSE,
390                 "(%p, %s) unset\n",
391                 this, m_name.c_str());
392
393     if(mq_notify(m_handle, NULL) == MQ_INVALID_ID) {
394         debugError("(%p, %s) could unset notifier: %s\n",
395                    this, m_name.c_str(), strerror(errno));
396         return false;
397     }
398     return true;
399 }
400
401 void
402 PosixMessageQueue::show()
403 {
404     debugOutput(DEBUG_LEVEL_NORMAL, "(%p) MessageQueue %s\n", this, m_name.c_str());
405 }
406
407 void
408 PosixMessageQueue::setVerboseLevel(int i)
409 {
410     setDebugLevel(i);
411     m_notifyHandlerLock.setVerboseLevel(i);
412 }
413
414 } // end of namespace
Note: See TracBrowser for help on using the browser.