root/trunk/libffado/src/libutil/PosixMessageQueue.cpp

Revision 2706, 13.3 kB (checked in by jwoithe, 1 year ago)

PosixMessageQueue?: ensure visibility of SIGEV_THREAD declaration.

Orcan Ogetbil reported on the ffado-devel list that during the recent Fedora
rebuild effort FFADO 2.3.0 failed to compile because SIGEV_THREAD was
undeclared. Jano Svitok made the plausible suggestion that signal.h needs
to be included explicitly under newer glibc versions. This trivial patch
implements this. As of this commit it has not been verified that the
problem is fixed with this addition. However, there's no harm done by the
change in general and conceptionally the change is consistent with the
source file's content given the use of signal-related functionality.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "PosixMessageQueue.h"
25
26 #include "Functors.h"
27 #include "PosixMutex.h"
28 #include "Time.h"
29
30 #include <errno.h>
31 #include <string.h>
32 #include <poll.h>
33 #include <signal.h>
34
35 #define MQ_INVALID_ID ((mqd_t) -1)
36 // one second
37 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC   10
38 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC  0
39
40 #define POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE      1024
41 // note 10 is the default hard limit
42 #define POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES       10
43
44 namespace Util
45 {
46
47 IMPL_DEBUG_MODULE( PosixMessageQueue, PosixMessageQueue, DEBUG_LEVEL_NORMAL );
48
49 PosixMessageQueue::PosixMessageQueue(std::string name)
50 : m_name( "/" + name )
51 , m_blocking( eB_Blocking )
52 , m_direction( eD_None )
53 , m_owner( false )
54 , m_handle( MQ_INVALID_ID )
55 , m_tmp_buffer( NULL )
56 , m_notifyHandler( NULL )
57 , m_notifyHandlerLock( *(new PosixMutex()) )
58 {
59     m_timeout.tv_sec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC;
60     m_timeout.tv_nsec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC;
61
62     memset(&m_attr, 0, sizeof(m_attr));
63     m_attr.mq_maxmsg = POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES;
64     m_attr.mq_msgsize = POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE;
65     m_tmp_buffer = new char[m_attr.mq_msgsize];
66 }
67
68 PosixMessageQueue::~PosixMessageQueue()
69 {
70     debugOutput(DEBUG_LEVEL_VERBOSE,
71                 "(%p, %s) PosixMessageQueue destroy\n",
72                 this, m_name.c_str());
73     Close();
74     if(m_owner) {
75         debugOutput(DEBUG_LEVEL_VERBOSE,
76                     "(%p, %s) unlink\n",
77                     this, m_name.c_str());
78
79         if(mq_unlink(m_name.c_str()) == MQ_INVALID_ID) {
80             debugError("(%p, %s) could not unlink message queue: %s\n",
81                     this, m_name.c_str(), strerror(errno));
82         }
83     }
84     delete[] m_tmp_buffer;
85 }
86
87 bool
88 PosixMessageQueue::doOpen(enum eDirection t, int flags, enum eBlocking b)
89 {
90
91     if(m_handle != MQ_INVALID_ID) {
92         debugError("(%p, %s) already open\n",
93                    this, m_name.c_str());
94         return false;
95     }
96
97     switch(t) {
98         case eD_ReadOnly: flags |= O_RDONLY; break;
99         case eD_WriteOnly: flags |= O_WRONLY; break;
100         case eD_ReadWrite: flags |= O_RDWR; break;
101         default:
102             debugError("bad direction\n");
103             return false;
104     }
105
106     if(b == eB_NonBlocking) {
107         flags |= O_NONBLOCK;
108     }
109
110     if(flags & O_CREAT) {
111         // only user has permissions
112         m_handle = mq_open(m_name.c_str(), flags, S_IRWXU, &m_attr);
113     } else {
114
115         m_handle = mq_open(m_name.c_str(), flags);
116     }
117     if(m_handle == MQ_INVALID_ID) {
118         debugError("(%p, %s) could not open: %s\n",
119                    this, m_name.c_str(), strerror(errno));
120         return false;
121     }
122     if(flags & O_CREAT) {
123         m_owner = true;
124     }
125     if(mq_getattr(m_handle, &m_attr) == MQ_INVALID_ID) {
126         debugError("(%p, %s) could get attr: %s\n",
127                    this, m_name.c_str(), strerror(errno));
128         return false;
129     }
130     m_blocking = b;
131     return true;
132 }
133
134 bool
135 PosixMessageQueue::Open(enum eDirection t, enum eBlocking b)
136 {
137     debugOutput(DEBUG_LEVEL_VERBOSE,
138                 "(%p, %s) open\n",
139                 this, m_name.c_str());
140
141     if(m_handle != MQ_INVALID_ID) {
142         debugError("(%p, %s) already open\n",
143                    this, m_name.c_str());
144         return false;
145     }
146     return doOpen(t, 0, b);
147 }
148
149 bool
150 PosixMessageQueue::Create(enum eDirection t, enum eBlocking b)
151 {
152     debugOutput(DEBUG_LEVEL_VERBOSE,
153                 "(%p, %s) create\n",
154                 this, m_name.c_str());
155
156     if(m_handle != MQ_INVALID_ID) {
157         debugError("(%p, %s) already open\n",
158                    this, m_name.c_str());
159         return false;
160     }
161     return doOpen(t, O_CREAT | O_EXCL, b);
162 }
163
164 bool
165 PosixMessageQueue::Close()
166 {
167     debugOutput(DEBUG_LEVEL_VERBOSE,
168                 "(%p, %s) close\n",
169                 this, m_name.c_str());
170
171     if(m_handle == MQ_INVALID_ID) {
172         debugOutput(DEBUG_LEVEL_VERBOSE,
173                     "(%p, %s) not open\n",
174                     this, m_name.c_str());
175         return true;
176     }
177     if(mq_close(m_handle)) {
178         debugError("(%p, %s) could not close: %s\n",
179                    this, m_name.c_str(), strerror(errno));
180         return false;
181     }
182     m_handle = MQ_INVALID_ID;
183     return true;
184 }
185
186 enum PosixMessageQueue::eResult
187 PosixMessageQueue::Clear()
188 {
189     debugOutput(DEBUG_LEVEL_VERBOSE,
190                 "(%p, %s) clear\n",
191                 this, m_name.c_str());
192     if(m_direction == eD_WriteOnly) {
193         debugError("Cannot clear write-only queue\n");
194         return eR_Error;
195     }
196
197     // ensure that we don't interfere with the notification handler
198     MutexLockHelper lock(m_notifyHandlerLock);
199     while(countMessages()) {
200         struct timespec timeout;
201         Util::SystemTimeSource::clockGettime(&timeout);
202         timeout.tv_sec += m_timeout.tv_sec;
203         timeout.tv_nsec += m_timeout.tv_nsec;
204         if(timeout.tv_nsec >= 1000000000LL) {
205             timeout.tv_sec++;
206             timeout.tv_nsec -= 1000000000LL;
207         }
208    
209         signed int len;
210         unsigned prio;
211         if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) {
212             switch(errno) {
213                 case EAGAIN:
214                     debugOutput(DEBUG_LEVEL_VERBOSE,
215                                 "(%p, %s) empty\n",
216                                 this, m_name.c_str());
217                     return eR_OK;
218                 case ETIMEDOUT:
219                     debugOutput(DEBUG_LEVEL_VERBOSE,
220                                 "(%p, %s) read timed out\n",
221                                 this, m_name.c_str());
222                     return eR_Timeout;
223                 default:
224                     debugError("(%p, %s) could not receive: %s\n",
225                             this, m_name.c_str(), strerror(errno));
226                     return eR_Error;
227             }
228         }
229     }
230     return eR_OK;
231 }
232
233 enum PosixMessageQueue::eResult
234 PosixMessageQueue::Send(PosixMessageQueue::Message &m)
235 {
236     debugOutput(DEBUG_LEVEL_VERBOSE,
237                 "(%p, %s) send\n",
238                 this, m_name.c_str());
239     if(m_direction == eD_ReadOnly) {
240         debugError("Cannot write to read-only queue\n");
241         return eR_Error;
242     }
243
244     int len = m.getLength();
245     if (len > m_attr.mq_msgsize) {
246         debugError("Message too long\n");
247         return eR_Error;
248     }
249
250     struct timespec timeout;
251     Util::SystemTimeSource::clockGettime(&timeout);
252     timeout.tv_sec += m_timeout.tv_sec;
253     timeout.tv_nsec += m_timeout.tv_nsec;
254     if(timeout.tv_nsec >= 1000000000LL) {
255         timeout.tv_sec++;
256         timeout.tv_nsec -= 1000000000LL;
257     }
258
259     if(!m.serialize(m_tmp_buffer)) {
260         debugError("Could not serialize\n");
261         return eR_Error;
262     }
263
264     if(mq_timedsend(m_handle, m_tmp_buffer, len, m.getPriority(), &timeout) == MQ_INVALID_ID) {
265         switch(errno) {
266             case EAGAIN:
267                 debugOutput(DEBUG_LEVEL_VERBOSE,
268                             "(%p, %s) full\n",
269                             this, m_name.c_str());
270                 return eR_Again;
271             case ETIMEDOUT:
272                 debugOutput(DEBUG_LEVEL_VERBOSE,
273                             "(%p, %s) read timed out\n",
274                             this, m_name.c_str());
275                 return eR_Timeout;
276             default:
277                 debugError("(%p, %s) could not send: %s\n",
278                            this, m_name.c_str(), strerror(errno));
279                 return eR_Error;
280         }
281     }
282     return eR_OK;
283 }
284
285 enum PosixMessageQueue::eResult
286 PosixMessageQueue::Receive(PosixMessageQueue::Message &m)
287 {
288     debugOutput(DEBUG_LEVEL_VERBOSE,
289                 "(%p, %s) receive\n",
290                 this, m_name.c_str());
291     if(m_direction == eD_WriteOnly) {
292         debugError("Cannot read from write-only queue\n");
293         return eR_Error;
294     }
295
296     struct timespec timeout;
297     Util::SystemTimeSource::clockGettime(&timeout);
298     timeout.tv_sec += m_timeout.tv_sec;
299     timeout.tv_nsec += m_timeout.tv_nsec;
300     if(timeout.tv_nsec >= 1000000000LL) {
301         timeout.tv_sec++;
302         timeout.tv_nsec -= 1000000000LL;
303     }
304
305     signed int len;
306     unsigned prio;
307     if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) {
308         switch(errno) {
309             case EAGAIN:
310                 debugOutput(DEBUG_LEVEL_VERBOSE,
311                             "(%p, %s) empty\n",
312                             this, m_name.c_str());
313                 return eR_Again;
314             case ETIMEDOUT:
315                 debugOutput(DEBUG_LEVEL_VERBOSE,
316                             "(%p, %s) read timed out\n",
317                             this, m_name.c_str());
318                 return eR_Timeout;
319             default:
320                 debugError("(%p, %s) could not receive: %s\n",
321                            this, m_name.c_str(), strerror(errno));
322                 return eR_Error;
323         }
324     }
325
326     if(!m.deserialize(m_tmp_buffer, len, prio)) {
327         debugError("Could not parse message\n");
328         return eR_Error;
329     }
330     return eR_OK;
331 }
332
333 bool
334 PosixMessageQueue::Wait()
335 {
336     int err;
337     struct pollfd poll_fds[1];
338     poll_fds[0].fd = m_handle; // NOTE: not portable, Linux only
339     poll_fds[0].events = POLLIN;
340
341     err = poll (poll_fds, 1, -1);
342
343     if (err < 0) {
344         if (errno == EINTR) {
345             debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n");
346             return true;
347         }
348         debugFatal("poll error: %s\n", strerror (errno));
349         return false;
350     }
351
352     return true;
353 }
354
355 int
356 PosixMessageQueue::countMessages()
357 {
358     if(m_handle == MQ_INVALID_ID) {
359         debugOutput(DEBUG_LEVEL_VERBOSE,
360                     "(%p, %s) invalid handle\n",
361                     this, m_name.c_str());
362         return -1;
363     }
364     struct mq_attr attr;
365     if(mq_getattr(m_handle, &attr) == MQ_INVALID_ID) {
366         debugError("(%p, %s) could get attr: %s\n",
367                    this, m_name.c_str(), strerror(errno));
368         return -1;
369     }
370     return attr.mq_curmsgs;
371 }
372
373 bool
374 PosixMessageQueue::canSend()
375 {
376     return countMessages() < m_attr.mq_maxmsg;
377 }
378
379 bool
380 PosixMessageQueue::canReceive()
381 {
382     return countMessages() > 0;
383 }
384
385 bool
386 PosixMessageQueue::setNotificationHandler(Util::Functor *f)
387 {
388     debugOutput(DEBUG_LEVEL_VERBOSE,
389                 "(%p, %s) setting handler to %p\n",
390                 this, m_name.c_str(), f);
391
392     // ensure we don't change the notifier while
393     // it's used
394     MutexLockHelper lock(m_notifyHandlerLock);
395     if(m_notifyHandler == NULL) {
396         m_notifyHandler = f;
397         return true;
398     } else {
399         debugError("handler already present\n");
400         return false;
401     }
402 }
403
404 bool
405 PosixMessageQueue::unsetNotificationHandler()
406 {
407     debugOutput(DEBUG_LEVEL_VERBOSE,
408                 "(%p, %s) unsetting handler\n",
409                 this, m_name.c_str());
410
411     // ensure we don't change the notifier while
412     // it's used
413     MutexLockHelper lock(m_notifyHandlerLock);
414     if(m_notifyHandler != NULL) {
415         m_notifyHandler = NULL;
416         return true;
417     } else {
418         debugWarning("no handler present\n");
419         return true; // not considered an error
420     }
421 }
422
423 void
424 PosixMessageQueue::notifyCallback()
425 {
426     debugOutput(DEBUG_LEVEL_VERBOSE,
427                 "(%p, %s) Notified\n",
428                 this, m_name.c_str());
429     // make sure the handler is not changed
430     MutexLockHelper lock(m_notifyHandlerLock);
431     if(m_notifyHandler) {
432         (*m_notifyHandler)();
433     }
434 }
435
436 bool
437 PosixMessageQueue::enableNotification()
438 {
439     debugOutput(DEBUG_LEVEL_VERBOSE,
440                 "(%p, %s) set\n",
441                 this, m_name.c_str());
442
443     sigevent evp;
444     memset (&evp, 0, sizeof(evp));
445     evp.sigev_notify = SIGEV_THREAD;
446     evp.sigev_value.sival_ptr = (void *)this;
447     evp.sigev_notify_function = &notifyCallbackStatic;
448
449     if(mq_notify(m_handle, &evp) == MQ_INVALID_ID) {
450         debugError("(%p, %s) could set notifier: %s\n",
451                    this, m_name.c_str(), strerror(errno));
452         return false;
453     }
454     return true;
455 }
456
457 bool
458 PosixMessageQueue::disableNotification()
459 {
460     debugOutput(DEBUG_LEVEL_VERBOSE,
461                 "(%p, %s) unset\n",
462                 this, m_name.c_str());
463
464     if(mq_notify(m_handle, NULL) == MQ_INVALID_ID) {
465         debugError("(%p, %s) could unset notifier: %s\n",
466                    this, m_name.c_str(), strerror(errno));
467         return false;
468     }
469     return true;
470 }
471
472 void
473 PosixMessageQueue::show()
474 {
475     debugOutput(DEBUG_LEVEL_NORMAL, "(%p) MessageQueue %s\n", this, m_name.c_str());
476 }
477
478 void
479 PosixMessageQueue::setVerboseLevel(int i)
480 {
481     setDebugLevel(i);
482     m_notifyHandlerLock.setVerboseLevel(i);
483 }
484
485 } // end of namespace
Note: See TracBrowser for help on using the browser.