root/trunk/libffado/src/libutil/PosixMessageQueue.cpp

Revision 2171, 13.3 kB (checked in by jwoithe, 12 years ago)

A second pass at addressing ticket #242. Define a global clock source within the SystemTimeSource object and use this whenever clock_gettime() is called. On systems which support the new raw1394_read_cycle_timer_and_clock() libraw1394 call and CLOCK_MONOTONIC_RAW, these changes should ensure that all timing-sensitive parts of FFADO are using the same clock source. System tests under tests/systemtests/ have not been converted to use this new framework because they exist for different purposes and are not using the FFADO streaming infrastructure.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
#include "PosixMessageQueue.h"

#include "Functors.h"
#include "PosixMutex.h"
#include "Time.h"

#include <errno.h>
#include <poll.h>
#include <string.h>
#include <time.h>
33
34 #define MQ_INVALID_ID ((mqd_t) -1)
35 // default timeout: ten seconds
36 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC   10
37 #define POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC  0
38
39 #define POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE      1024
40 // note 10 is the default hard limit
41 #define POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES       10
42
43 namespace Util
44 {
45
46 IMPL_DEBUG_MODULE( PosixMessageQueue, PosixMessageQueue, DEBUG_LEVEL_NORMAL );
47
48 PosixMessageQueue::PosixMessageQueue(std::string name)
49 : m_name( "/" + name )
50 , m_blocking( eB_Blocking )
51 , m_direction( eD_None )
52 , m_owner( false )
53 , m_handle( MQ_INVALID_ID )
54 , m_tmp_buffer( NULL )
55 , m_notifyHandler( NULL )
56 , m_notifyHandlerLock( *(new PosixMutex()) )
57 {
58     m_timeout.tv_sec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_SEC;
59     m_timeout.tv_nsec = POSIX_MESSAGEQUEUE_DEFAULT_TIMEOUT_NSEC;
60
61     memset(&m_attr, 0, sizeof(m_attr));
62     m_attr.mq_maxmsg = POSIX_MESSAGEQUEUE_MAX_NB_MESSAGES;
63     m_attr.mq_msgsize = POSIX_MESSAGEQUEUE_MAX_MESSAGE_SIZE;
64     m_tmp_buffer = new char[m_attr.mq_msgsize];
65 }
66
67 PosixMessageQueue::~PosixMessageQueue()
68 {
69     debugOutput(DEBUG_LEVEL_VERBOSE,
70                 "(%p, %s) PosixMessageQueue destroy\n",
71                 this, m_name.c_str());
72     Close();
73     if(m_owner) {
74         debugOutput(DEBUG_LEVEL_VERBOSE,
75                     "(%p, %s) unlink\n",
76                     this, m_name.c_str());
77
78         if(mq_unlink(m_name.c_str()) == MQ_INVALID_ID) {
79             debugError("(%p, %s) could not unlink message queue: %s\n",
80                     this, m_name.c_str(), strerror(errno));
81         }
82     }
83     delete[] m_tmp_buffer;
84 }
85
86 bool
87 PosixMessageQueue::doOpen(enum eDirection t, int flags, enum eBlocking b)
88 {
89
90     if(m_handle != MQ_INVALID_ID) {
91         debugError("(%p, %s) already open\n",
92                    this, m_name.c_str());
93         return false;
94     }
95
96     switch(t) {
97         case eD_ReadOnly: flags |= O_RDONLY; break;
98         case eD_WriteOnly: flags |= O_WRONLY; break;
99         case eD_ReadWrite: flags |= O_RDWR; break;
100         default:
101             debugError("bad direction\n");
102             return false;
103     }
104
105     if(b == eB_NonBlocking) {
106         flags |= O_NONBLOCK;
107     }
108
109     if(flags & O_CREAT) {
110         // only user has permissions
111         m_handle = mq_open(m_name.c_str(), flags, S_IRWXU, &m_attr);
112     } else {
113
114         m_handle = mq_open(m_name.c_str(), flags);
115     }
116     if(m_handle == MQ_INVALID_ID) {
117         debugError("(%p, %s) could not open: %s\n",
118                    this, m_name.c_str(), strerror(errno));
119         return false;
120     }
121     if(flags & O_CREAT) {
122         m_owner = true;
123     }
124     if(mq_getattr(m_handle, &m_attr) == MQ_INVALID_ID) {
125         debugError("(%p, %s) could get attr: %s\n",
126                    this, m_name.c_str(), strerror(errno));
127         return false;
128     }
129     m_blocking = b;
130     return true;
131 }
132
133 bool
134 PosixMessageQueue::Open(enum eDirection t, enum eBlocking b)
135 {
136     debugOutput(DEBUG_LEVEL_VERBOSE,
137                 "(%p, %s) open\n",
138                 this, m_name.c_str());
139
140     if(m_handle != MQ_INVALID_ID) {
141         debugError("(%p, %s) already open\n",
142                    this, m_name.c_str());
143         return false;
144     }
145     return doOpen(t, 0, b);
146 }
147
148 bool
149 PosixMessageQueue::Create(enum eDirection t, enum eBlocking b)
150 {
151     debugOutput(DEBUG_LEVEL_VERBOSE,
152                 "(%p, %s) create\n",
153                 this, m_name.c_str());
154
155     if(m_handle != MQ_INVALID_ID) {
156         debugError("(%p, %s) already open\n",
157                    this, m_name.c_str());
158         return false;
159     }
160     return doOpen(t, O_CREAT | O_EXCL, b);
161 }
162
163 bool
164 PosixMessageQueue::Close()
165 {
166     debugOutput(DEBUG_LEVEL_VERBOSE,
167                 "(%p, %s) close\n",
168                 this, m_name.c_str());
169
170     if(m_handle == MQ_INVALID_ID) {
171         debugOutput(DEBUG_LEVEL_VERBOSE,
172                     "(%p, %s) not open\n",
173                     this, m_name.c_str());
174         return true;
175     }
176     if(mq_close(m_handle)) {
177         debugError("(%p, %s) could not close: %s\n",
178                    this, m_name.c_str(), strerror(errno));
179         return false;
180     }
181     m_handle = MQ_INVALID_ID;
182     return true;
183 }
184
185 enum PosixMessageQueue::eResult
186 PosixMessageQueue::Clear()
187 {
188     debugOutput(DEBUG_LEVEL_VERBOSE,
189                 "(%p, %s) clear\n",
190                 this, m_name.c_str());
191     if(m_direction == eD_WriteOnly) {
192         debugError("Cannot clear write-only queue\n");
193         return eR_Error;
194     }
195
196     // ensure that we don't interfere with the notification handler
197     MutexLockHelper lock(m_notifyHandlerLock);
198     while(countMessages()) {
199         struct timespec timeout;
200         Util::SystemTimeSource::clockGettime(&timeout);
201         timeout.tv_sec += m_timeout.tv_sec;
202         timeout.tv_nsec += m_timeout.tv_nsec;
203         if(timeout.tv_nsec >= 1000000000LL) {
204             timeout.tv_sec++;
205             timeout.tv_nsec -= 1000000000LL;
206         }
207    
208         signed int len;
209         unsigned prio;
210         if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) {
211             switch(errno) {
212                 case EAGAIN:
213                     debugOutput(DEBUG_LEVEL_VERBOSE,
214                                 "(%p, %s) empty\n",
215                                 this, m_name.c_str());
216                     return eR_OK;
217                 case ETIMEDOUT:
218                     debugOutput(DEBUG_LEVEL_VERBOSE,
219                                 "(%p, %s) read timed out\n",
220                                 this, m_name.c_str());
221                     return eR_Timeout;
222                 default:
223                     debugError("(%p, %s) could not receive: %s\n",
224                             this, m_name.c_str(), strerror(errno));
225                     return eR_Error;
226             }
227         }
228     }
229     return eR_OK;
230 }
231
232 enum PosixMessageQueue::eResult
233 PosixMessageQueue::Send(PosixMessageQueue::Message &m)
234 {
235     debugOutput(DEBUG_LEVEL_VERBOSE,
236                 "(%p, %s) send\n",
237                 this, m_name.c_str());
238     if(m_direction == eD_ReadOnly) {
239         debugError("Cannot write to read-only queue\n");
240         return eR_Error;
241     }
242
243     int len = m.getLength();
244     if (len > m_attr.mq_msgsize) {
245         debugError("Message too long\n");
246         return eR_Error;
247     }
248
249     struct timespec timeout;
250     Util::SystemTimeSource::clockGettime(&timeout);
251     timeout.tv_sec += m_timeout.tv_sec;
252     timeout.tv_nsec += m_timeout.tv_nsec;
253     if(timeout.tv_nsec >= 1000000000LL) {
254         timeout.tv_sec++;
255         timeout.tv_nsec -= 1000000000LL;
256     }
257
258     if(!m.serialize(m_tmp_buffer)) {
259         debugError("Could not serialize\n");
260         return eR_Error;
261     }
262
263     if(mq_timedsend(m_handle, m_tmp_buffer, len, m.getPriority(), &timeout) == MQ_INVALID_ID) {
264         switch(errno) {
265             case EAGAIN:
266                 debugOutput(DEBUG_LEVEL_VERBOSE,
267                             "(%p, %s) full\n",
268                             this, m_name.c_str());
269                 return eR_Again;
270             case ETIMEDOUT:
271                 debugOutput(DEBUG_LEVEL_VERBOSE,
272                             "(%p, %s) read timed out\n",
273                             this, m_name.c_str());
274                 return eR_Timeout;
275             default:
276                 debugError("(%p, %s) could not send: %s\n",
277                            this, m_name.c_str(), strerror(errno));
278                 return eR_Error;
279         }
280     }
281     return eR_OK;
282 }
283
284 enum PosixMessageQueue::eResult
285 PosixMessageQueue::Receive(PosixMessageQueue::Message &m)
286 {
287     debugOutput(DEBUG_LEVEL_VERBOSE,
288                 "(%p, %s) receive\n",
289                 this, m_name.c_str());
290     if(m_direction == eD_WriteOnly) {
291         debugError("Cannot read from write-only queue\n");
292         return eR_Error;
293     }
294
295     struct timespec timeout;
296     Util::SystemTimeSource::clockGettime(&timeout);
297     timeout.tv_sec += m_timeout.tv_sec;
298     timeout.tv_nsec += m_timeout.tv_nsec;
299     if(timeout.tv_nsec >= 1000000000LL) {
300         timeout.tv_sec++;
301         timeout.tv_nsec -= 1000000000LL;
302     }
303
304     signed int len;
305     unsigned prio;
306     if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) {
307         switch(errno) {
308             case EAGAIN:
309                 debugOutput(DEBUG_LEVEL_VERBOSE,
310                             "(%p, %s) empty\n",
311                             this, m_name.c_str());
312                 return eR_Again;
313             case ETIMEDOUT:
314                 debugOutput(DEBUG_LEVEL_VERBOSE,
315                             "(%p, %s) read timed out\n",
316                             this, m_name.c_str());
317                 return eR_Timeout;
318             default:
319                 debugError("(%p, %s) could not receive: %s\n",
320                            this, m_name.c_str(), strerror(errno));
321                 return eR_Error;
322         }
323     }
324
325     if(!m.deserialize(m_tmp_buffer, len, prio)) {
326         debugError("Could not parse message\n");
327         return eR_Error;
328     }
329     return eR_OK;
330 }
331
332 bool
333 PosixMessageQueue::Wait()
334 {
335     int err;
336     struct pollfd poll_fds[1];
337     poll_fds[0].fd = m_handle; // NOTE: not portable, Linux only
338     poll_fds[0].events = POLLIN;
339
340     err = poll (poll_fds, 1, -1);
341
342     if (err < 0) {
343         if (errno == EINTR) {
344             debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n");
345             return true;
346         }
347         debugFatal("poll error: %s\n", strerror (errno));
348         return false;
349     }
350
351     return true;
352 }
353
354 int
355 PosixMessageQueue::countMessages()
356 {
357     if(m_handle == MQ_INVALID_ID) {
358         debugOutput(DEBUG_LEVEL_VERBOSE,
359                     "(%p, %s) invalid handle\n",
360                     this, m_name.c_str());
361         return -1;
362     }
363     struct mq_attr attr;
364     if(mq_getattr(m_handle, &attr) == MQ_INVALID_ID) {
365         debugError("(%p, %s) could get attr: %s\n",
366                    this, m_name.c_str(), strerror(errno));
367         return -1;
368     }
369     return attr.mq_curmsgs;
370 }
371
372 bool
373 PosixMessageQueue::canSend()
374 {
375     return countMessages() < m_attr.mq_maxmsg;
376 }
377
378 bool
379 PosixMessageQueue::canReceive()
380 {
381     return countMessages() > 0;
382 }
383
384 bool
385 PosixMessageQueue::setNotificationHandler(Util::Functor *f)
386 {
387     debugOutput(DEBUG_LEVEL_VERBOSE,
388                 "(%p, %s) setting handler to %p\n",
389                 this, m_name.c_str(), f);
390
391     // ensure we don't change the notifier while
392     // it's used
393     MutexLockHelper lock(m_notifyHandlerLock);
394     if(m_notifyHandler == NULL) {
395         m_notifyHandler = f;
396         return true;
397     } else {
398         debugError("handler already present\n");
399         return false;
400     }
401 }
402
403 bool
404 PosixMessageQueue::unsetNotificationHandler()
405 {
406     debugOutput(DEBUG_LEVEL_VERBOSE,
407                 "(%p, %s) unsetting handler\n",
408                 this, m_name.c_str());
409
410     // ensure we don't change the notifier while
411     // it's used
412     MutexLockHelper lock(m_notifyHandlerLock);
413     if(m_notifyHandler != NULL) {
414         m_notifyHandler = NULL;
415         return true;
416     } else {
417         debugWarning("no handler present\n");
418         return true; // not considered an error
419     }
420 }
421
422 void
423 PosixMessageQueue::notifyCallback()
424 {
425     debugOutput(DEBUG_LEVEL_VERBOSE,
426                 "(%p, %s) Notified\n",
427                 this, m_name.c_str());
428     // make sure the handler is not changed
429     MutexLockHelper lock(m_notifyHandlerLock);
430     if(m_notifyHandler) {
431         (*m_notifyHandler)();
432     }
433 }
434
435 bool
436 PosixMessageQueue::enableNotification()
437 {
438     debugOutput(DEBUG_LEVEL_VERBOSE,
439                 "(%p, %s) set\n",
440                 this, m_name.c_str());
441
442     sigevent evp;
443     memset (&evp, 0, sizeof(evp));
444     evp.sigev_notify = SIGEV_THREAD;
445     evp.sigev_value.sival_ptr = (void *)this;
446     evp.sigev_notify_function = &notifyCallbackStatic;
447
448     if(mq_notify(m_handle, &evp) == MQ_INVALID_ID) {
449         debugError("(%p, %s) could set notifier: %s\n",
450                    this, m_name.c_str(), strerror(errno));
451         return false;
452     }
453     return true;
454 }
455
456 bool
457 PosixMessageQueue::disableNotification()
458 {
459     debugOutput(DEBUG_LEVEL_VERBOSE,
460                 "(%p, %s) unset\n",
461                 this, m_name.c_str());
462
463     if(mq_notify(m_handle, NULL) == MQ_INVALID_ID) {
464         debugError("(%p, %s) could unset notifier: %s\n",
465                    this, m_name.c_str(), strerror(errno));
466         return false;
467     }
468     return true;
469 }
470
471 void
472 PosixMessageQueue::show()
473 {
474     debugOutput(DEBUG_LEVEL_NORMAL, "(%p) MessageQueue %s\n", this, m_name.c_str());
475 }
476
477 void
478 PosixMessageQueue::setVerboseLevel(int i)
479 {
480     setDebugLevel(i);
481     m_notifyHandlerLock.setVerboseLevel(i);
482 }
483
484 } // end of namespace
Note: See TracBrowser for help on using the browser.