root/trunk/libffado/src/libutil/IpcRingBuffer.cpp

Revision 1240, 20.8 kB (checked in by ppalmers, 13 years ago)

fix bugs in IPC comms. Add preliminary FFADO-IPC ALSA plugin

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "IpcRingBuffer.h"
25 #include "PosixMessageQueue.h"
26 #include "PosixSharedMemory.h"
27 #include "Mutex.h"
28 #include "PosixMutex.h"
29 #include "Functors.h"
30
31 #include <cstring>
32
33 // FIXME: if we restrict the nb_blocks to a power of two, the overflows
34 //        can be implemented using masks
35
36 namespace Util {
37
38 IMPL_DEBUG_MODULE( IpcRingBuffer, IpcRingBuffer, DEBUG_LEVEL_VERBOSE );
39
40 IpcRingBuffer::IpcRingBuffer(std::string name,
41                              enum eBufferType type,
42                              enum eDirection dir,
43                              enum eBlocking blocking,
44                              unsigned int blocks, unsigned int block_size)
45 : m_name(name)
46 , m_blocks(blocks)
47 , m_blocksize(block_size)
48 , m_type( type )
49 , m_direction( dir )
50 , m_blocking( blocking )
51 , m_initialized( false )
52 , m_next_block( 1 )
53 , m_last_block_ack( 0 )
54 , m_idx( 1 )
55 , m_last_idx_ack( 0 )
56 , m_ping_queue( *(new PosixMessageQueue(name+":ping")) )
57 , m_pong_queue( *(new PosixMessageQueue(name+":pong")) )
58 , m_memblock( *(new PosixSharedMemory(name+":mem", blocks*block_size)) )
59 , m_access_lock( *(new PosixMutex()) )
60 , m_notify_functor( *(new MemberFunctor0< IpcRingBuffer*, void (IpcRingBuffer::*)() >
61                      ( this, &IpcRingBuffer::notificationHandler, false )) )
62 , m_block_requested_for_read( *(new PosixMutex()) )
63 , m_block_requested_for_write( *(new PosixMutex()) )
64 {
65     m_ping_queue.setVerboseLevel(getDebugLevel());
66     m_pong_queue.setVerboseLevel(getDebugLevel());
67     m_memblock.setVerboseLevel(getDebugLevel());
68     m_access_lock.setVerboseLevel(getDebugLevel());
69     sem_init(&m_activity, 0, 0);
70 }
71
72 IpcRingBuffer::~IpcRingBuffer()
73 {
74     // make sure everyone is done with this
75     // should not be necessary AFAIK
76     m_access_lock.Lock();
77     m_initialized=false;
78     delete &m_memblock;
79     delete &m_ping_queue;
80     delete &m_pong_queue;
81     m_access_lock.Unlock();
82
83     delete &m_access_lock;
84     delete &m_notify_functor;
85     sem_destroy(&m_activity);
86 }
87
88 bool
89 IpcRingBuffer::init()
90 {
91     if(m_initialized) {
92         debugError("(%p, %s) Already initialized\n",
93                    this, m_name.c_str());
94         return false;
95     }
96
97     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) init %s\n", this, m_name.c_str());
98     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) direction %d, %d blocks of %d bytes\n",
99                                      this, m_direction, m_blocks, m_blocksize);
100     switch(m_type) {
101         case eBT_Master:
102             // a master creates and owns all of the shared memory structures
103             // for outward connections we write the data, else we read
104             if(!m_memblock.Create( PosixSharedMemory::eD_ReadWrite ))
105             {
106                 debugError("(%p, %s) Could not create memblock\n",
107                            this, m_name.c_str());
108                 return false;
109             }
110
111             m_memblock.LockInMemory(true);
112             // for outward connections we do the pinging, else
113             // we do the pong-ing. note that for writing, we open read-write
114             // in order to be able to dequeue when the queue is full
115             if(!m_ping_queue.Create( (m_direction == eD_Outward ?
116                                        PosixMessageQueue::eD_ReadWrite :
117                                        PosixMessageQueue::eD_ReadOnly),
118                                      (m_blocking==eB_Blocking ?
119                                        PosixMessageQueue::eB_Blocking :
120                                        PosixMessageQueue::eB_NonBlocking)
121                                    ))
122             {
123                 debugError("(%p, %s) Could not create ping queue\n",
124                            this, m_name.c_str());
125                 return false;
126             }
127             if(!m_pong_queue.Create( (m_direction == eD_Outward ?
128                                        PosixMessageQueue::eD_ReadOnly :
129                                        PosixMessageQueue::eD_ReadWrite),
130                                      (m_blocking==eB_Blocking ?
131                                        PosixMessageQueue::eB_Blocking :
132                                        PosixMessageQueue::eB_NonBlocking)
133                                    ))
134             {
135                 debugError("(%p, %s) Could not create pong queue\n",
136                            this, m_name.c_str());
137                 return false;
138             }
139             break;
140         case eBT_Slave:
141             // a slave only opens the shared memory structures
142             // for outward connections we write the data, else we read
143             if(!m_memblock.Open( (m_direction == eD_Outward
144                                     ? PosixSharedMemory::eD_ReadWrite
145                                     : PosixSharedMemory::eD_ReadOnly) ))
146             {
147                 debugError("(%p, %s) Could not open memblock\n",
148                            this, m_name.c_str());
149                 return false;
150             }
151             m_memblock.LockInMemory(true);
152             // for outward connections we do the pinging, else
153             // we do the pong-ing. note that for writing, we open read-write
154             // in order to be able to dequeue when the queue is full
155             if(!m_ping_queue.Open( (m_direction == eD_Outward ?
156                                       PosixMessageQueue::eD_ReadWrite :
157                                       PosixMessageQueue::eD_ReadOnly),
158                                    (m_blocking==eB_Blocking ?
159                                       PosixMessageQueue::eB_Blocking :
160                                       PosixMessageQueue::eB_NonBlocking)
161                                  ))
162             {
163                 debugError("(%p, %s) Could not open ping queue\n",
164                            this, m_name.c_str());
165                 return false;
166             }
167             if(!m_pong_queue.Open( (m_direction == eD_Outward ?
168                                       PosixMessageQueue::eD_ReadOnly :
169                                       PosixMessageQueue::eD_ReadWrite),
170                                    (m_blocking==eB_Blocking ?
171                                       PosixMessageQueue::eB_Blocking :
172                                       PosixMessageQueue::eB_NonBlocking)
173                                  ))
174             {
175                 debugError("(%p, %s) Could not open pong queue\n",
176                            this, m_name.c_str());
177                 return false;
178             }
179             break;
180     }
181
182     // if we are on the sending end of the buffer, we need a notifier
183     // on the pong queue
184     // the receiving end is driven by the messages in the ping queue
185     if(m_direction == eD_Outward) {
186         if(!m_pong_queue.setNotificationHandler(&m_notify_functor)) {
187             debugError("Could not set Notification Handler\n");
188             return false;
189         }
190         // enable the handler
191         if(!m_pong_queue.enableNotification()) {
192             debugError("Could not enable notification\n");
193         }
194         // now clear the queue to eliminate messages that might be there
195         // from earlier runs
196         m_pong_queue.Clear();
197     } else {
198         // if we are on the receiving end, clear any waiting messages in the ping
199         // queue
200         m_ping_queue.Clear();
201     }
202    
203     m_initialized = true;
204     return true;
205 }
206
207 void
208 IpcRingBuffer::notificationHandler()
209 {
210     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
211     // prevent multiple access
212     MutexLockHelper lock(m_access_lock);
213
214     // The first thing we do is re-enable the handler
215     // it is not going to be called since there are messages in the queue
216     // first enabling the handler, then reading all received messages will
217     // ensure that we either read or get notified of any message that arrives
218     // while this handler is running
219     if(!m_pong_queue.enableNotification()) {
220         debugError("Could not re-enable notification\n");
221     }
222
223     // no need for a lock protecting the pong queue as long as we are the only ones reading it
224     // while we have messages to read, read them
225     while(m_pong_queue.canReceive()) {
226         // message placeholder
227         IpcMessage m_ack = IpcMessage(); // FIXME: stack allocation not strictly RT safe
228         // read ping message (blocks)
229         enum PosixMessageQueue::eResult msg_res;
230         msg_res = m_pong_queue.Receive(m_ack);
231         switch(msg_res) {
232             case PosixMessageQueue::eR_OK:
233                 break;
234             default: // we were just notified, anything except OK is an error
235                 debugError("Could not read from ping queue\n");
236         }
237    
238         IpcMessage::eMessageType type = m_ack.getType();
239         if(type == IpcMessage::eMT_DataAck) {
240             // get a view on the data
241             struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m_ack.getDataPtr());
242    
243             debugOutput(DEBUG_LEVEL_VERBOSE, "Received ack idx %d at id %d\n", data->idx, data->id);
244    
245             // check counters
246             unsigned int expected_block_ack = m_last_block_ack+1;
247             if(expected_block_ack == m_blocks) expected_block_ack = 0;
248             if(data->id != expected_block_ack) {
249                 debugWarning("unexpected block id: %d (expected %d)\n", data->id, expected_block_ack);
250             }
251
252             unsigned int expected_block_idx = m_last_idx_ack+1; //will auto-overflow
253             if(data->idx != expected_block_idx) {
254                 debugWarning("unexpected block idx: %d (expected %d)\n", data->idx, expected_block_idx);
255             }
256    
257             // prepare the next expected values
258             // this is the only value used (and written in case of error) in the other thread
259             m_last_block_ack = data->id;
260             // this is not used
261             m_last_idx_ack = data->idx;
262             // signal activity
263             if(m_blocking == eB_Blocking) {
264                 sem_post(&m_activity);
265             }
266         } else {
267             debugError("Invalid message received (type %d)\n", type);
268         }
269     }
270 }
271
272 unsigned int
273 IpcRingBuffer::getBufferFill()
274 {
275     // the next pointer is last_written+1
276     // so the bufferfill = last_written - last_ack
277     //                   = last_written+1 - last_ack - 1
278     // last_ack is always <= last_written. if not,
279     // last_written has wrapped
280     // => wrap if: last_written < last_ack
281     //          or last_written+1 < last_ack+1
282     //          or m_next_block < last_ack+1
283     // unwrap if this happens
284     int bufferfill = m_next_block - m_last_block_ack - 1;
285     if(m_next_block <= m_last_block_ack) {
286         bufferfill += m_blocks;
287     }
288     assert(bufferfill>=0);
289     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) fill: %d\n", this, m_name.c_str(), bufferfill);
290     return (unsigned int)bufferfill;
291 }
292
293
294 enum IpcRingBuffer::eResult
295 IpcRingBuffer::requestBlockForWrite(void **block)
296 {
297     if(!m_block_requested_for_write.TryLock()) {
298         debugError("Already a block requested for write\n");
299         return eR_Error;
300     }
301
302     // check if we can write a message
303     // we can send when:
304     //  - we are not overwriting
305     //    AND -    we are in blocking mode and
306     //        - OR (we are in non-blocking mode and there is space)
307
308     if(m_blocking == eB_Blocking) {
309         if(getBufferFill() >= m_blocks) {
310             debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) full\n", this, m_name.c_str());
311             // make it wait
312             sem_wait(&m_activity);
313         }
314     } else {
315         // there are no free data blocks, or there is no message space
316         if(getBufferFill() >= m_blocks || !m_ping_queue.canSend()) {
317             debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) full\n", this, m_name.c_str());
318             m_block_requested_for_write.Unlock();
319             return eR_Again;
320         }
321     }
322
323     // check for overflow
324     if(m_next_block == m_last_block_ack) {
325         debugWarning("Overwriting not yet read block %u\n", m_next_block);
326         // we have to increment the block_read pointer
327         // in order to keep consistency
328         m_last_block_ack++;
329         if(m_last_block_ack == m_blocks) {
330             m_last_block_ack = 0;
331         }
332     }
333
334     int offset = m_next_block * m_blocksize;
335     *block = m_memblock.requestBlock(offset, m_blocksize);
336     if(*block) {
337         // keep the lock, to be released by releaseBlockForWrite
338         return eR_OK;
339     } else {
340         m_block_requested_for_write.Unlock();
341         return eR_Error;
342     }
343 }
344
345 enum IpcRingBuffer::eResult
346 IpcRingBuffer::releaseBlockForWrite()
347 {
348     if(!m_block_requested_for_write.isLocked()) {
349         debugError("No block requested for write\n");
350         return eR_Error;
351     }
352
353     IpcMessage &m = m_LastDataMessageSent;
354
355     // prepare ping message
356     m.setType(IpcMessage::eMT_DataWritten);
357     m.setDataSize(sizeof(struct DataWrittenMessage));
358
359     // get a view on the data
360     struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
361     // set the data contents
362     data->id = m_next_block;
363     data->idx = m_idx;
364
365     debugOutput(DEBUG_LEVEL_VERBOSE, "Releasing block idx %d at id %d\n", data->idx, data->id);
366
367     // send ping message
368     enum PosixMessageQueue::eResult msg_res;
369     msg_res = m_ping_queue.Send(m);
370     switch(msg_res) {
371         case PosixMessageQueue::eR_OK:
372             break;
373         case PosixMessageQueue::eR_Again:
374             // this is a bug since we checked whether it was empty or not
375             debugError("Bad response value\n");
376             m_block_requested_for_write.Unlock();
377             return eR_Error;
378         case PosixMessageQueue::eR_Timeout:
379             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n");
380             m_block_requested_for_write.Unlock();
381             return eR_Timeout; // blocking and no space on time
382         default:
383             debugError("Could not send to ping queue\n");
384             m_block_requested_for_write.Unlock();
385             return eR_Error;
386     }
387
388     // increment and wrap
389     m_next_block++;
390     if(m_next_block == m_blocks) {
391         m_next_block = 0;
392     }
393     m_idx++;
394     m_block_requested_for_write.Unlock();
395     return eR_OK;
396 }
397
398 enum IpcRingBuffer::eResult
399 IpcRingBuffer::Write(char *block)
400 {
401     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p,  %s) IpcRingBuffer\n", this, m_name.c_str());
402     if(m_direction == eD_Inward) {
403         debugError("Cannot write to inbound buffer\n");
404         return eR_Error;
405     }
406
407     enum IpcRingBuffer::eResult msg_res;
408     void *xmit_block;
409     // request a block for reading
410     msg_res = requestBlockForWrite(&xmit_block);
411     if(msg_res == eR_OK) {
412         // if we receive a eR_OK, we should always be able to write to the shared memory
413         memcpy(xmit_block, block, m_blocksize);
414         releaseBlockForWrite();
415     }
416     return msg_res;
417 }
418
419 enum IpcRingBuffer::eResult
420 IpcRingBuffer::requestBlockForRead(void **block)
421 {
422     if(!m_block_requested_for_read.TryLock()) {
423         debugError("Already a block requested for read\n");
424         return eR_Error;
425     }
426     // message placeholder
427     IpcMessage &m = m_LastDataMessageReceived;
428
429     // read ping message (blocks)
430     enum PosixMessageQueue::eResult msg_res;
431     msg_res = m_ping_queue.Receive(m);
432     switch(msg_res) {
433         case PosixMessageQueue::eR_OK:
434             break;
435         case PosixMessageQueue::eR_Again:
436             m_block_requested_for_read.Unlock();
437             return eR_Again; // non-blocking and no message
438         case PosixMessageQueue::eR_Timeout:
439             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n");
440             m_block_requested_for_read.Unlock();
441             return eR_Timeout; // blocking and no message on time
442         default:
443             debugError("Could not read from ping queue\n");
444             m_block_requested_for_read.Unlock();
445             return eR_Error;
446     }
447
448     IpcMessage::eMessageType type = m.getType();
449     if(type == IpcMessage::eMT_DataWritten) {
450         // get a view on the data
451         struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
452         debugOutput(DEBUG_LEVEL_VERBOSE, "Requested block idx %d at id %d\n", data->idx, data->id);
453
454         // check counters
455         if(data->id != m_next_block) {
456             debugWarning("unexpected block id: %d (expected %d)\n", data->id, m_next_block);
457         }
458         if(data->idx != m_idx) {
459             debugWarning("unexpected block idx: %d (expected %d)\n", data->idx, m_idx);
460         }
461
462         int offset = data->id * m_blocksize;
463         *block = m_memblock.requestBlock(offset, m_blocksize);
464         if(*block) {
465             // keep the mutex locked, we expect the thread that grabbed the block to also return it
466             return eR_OK;
467         } else {
468             m_block_requested_for_read.Unlock();
469             return eR_Error;
470         }
471     } else {
472         debugError("Invalid message received (type %d)\n", type);
473         m_block_requested_for_read.Unlock();
474         return eR_Error;
475     }
476 }
477
478 enum IpcRingBuffer::eResult
479 IpcRingBuffer::releaseBlockForRead()
480 {
481     if(!m_block_requested_for_read.isLocked()) {
482         debugError("No block requested for read\n");
483         return eR_Error;
484     }
485
486     IpcMessage &m = m_LastDataMessageReceived;
487
488     // get a view on the data
489     struct DataWrittenMessage* data = reinterpret_cast<struct DataWrittenMessage*>(m.getDataPtr());
490     debugOutput(DEBUG_LEVEL_VERBOSE, "Releasing block idx %d at id %d\n", data->idx, data->id);
491
492     // write a response to the pong queue
493     // reuse the message
494     m.setType(IpcMessage::eMT_DataAck);
495     enum PosixMessageQueue::eResult msg_res;
496     msg_res = m_pong_queue.Send(m);
497     switch(msg_res) {
498         case PosixMessageQueue::eR_OK:
499             break;
500         case PosixMessageQueue::eR_Again:
501             m_block_requested_for_read.Unlock(); // FIXME: this is not very correct
502             debugOutput(DEBUG_LEVEL_VERBOSE, "Again on ACK\n");
503             return eR_Again; // non-blocking and no message
504         case PosixMessageQueue::eR_Timeout:
505             m_block_requested_for_read.Unlock();
506             debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout on ACK\n");
507             return eR_Timeout; // blocking and no message on time
508         default:
509             debugError("Could not write to pong queue\n");
510             m_block_requested_for_read.Unlock();
511             return eR_Error;
512     }
513
514     // prepare the next expected values
515     m_next_block = data->id + 1;
516     if(m_next_block == m_blocks) {
517         m_next_block = 0;
518     }
519     m_idx = data->idx + 1;
520
521     m_block_requested_for_read.Unlock();
522     return eR_OK;
523 }
524
525 enum IpcRingBuffer::eResult
526 IpcRingBuffer::Read(char *block)
527 {
528     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
529     if(m_direction == eD_Outward) {
530         debugError("Cannot read from outward buffer\n");
531         return eR_Error;
532     }
533
534     enum IpcRingBuffer::eResult msg_res;
535     void *rcv_block;
536     // request a block for reading
537     msg_res = requestBlockForRead(&rcv_block);
538     if(msg_res == eR_OK) {
539         // if we receive a eR_OK, we should always be able to read the shared memory
540         memcpy(block, rcv_block, m_blocksize);
541         releaseBlockForRead();
542     }
543     return msg_res;
544 }
545
546 void
547 IpcRingBuffer::show()
548 {
549     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) IpcRingBuffer %s\n", this, m_name.c_str());
550 }
551
552 void
553 IpcRingBuffer::setVerboseLevel(int i)
554 {
555     setDebugLevel(i);
556     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) verbose: %d\n", this, m_name.c_str(), i);
557     m_ping_queue.setVerboseLevel(i);
558     m_pong_queue.setVerboseLevel(i);
559     m_memblock.setVerboseLevel(i);
560     m_access_lock.setVerboseLevel(i);
561 }
562
563 bool
564 IpcRingBuffer::IpcMessage::serialize(char *buff)
565 {
566     memcpy(buff, &m_header, sizeof(m_header));
567     buff += sizeof(m_header);
568     memcpy(buff, m_data, m_data_len);
569     return true;
570 }
571 bool
572 IpcRingBuffer::IpcMessage::deserialize(const char *buff, unsigned int length, unsigned prio)
573 {
574     assert(length >= sizeof(m_header));
575     memcpy(&m_header, buff, sizeof(m_header));
576
577     if(m_header.magic != FFADO_IPC_RINGBUFFER_MAGIC) {
578         return false; // invalid magic
579     }
580     if(m_header.version != FFADO_IPC_RINGBUFFER_VERSION) {
581         return false; // invalid version
582     }
583
584     m_data_len = length - sizeof(m_header);
585     buff += sizeof(m_header);
586
587     assert(m_data_len <= FFADO_IPC_MAX_MESSAGE_SIZE);
588     memcpy(m_data, buff, m_data_len);
589
590     m_priority = prio;
591     return true;
592 }
593
594
595 } // Util
Note: See TracBrowser for help on using the browser.