root/trunk/libffado/src/libutil/PosixMessageQueue.cpp

Revision 1240, 12.7 kB (checked in by ppalmers, 13 years ago)

fix bugs in IPC comms. Add preliminary FFADO-IPC ALSA plugin

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "PosixMessageQueue.h"
25
26 #include "Functors.h"
27 #include "PosixMutex.h"
28
29 #include <errno.h>
30 #include <string.h>
31
32 #define MQ_INVALID_ID ((mqd_t) -1)
33 // one second
34 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC   10
35 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC  0
36
37 #define POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE      1024
38 // note 10 is the default hard limit
39 #define POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES       10
40
41 namespace Util
42 {
43
44 IMPL_DEBUG_MODULE( PosixMessageQueue, PosixMessageQueue, DEBUG_LEVEL_NORMAL );
45
46 PosixMessageQueue::PosixMessageQueue(std::string name)
47 : m_name( "/" + name )
48 , m_blocking( eB_Blocking )
49 , m_direction( eD_None )
50 , m_owner( false )
51 , m_handle( MQ_INVALID_ID )
52 , m_tmp_buffer( NULL )
53 , m_notifyHandler( NULL )
54 , m_notifyHandlerLock( *(new PosixMutex()) )
55 {
56     m_timeout.tv_sec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC;
57     m_timeout.tv_nsec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC;
58
59     memset(&m_attr, 0, sizeof(m_attr));
60     m_attr.mq_maxmsg = POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES;
61     m_attr.mq_msgsize = POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE;
62     m_tmp_buffer = new char[m_attr.mq_msgsize];
63 }
64
65 PosixMessageQueue::~PosixMessageQueue()
66 {
67     debugOutput(DEBUG_LEVEL_VERBOSE,
68                 "(%p, %s) PosixMessageQueue destroy\n",
69                 this, m_name.c_str());
70     Close();
71     if(m_owner) {
72         debugOutput(DEBUG_LEVEL_VERBOSE,
73                     "(%p, %s) unlink\n",
74                     this, m_name.c_str());
75
76         if(mq_unlink(m_name.c_str()) == MQ_INVALID_ID) {
77             debugError("(%p, %s) could not unlink message queue: %s\n",
78                     this, m_name.c_str(), strerror(errno));
79         }
80     }
81     delete[] m_tmp_buffer;
82 }
83
84 bool
85 PosixMessageQueue::doOpen(enum eDirection t, int flags, enum eBlocking b)
86 {
87
88     if(m_handle != MQ_INVALID_ID) {
89         debugError("(%p, %s) already open\n",
90                    this, m_name.c_str());
91         return false;
92     }
93
94     switch(t) {
95         case eD_ReadOnly: flags |= O_RDONLY; break;
96         case eD_WriteOnly: flags |= O_WRONLY; break;
97         case eD_ReadWrite: flags |= O_RDWR; break;
98         default:
99             debugError("bad direction\n");
100             return false;
101     }
102
103     if(b == eB_NonBlocking) {
104         flags |= O_NONBLOCK;
105     }
106
107     if(flags & O_CREAT) {
108         // only user has permissions
109         m_handle = mq_open(m_name.c_str(), flags, S_IRWXU, &m_attr);
110     } else {
111
112         m_handle = mq_open(m_name.c_str(), flags);
113     }
114     if(m_handle == MQ_INVALID_ID) {
115         debugError("(%p, %s) could not open: %s\n",
116                    this, m_name.c_str(), strerror(errno));
117         return false;
118     }
119     if(flags & O_CREAT) {
120         m_owner = true;
121     }
122     if(mq_getattr(m_handle, &m_attr) == MQ_INVALID_ID) {
123         debugError("(%p, %s) could get attr: %s\n",
124                    this, m_name.c_str(), strerror(errno));
125         return false;
126     }
127     m_blocking = b;
128     return true;
129 }
130
131 bool
132 PosixMessageQueue::Open(enum eDirection t, enum eBlocking b)
133 {
134     debugOutput(DEBUG_LEVEL_VERBOSE,
135                 "(%p, %s) open\n",
136                 this, m_name.c_str());
137
138     if(m_handle != MQ_INVALID_ID) {
139         debugError("(%p, %s) already open\n",
140                    this, m_name.c_str());
141         return false;
142     }
143     return doOpen(t, 0, b);
144 }
145
146 bool
147 PosixMessageQueue::Create(enum eDirection t, enum eBlocking b)
148 {
149     debugOutput(DEBUG_LEVEL_VERBOSE,
150                 "(%p, %s) create\n",
151                 this, m_name.c_str());
152
153     if(m_handle != MQ_INVALID_ID) {
154         debugError("(%p, %s) already open\n",
155                    this, m_name.c_str());
156         return false;
157     }
158     return doOpen(t, O_CREAT | O_EXCL, b);
159 }
160
161 bool
162 PosixMessageQueue::Close()
163 {
164     debugOutput(DEBUG_LEVEL_VERBOSE,
165                 "(%p, %s) close\n",
166                 this, m_name.c_str());
167
168     if(m_handle == MQ_INVALID_ID) {
169         debugOutput(DEBUG_LEVEL_VERBOSE,
170                     "(%p, %s) not open\n",
171                     this, m_name.c_str());
172         return true;
173     }
174     if(mq_close(m_handle)) {
175         debugError("(%p, %s) could not close: %s\n",
176                    this, m_name.c_str(), strerror(errno));
177         return false;
178     }
179     m_handle = MQ_INVALID_ID;
180     return true;
181 }
182
183 enum PosixMessageQueue::eResult
184 PosixMessageQueue::Clear()
185 {
186     debugOutput(DEBUG_LEVEL_VERBOSE,
187                 "(%p, %s) clear\n",
188                 this, m_name.c_str());
189     if(m_direction == eD_WriteOnly) {
190         debugError("Cannot clear write-only queue\n");
191         return eR_Error;
192     }
193
194     // ensure that we don't interfere with the notification handler
195     MutexLockHelper lock(m_notifyHandlerLock);
196     while(countMessages()) {
197         struct timespec timeout;
198         clock_gettime(CLOCK_REALTIME, &timeout);
199         timeout.tv_sec += m_timeout.tv_sec;
200         timeout.tv_nsec += m_timeout.tv_nsec;
201         if(timeout.tv_nsec >= 1000000000LL) {
202             timeout.tv_sec++;
203             timeout.tv_nsec -= 1000000000LL;
204         }
205    
206         signed int len;
207         unsigned prio;
208         if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) {
209             switch(errno) {
210                 case EAGAIN:
211                     debugOutput(DEBUG_LEVEL_VERBOSE,
212                                 "(%p, %s) empty\n",
213                                 this, m_name.c_str());
214                     return eR_OK;
215                 case ETIMEDOUT:
216                     debugOutput(DEBUG_LEVEL_VERBOSE,
217                                 "(%p, %s) read timed out\n",
218                                 this, m_name.c_str());
219                     return eR_Timeout;
220                 default:
221                     debugError("(%p, %s) could not receive: %s\n",
222                             this, m_name.c_str(), strerror(errno));
223                     return eR_Error;
224             }
225         }
226     }
227     return eR_OK;
228 }
229
230 enum PosixMessageQueue::eResult
231 PosixMessageQueue::Send(PosixMessageQueue::Message &m)
232 {
233     debugOutput(DEBUG_LEVEL_VERBOSE,
234                 "(%p, %s) send\n",
235                 this, m_name.c_str());
236     if(m_direction == eD_ReadOnly) {
237         debugError("Cannot write to read-only queue\n");
238         return eR_Error;
239     }
240
241     int len = m.getLength();
242     if (len > m_attr.mq_msgsize) {
243         debugError("Message too long\n");
244         return eR_Error;
245     }
246
247     struct timespec timeout;
248     clock_gettime(CLOCK_REALTIME, &timeout);
249     timeout.tv_sec += m_timeout.tv_sec;
250     timeout.tv_nsec += m_timeout.tv_nsec;
251     if(timeout.tv_nsec >= 1000000000LL) {
252         timeout.tv_sec++;
253         timeout.tv_nsec -= 1000000000LL;
254     }
255
256     if(!m.serialize(m_tmp_buffer)) {
257         debugError("Could not serialize\n");
258         return eR_Error;
259     }
260
261     if(mq_timedsend(m_handle, m_tmp_buffer, len, m.getPriority(), &timeout) == MQ_INVALID_ID) {
262         switch(errno) {
263             case EAGAIN:
264                 debugOutput(DEBUG_LEVEL_VERBOSE,
265                             "(%p, %s) full\n",
266                             this, m_name.c_str());
267                 return eR_Again;
268             case ETIMEDOUT:
269                 debugOutput(DEBUG_LEVEL_VERBOSE,
270                             "(%p, %s) read timed out\n",
271                             this, m_name.c_str());
272                 return eR_Timeout;
273             default:
274                 debugError("(%p, %s) could not send: %s\n",
275                            this, m_name.c_str(), strerror(errno));
276                 return eR_Error;
277         }
278     }
279     return eR_OK;
280 }
281
282 enum PosixMessageQueue::eResult
283 PosixMessageQueue::Receive(PosixMessageQueue::Message &m)
284 {
285     debugOutput(DEBUG_LEVEL_VERBOSE,
286                 "(%p, %s) receive\n",
287                 this, m_name.c_str());
288     if(m_direction == eD_WriteOnly) {
289         debugError("Cannot read from write-only queue\n");
290         return eR_Error;
291     }
292
293     struct timespec timeout;
294     clock_gettime(CLOCK_REALTIME, &timeout);
295     timeout.tv_sec += m_timeout.tv_sec;
296     timeout.tv_nsec += m_timeout.tv_nsec;
297     if(timeout.tv_nsec >= 1000000000LL) {
298         timeout.tv_sec++;
299         timeout.tv_nsec -= 1000000000LL;
300     }
301
302     signed int len;
303     unsigned prio;
304     if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) {
305         switch(errno) {
306             case EAGAIN:
307                 debugOutput(DEBUG_LEVEL_VERBOSE,
308                             "(%p, %s) empty\n",
309                             this, m_name.c_str());
310                 return eR_Again;
311             case ETIMEDOUT:
312                 debugOutput(DEBUG_LEVEL_VERBOSE,
313                             "(%p, %s) read timed out\n",
314                             this, m_name.c_str());
315                 return eR_Timeout;
316             default:
317                 debugError("(%p, %s) could not receive: %s\n",
318                            this, m_name.c_str(), strerror(errno));
319                 return eR_Error;
320         }
321     }
322
323     if(!m.deserialize(m_tmp_buffer, len, prio)) {
324         debugError("Could not parse message\n");
325         return eR_Error;
326     }
327     return eR_OK;
328 }
329
330
331 int
332 PosixMessageQueue::countMessages()
333 {
334     if(m_handle == MQ_INVALID_ID) {
335         debugOutput(DEBUG_LEVEL_VERBOSE,
336                     "(%p, %s) invalid handle\n",
337                     this, m_name.c_str());
338         return -1;
339     }
340     struct mq_attr attr;
341     if(mq_getattr(m_handle, &attr) == MQ_INVALID_ID) {
342         debugError("(%p, %s) could get attr: %s\n",
343                    this, m_name.c_str(), strerror(errno));
344         return -1;
345     }
346     return attr.mq_curmsgs;
347 }
348
349 bool
350 PosixMessageQueue::canSend()
351 {
352     return countMessages() < m_attr.mq_maxmsg;
353 }
354
355 bool
356 PosixMessageQueue::canReceive()
357 {
358     return countMessages() > 0;
359 }
360
361 bool
362 PosixMessageQueue::setNotificationHandler(Util::Functor *f)
363 {
364     debugOutput(DEBUG_LEVEL_VERBOSE,
365                 "(%p, %s) setting handler to %p\n",
366                 this, m_name.c_str(), f);
367
368     // ensure we don't change the notifier while
369     // it's used
370     MutexLockHelper lock(m_notifyHandlerLock);
371     if(m_notifyHandler == NULL) {
372         m_notifyHandler = f;
373         return true;
374     } else {
375         debugError("handler already present\n");
376         return false;
377     }
378 }
379
380 bool
381 PosixMessageQueue::unsetNotificationHandler()
382 {
383     debugOutput(DEBUG_LEVEL_VERBOSE,
384                 "(%p, %s) unsetting handler\n",
385                 this, m_name.c_str());
386
387     // ensure we don't change the notifier while
388     // it's used
389     MutexLockHelper lock(m_notifyHandlerLock);
390     if(m_notifyHandler != NULL) {
391         m_notifyHandler = NULL;
392         return true;
393     } else {
394         debugWarning("no handler present\n");
395         return true; // not considered an error
396     }
397 }
398
399 void
400 PosixMessageQueue::notifyCallback()
401 {
402     debugOutput(DEBUG_LEVEL_VERBOSE,
403                 "(%p, %s) Notified\n",
404                 this, m_name.c_str());
405     // make sure the handler is not changed
406     MutexLockHelper lock(m_notifyHandlerLock);
407     if(m_notifyHandler) {
408         (*m_notifyHandler)();
409     }
410 }
411
412 bool
413 PosixMessageQueue::enableNotification()
414 {
415     debugOutput(DEBUG_LEVEL_VERBOSE,
416                 "(%p, %s) set\n",
417                 this, m_name.c_str());
418
419     sigevent evp;
420     memset (&evp, 0, sizeof(evp));
421     evp.sigev_notify = SIGEV_THREAD;
422     evp.sigev_value.sival_ptr = (void *)this;
423     evp.sigev_notify_function = &notifyCallbackStatic;
424
425     if(mq_notify(m_handle, &evp) == MQ_INVALID_ID) {
426         debugError("(%p, %s) could set notifier: %s\n",
427                    this, m_name.c_str(), strerror(errno));
428         return false;
429     }
430     return true;
431 }
432
433 bool
434 PosixMessageQueue::disableNotification()
435 {
436     debugOutput(DEBUG_LEVEL_VERBOSE,
437                 "(%p, %s) unset\n",
438                 this, m_name.c_str());
439
440     if(mq_notify(m_handle, NULL) == MQ_INVALID_ID) {
441         debugError("(%p, %s) could unset notifier: %s\n",
442                    this, m_name.c_str(), strerror(errno));
443         return false;
444     }
445     return true;
446 }
447
448 void
449 PosixMessageQueue::show()
450 {
451     debugOutput(DEBUG_LEVEL_NORMAL, "(%p) MessageQueue %s\n", this, m_name.c_str());
452 }
453
454 void
455 PosixMessageQueue::setVerboseLevel(int i)
456 {
457     setDebugLevel(i);
458     m_notifyHandlerLock.setVerboseLevel(i);
459 }
460
461 } // end of namespace
Note: See TracBrowser for help on using the browser.