root/trunk/libffado/src/libutil/IpcRingBuffer.cpp

Revision 1172, 19.7 kB (checked in by ppalmers, 15 years ago)

lay down the foundations for easy ALSA/Pulse support

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "IpcRingBuffer.h"
25 #include "PosixMessageQueue.h"
26 #include "PosixSharedMemory.h"
27 #include "Mutex.h"
28 #include "PosixMutex.h"
29 #include "Functors.h"
30
31 // FIXME: if we restrict the nb_blocks to a power of two, the overflows
32 //        can be implemented using masks
33
34 namespace Util {
35
36 IMPL_DEBUG_MODULE( IpcRingBuffer, IpcRingBuffer, DEBUG_LEVEL_VERBOSE );
37
38 IpcRingBuffer::IpcRingBuffer(std::string name,
39                              enum eBufferType type,
40                              enum eDirection dir,
41                              enum eBlocking blocking,
42                              unsigned int blocks, unsigned int block_size)
43 : m_name(name)
44 , m_blocks(blocks)
45 , m_blocksize(block_size)
46 , m_type( type )
47 , m_direction( dir )
48 , m_blocking( blocking )
49 , m_initialized( false )
50 , m_next_block( 1 )
51 , m_last_block_ack( 0 )
52 , m_idx( 1 )
53 , m_last_idx_ack( 0 )
54 , m_ping_queue( *(new PosixMessageQueue(name+":ping")) )
55 , m_pong_queue( *(new PosixMessageQueue(name+":pong")) )
56 , m_memblock( *(new PosixSharedMemory(name+":mem", blocks*block_size)) )
57 , m_access_lock( *(new PosixMutex()) )
58 , m_notify_functor( *(new MemberFunctor0< IpcRingBuffer*, void (IpcRingBuffer::*)() >
59                      ( this, &IpcRingBuffer::notificationHandler, false )) )
60 , m_block_requested_for_read( false )
61 , m_block_requested_for_write( false )
62 {
63     m_ping_queue.setVerboseLevel(getDebugLevel());
64     m_pong_queue.setVerboseLevel(getDebugLevel());
65     m_memblock.setVerboseLevel(getDebugLevel());
66     m_access_lock.setVerboseLevel(getDebugLevel());
67     sem_init(&m_activity, 0, 0);
68 }
69
70 IpcRingBuffer::~IpcRingBuffer()
71 {
72     // make sure everyone is done with this
73     // should not be necessary AFAIK
74     m_access_lock.Lock();
75     m_initialized=false;
76     delete &m_memblock;
77     delete &m_ping_queue;
78     delete &m_pong_queue;
79     m_access_lock.Unlock();
80
81     delete &m_access_lock;
82     delete &m_notify_functor;
83     sem_destroy(&m_activity);
84 }
85
86 bool
87 IpcRingBuffer::init()
88 {
89     if(m_initialized) {
90         debugError("(%p, %s) Already initialized\n",
91                    this, m_name.c_str());
92         return false;
93     }
94
95     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) init %s\n", this, m_name.c_str());
96     switch(m_type) {
97         case eBT_Master:
98             // a master creates and owns all of the shared memory structures
99             // for outward connections we write the data, else we read
100             if(!m_memblock.Create( PosixSharedMemory::eD_ReadWrite ))
101             {
102                 debugError("(%p, %s) Could not create memblock\n",
103                            this, m_name.c_str());
104                 return false;
105             }
106
107             m_memblock.LockInMemory(true);
108             // for outward connections we do the pinging, else
109             // we do the pong-ing. note that for writing, we open read-write
110             // in order to be able to dequeue when the queue is full
111             if(!m_ping_queue.Create( (m_direction == eD_Outward ?
112                                        PosixMessageQueue::eD_ReadWrite :
113                                        PosixMessageQueue::eD_ReadOnly),
114                                      (m_blocking==eB_Blocking ?
115                                        PosixMessageQueue::eB_Blocking :
116                                        PosixMessageQueue::eB_NonBlocking)
117                                    ))
118             {
119                 debugError("(%p, %s) Could not create ping queue\n",
120                            this, m_name.c_str());
121                 return false;
122             }
123             if(!m_pong_queue.Create( (m_direction == eD_Outward ?
124                                        PosixMessageQueue::eD_ReadOnly :
125                                        PosixMessageQueue::eD_ReadWrite),
126                                      (m_blocking==eB_Blocking ?
127                                        PosixMessageQueue::eB_Blocking :
128                                        PosixMessageQueue::eB_NonBlocking)
129                                    ))
130             {
131                 debugError("(%p, %s) Could not create pong queue\n",
132                            this, m_name.c_str());
133                 return false;
134             }
135             break;
136         case eBT_Slave:
137             // a slave only opens the shared memory structures
138             // for outward connections we write the data, else we read
139             if(!m_memblock.Open( (m_direction == eD_Outward
140                                     ? PosixSharedMemory::eD_ReadWrite
141                                     : PosixSharedMemory::eD_ReadOnly) ))
142             {
143                 debugError("(%p, %s) Could not open memblock\n",
144                            this, m_name.c_str());
145                 return false;
146             }
147             m_memblock.LockInMemory(true);
148             // for outward connections we do the pinging, else
149             // we do the pong-ing. note that for writing, we open read-write
150             // in order to be able to dequeue when the queue is full
151             if(!m_ping_queue.Open( (m_direction == eD_Outward ?
152                                       PosixMessageQueue::eD_ReadWrite :
153                                       PosixMessageQueue::eD_ReadOnly),
154                                    (m_blocking==eB_Blocking ?
155                                       PosixMessageQueue::eB_Blocking :
156                                       PosixMessageQueue::eB_NonBlocking)
157                                  ))
158             {
159                 debugError("(%p, %s) Could not open ping queue\n",
160                            this, m_name.c_str());
161                 return false;
162             }
163             if(!m_pong_queue.Open( (m_direction == eD_Outward ?
164                                       PosixMessageQueue::eD_ReadOnly :
165                                       PosixMessageQueue::eD_ReadWrite),
166                                    (m_blocking==eB_Blocking ?
167                                       PosixMessageQueue::eB_Blocking :
168                                       PosixMessageQueue::eB_NonBlocking)
169                                  ))
170             {
171                 debugError("(%p, %s) Could not open pong queue\n",
172                            this, m_name.c_str());
173                 return false;
174             }
175             break;
176     }
177
178     // if we are on the sending end of the buffer, we need a notifier
179     // on the pong queue
180     // the receiving end is driven by the messages in the ping queue
181     if(m_direction == eD_Outward) {
182         if(!m_pong_queue.setNotificationHandler(&m_notify_functor)) {
183             debugError("Could not set Notification Handler\n");
184             return false;
185         }
186         // enable the handler
187         if(!m_pong_queue.enableNotification()) {
188             debugError("Could not enable notification\n");
189         }
190     }
191
192     m_initialized = true;
193     return true;
194 }
195
196 void
197 IpcRingBuffer::notificationHandler()
198 {
199     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
200     // prevent multiple access
201     MutexLockHelper lock(m_access_lock);
202
203     // The first thing we do is re-enable the handler
204     // it is not going to be called since there are messages in the queue
205     // first enabling the handler, then reading all received messages will
206     // ensure that we either read or get notified of any message that arrives
207     // while this handler is running
208     if(!m_pong_queue.enableNotification()) {
209         debugError("Could not re-enable notification\n");
210     }
211
212     // no need for a lock protecting the pong queue as long as we are the only ones reading it
213     // while we have messages to read, read them
214     while(m_pong_queue.canReceive()) {
215         // message placeholder
216         IpcMessage m_ack = IpcMessage(); // FIXME: stack allocation not strictly RT safe
217         // read ping message (blocks)
218         enum PosixMessageQueue::eResult msg_res;
219         msg_res = m_pong_queue.Receive(m_ack);
220         switch(msg_res) {
221             case PosixMessageQueue::eR_OK:
222                 break;
223             default: // we were just notified, anything except OK is an error
224                 debugError("Could not read from ping queue\n");
225         }
226    
227         IpcMessage::eMessageType type = m_ack.getType();
228         if(type == IpcMessage::eMT_DataAck) {
229             // get a view on the data
230             struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m_ack.getDataPtr());
231    
232             debugOutput(DEBUG_LEVEL_VERBOSE, "Received ack idx %d at id %d\n", data->idx, data->id);
233    
234             // check counters
235             unsigned int expected_block_ack = m_last_block_ack+1;
236             if(expected_block_ack == m_blocks) expected_block_ack = 0;
237             if(data->id != expected_block_ack) {
238                 debugWarning("unexpected block id: %d (expected %d)\n", data->id, expected_block_ack);
239             }
240
241             unsigned int expected_block_idx = m_last_idx_ack+1; //will auto-overflow
242             if(data->idx != expected_block_idx) {
243                 debugWarning("unexpected block idx: %d (expected %d)\n", data->idx, expected_block_idx);
244             }
245    
246             // prepare the next expected values
247             // this is the only value used (and written in case of error) in the other thread
248             m_last_block_ack = data->id;
249             // this is not used
250             m_last_idx_ack = data->idx;
251             // signal activity
252             if(m_blocking == eB_Blocking) {
253                 sem_post(&m_activity);
254             }
255         } else {
256             debugError("Invalid message received (type %d)\n", type);
257         }
258     }
259 }
260
261 unsigned int
262 IpcRingBuffer::getBufferFill()
263 {
264     // the next pointer is last_written+1
265     // so the bufferfill = last_written - last_ack
266     //                   = last_written+1 - last_ack - 1
267     // last_ack is always <= last_written. if not,
268     // last_written has wrapped
269     // => wrap if: last_written < last_ack
270     //          or last_written+1 < last_ack+1
271     //          or m_next_block < last_ack+1
272     // unwrap if this happens
273     int bufferfill = m_next_block - m_last_block_ack - 1;
274     if(m_next_block <= m_last_block_ack) {
275         bufferfill += m_blocks;
276     }
277     assert(bufferfill>=0);
278     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) fill: %d\n", this, m_name.c_str(), bufferfill);
279     return (unsigned int)bufferfill;
280 }
281
282
283 enum IpcRingBuffer::eResult
284 IpcRingBuffer::requestBlockForWrite(void **block)
285 {
286     if(m_block_requested_for_write) {
287         debugError("Already a block requested for write\n");
288         return eR_Error;
289     }
290
291     // check if we can write a message
292     // we can send when:
293     //  - we are not overwriting
294     //    AND -    we are in blocking mode and
295     //        - OR (we are in non-blocking mode and there is space)
296
297     if(m_blocking == eB_Blocking) {
298         if(getBufferFill() >= m_blocks) {
299             debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) full\n", this, m_name.c_str());
300             // make it wait
301             sem_wait(&m_activity);
302         }
303     } else {
304         // there are no free data blocks, or there is no message space
305         if(getBufferFill() >= m_blocks || !m_ping_queue.canSend()) {
306             debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) full\n", this, m_name.c_str());
307             return eR_Again;
308         }
309     }
310
311     // check for overflow
312     if(m_next_block == m_last_block_ack) {
313         debugWarning("Overwriting not yet read block %u\n", m_next_block);
314         // we have to increment the block_read pointer
315         // in order to keep consistency
316         m_last_block_ack++;
317         if(m_last_block_ack == m_blocks) {
318             m_last_block_ack = 0;
319         }
320     }
321
322     int offset = m_next_block * m_blocksize;
323     *block = m_memblock.requestBlock(offset, m_blocksize);
324     if(*block) {
325         m_block_requested_for_write = true;
326         return eR_OK;
327     } else {
328         return eR_Error;
329     }
330 }
331
332 enum IpcRingBuffer::eResult
333 IpcRingBuffer::releaseBlockForWrite()
334 {
335     if(!m_block_requested_for_write) {
336         debugError("No block requested for write\n");
337         return eR_Error;
338     }
339
340     IpcMessage &m = m_LastDataMessageSent;
341
342     // prepare ping message
343     m.setType(IpcMessage::eMT_DataWritten);
344     m.setDataSize(sizeof(struct DataWrittenMessage));
345
346     // get a view on the data
347     struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
348     // set the data contents
349     data->id = m_next_block;
350     data->idx = m_idx;
351
352     debugOutput(DEBUG_LEVEL_VERBOSE, "Releasing block idx %d at id %d\n", data->idx, data->id);
353
354     // send ping message
355     enum PosixMessageQueue::eResult msg_res;
356     msg_res = m_ping_queue.Send(m);
357     switch(msg_res) {
358         case PosixMessageQueue::eR_OK:
359             break;
360         case PosixMessageQueue::eR_Again:
361             // this is a bug since we checked whether it was empty or not
362             debugError("Bad response value\n");
363             m_block_requested_for_write = false;
364             return eR_Error;
365         case PosixMessageQueue::eR_Timeout:
366             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n");
367             m_block_requested_for_write = false;
368             return eR_Timeout; // blocking and no space on time
369         default:
370             debugError("Could not send to ping queue\n");
371             m_block_requested_for_write = false;
372             return eR_Error;
373     }
374
375     // increment and wrap
376     m_next_block++;
377     if(m_next_block == m_blocks) {
378         m_next_block = 0;
379     }
380     m_idx++;
381     m_block_requested_for_write = false;
382     return eR_OK;
383 }
384
385 enum IpcRingBuffer::eResult
386 IpcRingBuffer::Write(char *block)
387 {
388     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p,  %s) IpcRingBuffer\n", this, m_name.c_str());
389     if(m_direction == eD_Inward) {
390         debugError("Cannot write to inbound buffer\n");
391         return eR_Error;
392     }
393
394     enum IpcRingBuffer::eResult msg_res;
395     void *xmit_block;
396     // request a block for reading
397     msg_res = requestBlockForWrite(&xmit_block);
398     if(msg_res == eR_OK) {
399         // if we receive a eR_OK, we should always be able to write to the shared memory
400         memcpy(xmit_block, block, m_blocksize);
401         releaseBlockForWrite();
402     }
403     return msg_res;
404 }
405
406 enum IpcRingBuffer::eResult
407 IpcRingBuffer::requestBlockForRead(void **block)
408 {
409     if(m_block_requested_for_read) {
410         debugError("Already a block requested for read\n");
411         return eR_Error;
412     }
413     // message placeholder
414     IpcMessage &m = m_LastDataMessageReceived;
415
416     // read ping message (blocks)
417     enum PosixMessageQueue::eResult msg_res;
418     msg_res = m_ping_queue.Receive(m);
419     switch(msg_res) {
420         case PosixMessageQueue::eR_OK:
421             break;
422         case PosixMessageQueue::eR_Again:
423             return eR_Again; // non-blocking and no message
424         case PosixMessageQueue::eR_Timeout:
425             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n");
426             return eR_Timeout; // blocking and no message on time
427         default:
428             debugError("Could not read from ping queue\n");
429             return eR_Error;
430     }
431
432     IpcMessage::eMessageType type = m.getType();
433     if(type == IpcMessage::eMT_DataWritten) {
434         // get a view on the data
435         struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
436         debugOutput(DEBUG_LEVEL_VERBOSE, "Requested block idx %d at id %d\n", data->idx, data->id);
437
438         // check counters
439         if(data->id != m_next_block) {
440             debugWarning("unexpected block id: %d (expected %d)\n", data->id, m_next_block);
441         }
442         if(data->idx != m_idx) {
443             debugWarning("unexpected block idx: %d (expected %d)\n", data->idx, m_idx);
444         }
445
446         int offset = data->id * m_blocksize;
447         *block = m_memblock.requestBlock(offset, m_blocksize);
448         if(*block) {
449             m_block_requested_for_read = true;
450             return eR_OK;
451         } else {
452             return eR_Error;
453         }
454     } else {
455         debugError("Invalid message received (type %d)\n", type);
456         return eR_Error;
457     }
458 }
459
460 enum IpcRingBuffer::eResult
461 IpcRingBuffer::releaseBlockForRead()
462 {
463     if(!m_block_requested_for_read) {
464         debugError("No block requested for read\n");
465         return eR_Error;
466     }
467
468     IpcMessage &m = m_LastDataMessageReceived;
469
470     // get a view on the data
471     struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
472     debugOutput(DEBUG_LEVEL_VERBOSE, "Releasing block idx %d at id %d\n", data->idx, data->id);
473
474     // write a response to the pong queue
475     // reuse the message
476     m.setType(IpcMessage::eMT_DataAck);
477     enum PosixMessageQueue::eResult msg_res;
478     msg_res = m_pong_queue.Send(m);
479     switch(msg_res) {
480         case PosixMessageQueue::eR_OK:
481             break;
482         case PosixMessageQueue::eR_Again:
483             debugOutput(DEBUG_LEVEL_VERBOSE, "Again on ACK\n");
484 //             return eR_Again; // non-blocking and no message
485         case PosixMessageQueue::eR_Timeout:
486             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout on ACK\n");
487 //             return eR_Timeout; // blocking and no message on time
488         default:
489             debugError("Could not write to pong queue\n");
490             m_block_requested_for_read = false;
491             return eR_Error;
492     }
493
494     // prepare the next expected values
495     m_next_block = data->id + 1;
496     if(m_next_block == m_blocks) {
497         m_next_block = 0;
498     }
499     m_idx = data->idx + 1;
500     m_block_requested_for_read = false;
501     return eR_OK;
502 }
503
504 enum IpcRingBuffer::eResult
505 IpcRingBuffer::Read(char *block)
506 {
507     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
508     if(m_direction == eD_Outward) {
509         debugError("Cannot read from outward buffer\n");
510         return eR_Error;
511     }
512
513     enum IpcRingBuffer::eResult msg_res;
514     void *rcv_block;
515     // request a block for reading
516     msg_res = requestBlockForRead(&rcv_block);
517     if(msg_res == eR_OK) {
518         // if we receive a eR_OK, we should always be able to read the shared memory
519         memcpy(block, rcv_block, m_blocksize);
520         releaseBlockForRead();
521     }
522     return msg_res;
523 }
524
525 void
526 IpcRingBuffer::show()
527 {
528     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
529 }
530
531 void
532 IpcRingBuffer::setVerboseLevel(int i)
533 {
534     setDebugLevel(i);
535     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) verbose: %d\n", this, m_name.c_str(), i);
536     m_ping_queue.setVerboseLevel(i);
537     m_pong_queue.setVerboseLevel(i);
538     m_memblock.setVerboseLevel(i);
539     m_access_lock.setVerboseLevel(i);
540 }
541
542 bool
543 IpcRingBuffer::IpcMessage::serialize(char *buff)
544 {
545     memcpy(buff, &m_header, sizeof(m_header));
546     buff += sizeof(m_header);
547     memcpy(buff, m_data, m_data_len);
548     return true;
549 }
550 bool
551 IpcRingBuffer::IpcMessage::deserialize(const char *buff, unsigned int length, unsigned prio)
552 {
553     assert(length >= sizeof(m_header));
554     memcpy(&m_header, buff, sizeof(m_header));
555
556     if(m_header.magic != FFADO_IPC_RINGBUFFER_MAGIC) {
557         return false; // invalid magic
558     }
559     if(m_header.version != FFADO_IPC_RINGBUFFER_VERSION) {
560         return false; // invalid version
561     }
562
563     m_data_len = length - sizeof(m_header);
564     buff += sizeof(m_header);
565
566     assert(m_data_len <= FFADO_IPC_MAX_MESSAGE_SIZE);
567     memcpy(m_data, buff, m_data_len);
568
569     m_priority = prio;
570     return true;
571 }
572
573
574 } // Util
Note: See TracBrowser for help on using the browser.