root/trunk/libffado/src/libutil/IpcRingBuffer.cpp

Revision 1234, 19.8 kB (checked in by holin, 13 years ago)

fix gcc 4.3 compile errors and some warnings (largely from Adrian Knoth)

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "IpcRingBuffer.h"
25 #include "PosixMessageQueue.h"
26 #include "PosixSharedMemory.h"
27 #include "Mutex.h"
28 #include "PosixMutex.h"
29 #include "Functors.h"
30
31 #include <cstring>
32
33 // FIXME: if we restrict the nb_blocks to a power of two, the overflows
34 //        can be implemented using masks
35
36 namespace Util {
37
38 IMPL_DEBUG_MODULE( IpcRingBuffer, IpcRingBuffer, DEBUG_LEVEL_VERBOSE );
39
40 IpcRingBuffer::IpcRingBuffer(std::string name,
41                              enum eBufferType type,
42                              enum eDirection dir,
43                              enum eBlocking blocking,
44                              unsigned int blocks, unsigned int block_size)
45 : m_name(name)
46 , m_blocks(blocks)
47 , m_blocksize(block_size)
48 , m_type( type )
49 , m_direction( dir )
50 , m_blocking( blocking )
51 , m_initialized( false )
52 , m_next_block( 1 )
53 , m_last_block_ack( 0 )
54 , m_idx( 1 )
55 , m_last_idx_ack( 0 )
56 , m_ping_queue( *(new PosixMessageQueue(name+":ping")) )
57 , m_pong_queue( *(new PosixMessageQueue(name+":pong")) )
58 , m_memblock( *(new PosixSharedMemory(name+":mem", blocks*block_size)) )
59 , m_access_lock( *(new PosixMutex()) )
60 , m_notify_functor( *(new MemberFunctor0< IpcRingBuffer*, void (IpcRingBuffer::*)() >
61                      ( this, &IpcRingBuffer::notificationHandler, false )) )
62 , m_block_requested_for_read( false )
63 , m_block_requested_for_write( false )
64 {
65     m_ping_queue.setVerboseLevel(getDebugLevel());
66     m_pong_queue.setVerboseLevel(getDebugLevel());
67     m_memblock.setVerboseLevel(getDebugLevel());
68     m_access_lock.setVerboseLevel(getDebugLevel());
69     sem_init(&m_activity, 0, 0);
70 }
71
72 IpcRingBuffer::~IpcRingBuffer()
73 {
74     // make sure everyone is done with this
75     // should not be necessary AFAIK
76     m_access_lock.Lock();
77     m_initialized=false;
78     delete &m_memblock;
79     delete &m_ping_queue;
80     delete &m_pong_queue;
81     m_access_lock.Unlock();
82
83     delete &m_access_lock;
84     delete &m_notify_functor;
85     sem_destroy(&m_activity);
86 }
87
88 bool
89 IpcRingBuffer::init()
90 {
91     if(m_initialized) {
92         debugError("(%p, %s) Already initialized\n",
93                    this, m_name.c_str());
94         return false;
95     }
96
97     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) init %s\n", this, m_name.c_str());
98     switch(m_type) {
99         case eBT_Master:
100             // a master creates and owns all of the shared memory structures
101             // for outward connections we write the data, else we read
102             if(!m_memblock.Create( PosixSharedMemory::eD_ReadWrite ))
103             {
104                 debugError("(%p, %s) Could not create memblock\n",
105                            this, m_name.c_str());
106                 return false;
107             }
108
109             m_memblock.LockInMemory(true);
110             // for outward connections we do the pinging, else
111             // we do the pong-ing. note that for writing, we open read-write
112             // in order to be able to dequeue when the queue is full
113             if(!m_ping_queue.Create( (m_direction == eD_Outward ?
114                                        PosixMessageQueue::eD_ReadWrite :
115                                        PosixMessageQueue::eD_ReadOnly),
116                                      (m_blocking==eB_Blocking ?
117                                        PosixMessageQueue::eB_Blocking :
118                                        PosixMessageQueue::eB_NonBlocking)
119                                    ))
120             {
121                 debugError("(%p, %s) Could not create ping queue\n",
122                            this, m_name.c_str());
123                 return false;
124             }
125             if(!m_pong_queue.Create( (m_direction == eD_Outward ?
126                                        PosixMessageQueue::eD_ReadOnly :
127                                        PosixMessageQueue::eD_ReadWrite),
128                                      (m_blocking==eB_Blocking ?
129                                        PosixMessageQueue::eB_Blocking :
130                                        PosixMessageQueue::eB_NonBlocking)
131                                    ))
132             {
133                 debugError("(%p, %s) Could not create pong queue\n",
134                            this, m_name.c_str());
135                 return false;
136             }
137             break;
138         case eBT_Slave:
139             // a slave only opens the shared memory structures
140             // for outward connections we write the data, else we read
141             if(!m_memblock.Open( (m_direction == eD_Outward
142                                     ? PosixSharedMemory::eD_ReadWrite
143                                     : PosixSharedMemory::eD_ReadOnly) ))
144             {
145                 debugError("(%p, %s) Could not open memblock\n",
146                            this, m_name.c_str());
147                 return false;
148             }
149             m_memblock.LockInMemory(true);
150             // for outward connections we do the pinging, else
151             // we do the pong-ing. note that for writing, we open read-write
152             // in order to be able to dequeue when the queue is full
153             if(!m_ping_queue.Open( (m_direction == eD_Outward ?
154                                       PosixMessageQueue::eD_ReadWrite :
155                                       PosixMessageQueue::eD_ReadOnly),
156                                    (m_blocking==eB_Blocking ?
157                                       PosixMessageQueue::eB_Blocking :
158                                       PosixMessageQueue::eB_NonBlocking)
159                                  ))
160             {
161                 debugError("(%p, %s) Could not open ping queue\n",
162                            this, m_name.c_str());
163                 return false;
164             }
165             if(!m_pong_queue.Open( (m_direction == eD_Outward ?
166                                       PosixMessageQueue::eD_ReadOnly :
167                                       PosixMessageQueue::eD_ReadWrite),
168                                    (m_blocking==eB_Blocking ?
169                                       PosixMessageQueue::eB_Blocking :
170                                       PosixMessageQueue::eB_NonBlocking)
171                                  ))
172             {
173                 debugError("(%p, %s) Could not open pong queue\n",
174                            this, m_name.c_str());
175                 return false;
176             }
177             break;
178     }
179
180     // if we are on the sending end of the buffer, we need a notifier
181     // on the pong queue
182     // the receiving end is driven by the messages in the ping queue
183     if(m_direction == eD_Outward) {
184         if(!m_pong_queue.setNotificationHandler(&m_notify_functor)) {
185             debugError("Could not set Notification Handler\n");
186             return false;
187         }
188         // enable the handler
189         if(!m_pong_queue.enableNotification()) {
190             debugError("Could not enable notification\n");
191         }
192     }
193
194     m_initialized = true;
195     return true;
196 }
197
198 void
199 IpcRingBuffer::notificationHandler()
200 {
201     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
202     // prevent multiple access
203     MutexLockHelper lock(m_access_lock);
204
205     // The first thing we do is re-enable the handler
206     // it is not going to be called since there are messages in the queue
207     // first enabling the handler, then reading all received messages will
208     // ensure that we either read or get notified of any message that arrives
209     // while this handler is running
210     if(!m_pong_queue.enableNotification()) {
211         debugError("Could not re-enable notification\n");
212     }
213
214     // no need for a lock protecting the pong queue as long as we are the only ones reading it
215     // while we have messages to read, read them
216     while(m_pong_queue.canReceive()) {
217         // message placeholder
218         IpcMessage m_ack = IpcMessage(); // FIXME: stack allocation not strictly RT safe
219         // read ping message (blocks)
220         enum PosixMessageQueue::eResult msg_res;
221         msg_res = m_pong_queue.Receive(m_ack);
222         switch(msg_res) {
223             case PosixMessageQueue::eR_OK:
224                 break;
225             default: // we were just notified, anything except OK is an error
226                 debugError("Could not read from ping queue\n");
227         }
228    
229         IpcMessage::eMessageType type = m_ack.getType();
230         if(type == IpcMessage::eMT_DataAck) {
231             // get a view on the data
232             struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m_ack.getDataPtr());
233    
234             debugOutput(DEBUG_LEVEL_VERBOSE, "Received ack idx %d at id %d\n", data->idx, data->id);
235    
236             // check counters
237             unsigned int expected_block_ack = m_last_block_ack+1;
238             if(expected_block_ack == m_blocks) expected_block_ack = 0;
239             if(data->id != expected_block_ack) {
240                 debugWarning("unexpected block id: %d (expected %d)\n", data->id, expected_block_ack);
241             }
242
243             unsigned int expected_block_idx = m_last_idx_ack+1; //will auto-overflow
244             if(data->idx != expected_block_idx) {
245                 debugWarning("unexpected block idx: %d (expected %d)\n", data->idx, expected_block_idx);
246             }
247    
248             // prepare the next expected values
249             // this is the only value used (and written in case of error) in the other thread
250             m_last_block_ack = data->id;
251             // this is not used
252             m_last_idx_ack = data->idx;
253             // signal activity
254             if(m_blocking == eB_Blocking) {
255                 sem_post(&m_activity);
256             }
257         } else {
258             debugError("Invalid message received (type %d)\n", type);
259         }
260     }
261 }
262
263 unsigned int
264 IpcRingBuffer::getBufferFill()
265 {
266     // the next pointer is last_written+1
267     // so the bufferfill = last_written - last_ack
268     //                   = last_written+1 - last_ack - 1
269     // last_ack is always <= last_written. if not,
270     // last_written has wrapped
271     // => wrap if: last_written < last_ack
272     //          or last_written+1 < last_ack+1
273     //          or m_next_block < last_ack+1
274     // unwrap if this happens
275     int bufferfill = m_next_block - m_last_block_ack - 1;
276     if(m_next_block <= m_last_block_ack) {
277         bufferfill += m_blocks;
278     }
279     assert(bufferfill>=0);
280     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) fill: %d\n", this, m_name.c_str(), bufferfill);
281     return (unsigned int)bufferfill;
282 }
283
284
285 enum IpcRingBuffer::eResult
286 IpcRingBuffer::requestBlockForWrite(void **block)
287 {
288     if(m_block_requested_for_write) {
289         debugError("Already a block requested for write\n");
290         return eR_Error;
291     }
292
293     // check if we can write a message
294     // we can send when:
295     //  - we are not overwriting
296     //    AND -    we are in blocking mode and
297     //        - OR (we are in non-blocking mode and there is space)
298
299     if(m_blocking == eB_Blocking) {
300         if(getBufferFill() >= m_blocks) {
301             debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) full\n", this, m_name.c_str());
302             // make it wait
303             sem_wait(&m_activity);
304         }
305     } else {
306         // there are no free data blocks, or there is no message space
307         if(getBufferFill() >= m_blocks || !m_ping_queue.canSend()) {
308             debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) full\n", this, m_name.c_str());
309             return eR_Again;
310         }
311     }
312
313     // check for overflow
314     if(m_next_block == m_last_block_ack) {
315         debugWarning("Overwriting not yet read block %u\n", m_next_block);
316         // we have to increment the block_read pointer
317         // in order to keep consistency
318         m_last_block_ack++;
319         if(m_last_block_ack == m_blocks) {
320             m_last_block_ack = 0;
321         }
322     }
323
324     int offset = m_next_block * m_blocksize;
325     *block = m_memblock.requestBlock(offset, m_blocksize);
326     if(*block) {
327         m_block_requested_for_write = true;
328         return eR_OK;
329     } else {
330         return eR_Error;
331     }
332 }
333
334 enum IpcRingBuffer::eResult
335 IpcRingBuffer::releaseBlockForWrite()
336 {
337     if(!m_block_requested_for_write) {
338         debugError("No block requested for write\n");
339         return eR_Error;
340     }
341
342     IpcMessage &m = m_LastDataMessageSent;
343
344     // prepare ping message
345     m.setType(IpcMessage::eMT_DataWritten);
346     m.setDataSize(sizeof(struct DataWrittenMessage));
347
348     // get a view on the data
349     struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
350     // set the data contents
351     data->id = m_next_block;
352     data->idx = m_idx;
353
354     debugOutput(DEBUG_LEVEL_VERBOSE, "Releasing block idx %d at id %d\n", data->idx, data->id);
355
356     // send ping message
357     enum PosixMessageQueue::eResult msg_res;
358     msg_res = m_ping_queue.Send(m);
359     switch(msg_res) {
360         case PosixMessageQueue::eR_OK:
361             break;
362         case PosixMessageQueue::eR_Again:
363             // this is a bug since we checked whether it was empty or not
364             debugError("Bad response value\n");
365             m_block_requested_for_write = false;
366             return eR_Error;
367         case PosixMessageQueue::eR_Timeout:
368             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n");
369             m_block_requested_for_write = false;
370             return eR_Timeout; // blocking and no space on time
371         default:
372             debugError("Could not send to ping queue\n");
373             m_block_requested_for_write = false;
374             return eR_Error;
375     }
376
377     // increment and wrap
378     m_next_block++;
379     if(m_next_block == m_blocks) {
380         m_next_block = 0;
381     }
382     m_idx++;
383     m_block_requested_for_write = false;
384     return eR_OK;
385 }
386
387 enum IpcRingBuffer::eResult
388 IpcRingBuffer::Write(char *block)
389 {
390     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p,  %s) IpcRingBuffer\n", this, m_name.c_str());
391     if(m_direction == eD_Inward) {
392         debugError("Cannot write to inbound buffer\n");
393         return eR_Error;
394     }
395
396     enum IpcRingBuffer::eResult msg_res;
397     void *xmit_block;
398     // request a block for reading
399     msg_res = requestBlockForWrite(&xmit_block);
400     if(msg_res == eR_OK) {
401         // if we receive a eR_OK, we should always be able to write to the shared memory
402         memcpy(xmit_block, block, m_blocksize);
403         releaseBlockForWrite();
404     }
405     return msg_res;
406 }
407
408 enum IpcRingBuffer::eResult
409 IpcRingBuffer::requestBlockForRead(void **block)
410 {
411     if(m_block_requested_for_read) {
412         debugError("Already a block requested for read\n");
413         return eR_Error;
414     }
415     // message placeholder
416     IpcMessage &m = m_LastDataMessageReceived;
417
418     // read ping message (blocks)
419     enum PosixMessageQueue::eResult msg_res;
420     msg_res = m_ping_queue.Receive(m);
421     switch(msg_res) {
422         case PosixMessageQueue::eR_OK:
423             break;
424         case PosixMessageQueue::eR_Again:
425             return eR_Again; // non-blocking and no message
426         case PosixMessageQueue::eR_Timeout:
427             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n");
428             return eR_Timeout; // blocking and no message on time
429         default:
430             debugError("Could not read from ping queue\n");
431             return eR_Error;
432     }
433
434     IpcMessage::eMessageType type = m.getType();
435     if(type == IpcMessage::eMT_DataWritten) {
436         // get a view on the data
437         struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
438         debugOutput(DEBUG_LEVEL_VERBOSE, "Requested block idx %d at id %d\n", data->idx, data->id);
439
440         // check counters
441         if(data->id != m_next_block) {
442             debugWarning("unexpected block id: %d (expected %d)\n", data->id, m_next_block);
443         }
444         if(data->idx != m_idx) {
445             debugWarning("unexpected block idx: %d (expected %d)\n", data->idx, m_idx);
446         }
447
448         int offset = data->id * m_blocksize;
449         *block = m_memblock.requestBlock(offset, m_blocksize);
450         if(*block) {
451             m_block_requested_for_read = true;
452             return eR_OK;
453         } else {
454             return eR_Error;
455         }
456     } else {
457         debugError("Invalid message received (type %d)\n", type);
458         return eR_Error;
459     }
460 }
461
462 enum IpcRingBuffer::eResult
463 IpcRingBuffer::releaseBlockForRead()
464 {
465     if(!m_block_requested_for_read) {
466         debugError("No block requested for read\n");
467         return eR_Error;
468     }
469
470     IpcMessage &m = m_LastDataMessageReceived;
471
472     // get a view on the data
473     struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
474     debugOutput(DEBUG_LEVEL_VERBOSE, "Releasing block idx %d at id %d\n", data->idx, data->id);
475
476     // write a response to the pong queue
477     // reuse the message
478     m.setType(IpcMessage::eMT_DataAck);
479     enum PosixMessageQueue::eResult msg_res;
480     msg_res = m_pong_queue.Send(m);
481     switch(msg_res) {
482         case PosixMessageQueue::eR_OK:
483             break;
484         case PosixMessageQueue::eR_Again:
485             debugOutput(DEBUG_LEVEL_VERBOSE, "Again on ACK\n");
486 //             return eR_Again; // non-blocking and no message
487         case PosixMessageQueue::eR_Timeout:
488             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout on ACK\n");
489 //             return eR_Timeout; // blocking and no message on time
490         default:
491             debugError("Could not write to pong queue\n");
492             m_block_requested_for_read = false;
493             return eR_Error;
494     }
495
496     // prepare the next expected values
497     m_next_block = data->id + 1;
498     if(m_next_block == m_blocks) {
499         m_next_block = 0;
500     }
501     m_idx = data->idx + 1;
502     m_block_requested_for_read = false;
503     return eR_OK;
504 }
505
506 enum IpcRingBuffer::eResult
507 IpcRingBuffer::Read(char *block)
508 {
509     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
510     if(m_direction == eD_Outward) {
511         debugError("Cannot read from outward buffer\n");
512         return eR_Error;
513     }
514
515     enum IpcRingBuffer::eResult msg_res;
516     void *rcv_block;
517     // request a block for reading
518     msg_res = requestBlockForRead(&rcv_block);
519     if(msg_res == eR_OK) {
520         // if we receive a eR_OK, we should always be able to read the shared memory
521         memcpy(block, rcv_block, m_blocksize);
522         releaseBlockForRead();
523     }
524     return msg_res;
525 }
526
527 void
528 IpcRingBuffer::show()
529 {
530     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
531 }
532
533 void
534 IpcRingBuffer::setVerboseLevel(int i)
535 {
536     setDebugLevel(i);
537     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) verbose: %d\n", this, m_name.c_str(), i);
538     m_ping_queue.setVerboseLevel(i);
539     m_pong_queue.setVerboseLevel(i);
540     m_memblock.setVerboseLevel(i);
541     m_access_lock.setVerboseLevel(i);
542 }
543
544 bool
545 IpcRingBuffer::IpcMessage::serialize(char *buff)
546 {
547     memcpy(buff, &m_header, sizeof(m_header));
548     buff += sizeof(m_header);
549     memcpy(buff, m_data, m_data_len);
550     return true;
551 }
552 bool
553 IpcRingBuffer::IpcMessage::deserialize(const char *buff, unsigned int length, unsigned prio)
554 {
555     assert(length >= sizeof(m_header));
556     memcpy(&m_header, buff, sizeof(m_header));
557
558     if(m_header.magic != FFADO_IPC_RINGBUFFER_MAGIC) {
559         return false; // invalid magic
560     }
561     if(m_header.version != FFADO_IPC_RINGBUFFER_VERSION) {
562         return false; // invalid version
563     }
564
565     m_data_len = length - sizeof(m_header);
566     buff += sizeof(m_header);
567
568     assert(m_data_len <= FFADO_IPC_MAX_MESSAGE_SIZE);
569     memcpy(m_data, buff, m_data_len);
570
571     m_priority = prio;
572     return true;
573 }
574
575
576 } // Util
Note: See TracBrowser for help on using the browser.