Changeset 1240

Show
Ignore:
Timestamp:
06/01/08 09:15:34 (13 years ago)
Author:
ppalmers
Message:

fix bugs in IPC comms. Add preliminary FFADO-IPC ALSA plugin

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libffado/SConstruct

    r1239 r1240  
    282282        # 
    283283 
    284 if conf.CheckForApp( "which pyuic" ) and conf.CheckForPyModule( 'dbus' ) and conf.CheckForPyModule( 'qt' ): 
    285         env['PYUIC'] = True 
    286  
    287         if conf.CheckForApp( "xdg-desktop-menu --help" ): 
    288                 env['XDG_TOOLS'] = True 
     284        # PyQT checks 
     285        if conf.CheckForApp( "which pyuic" ) and conf.CheckForPyModule( 'dbus' ) and conf.CheckForPyModule( 'qt' ): 
     286                env['PYUIC'] = True 
     287         
     288                if conf.CheckForApp( "xdg-desktop-menu --help" ): 
     289                        env['XDG_TOOLS'] = True 
     290                else: 
     291                        print """ 
     292        I couldn't find the program 'xdg-desktop-menu'. Together with xdg-icon-resource 
     293        this is needed to add the fancy entry to your menu. But the mixer will be installed, you can start it by executing "ffadomixer". 
     294        """ 
     295         
    289296        else: 
    290297                print """ 
    291 I couldn't find the program 'xdg-desktop-menu'. Together with xdg-icon-resource 
    292 this is needed to add the fancy entry to your menu. But the mixer will be installed, you can start it by executing "ffadomixer". 
    293 """ 
    294  
    295 else: 
    296         print """ 
    297 I couldn't find all the prerequisites ('pyuic' and the python-modules 'dbus' and 
    298 'qt', the packages could be named like dbus-python and PyQt) to build the mixer. 
    299 Therefor the mixer won't get installed. 
    300 """ 
     298        I couldn't find all the prerequisites ('pyuic' and the python-modules 'dbus' and 
     299        'qt', the packages could be named like dbus-python and PyQt) to build the mixer. 
     300        Therefor the mixer won't get installed. 
     301        """ 
     302 
     303        # ALSA checks 
     304        pkg = 'alsa' 
     305        name2 = pkg.replace("+","").replace(".","").replace("-","").upper() 
     306        env['%s_FLAGS' % name2] = conf.GetPKGFlags( pkg, '1.0.0' ) 
     307        if env['%s_FLAGS'%name2] == 0: 
     308                env['HAVE_ALSA'] = False 
     309                print " ALSA not found, not building ALSA plugin." 
     310        else: 
     311                env['HAVE_ALSA'] = True 
     312                print " ALSA found, building ALSA plugin." 
     313 
    301314 
    302315config_guess = conf.ConfigGuess() 
  • trunk/libffado/src/libutil/IpcRingBuffer.cpp

    r1234 r1240  
    6060, m_notify_functor( *(new MemberFunctor0< IpcRingBuffer*, void (IpcRingBuffer::*)() > 
    6161                     ( this, &IpcRingBuffer::notificationHandler, false )) ) 
    62 , m_block_requested_for_read( false
    63 , m_block_requested_for_write( false
     62, m_block_requested_for_read( *(new PosixMutex())
     63, m_block_requested_for_write( *(new PosixMutex())
    6464{ 
    6565    m_ping_queue.setVerboseLevel(getDebugLevel()); 
     
    9696 
    9797    debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) init %s\n", this, m_name.c_str()); 
     98    debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) direction %d, %d blocks of %d bytes\n", 
     99                                     this, m_direction, m_blocks, m_blocksize); 
    98100    switch(m_type) { 
    99101        case eBT_Master: 
     
    190192            debugError("Could not enable notification\n"); 
    191193        } 
    192     } 
    193  
     194        // now clear the queue to eliminate messages that might be there 
     195        // from earlier runs 
     196        m_pong_queue.Clear(); 
     197    } else { 
     198        // if we are on the receiving end, clear any waiting messages in the ping 
     199        // queue 
     200        m_ping_queue.Clear(); 
     201    } 
     202     
    194203    m_initialized = true; 
    195204    return true; 
     
    286295IpcRingBuffer::requestBlockForWrite(void **block) 
    287296{ 
    288     if(m_block_requested_for_write) { 
     297    if(!m_block_requested_for_write.TryLock()) { 
    289298        debugError("Already a block requested for write\n"); 
    290299        return eR_Error; 
     
    307316        if(getBufferFill() >= m_blocks || !m_ping_queue.canSend()) { 
    308317            debugOutput(DEBUG_LEVEL_VERBOSE, "(%p, %s) full\n", this, m_name.c_str()); 
     318            m_block_requested_for_write.Unlock(); 
    309319            return eR_Again; 
    310320        } 
     
    325335    *block = m_memblock.requestBlock(offset, m_blocksize); 
    326336    if(*block) { 
    327         m_block_requested_for_write = true; 
     337        // keep the lock, to be released by releaseBlockForWrite 
    328338        return eR_OK; 
    329339    } else { 
     340        m_block_requested_for_write.Unlock(); 
    330341        return eR_Error; 
    331342    } 
     
    335346IpcRingBuffer::releaseBlockForWrite() 
    336347{ 
    337     if(!m_block_requested_for_write) { 
     348    if(!m_block_requested_for_write.isLocked()) { 
    338349        debugError("No block requested for write\n"); 
    339350        return eR_Error; 
     
    363374            // this is a bug since we checked whether it was empty or not 
    364375            debugError("Bad response value\n"); 
    365             m_block_requested_for_write = false
     376            m_block_requested_for_write.Unlock()
    366377            return eR_Error;  
    367378        case PosixMessageQueue::eR_Timeout: 
    368379            debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n"); 
    369             m_block_requested_for_write = false
     380            m_block_requested_for_write.Unlock()
    370381            return eR_Timeout; // blocking and no space on time 
    371382        default: 
    372383            debugError("Could not send to ping queue\n"); 
    373             m_block_requested_for_write = false
     384            m_block_requested_for_write.Unlock()
    374385            return eR_Error; 
    375386    } 
     
    381392    } 
    382393    m_idx++; 
    383     m_block_requested_for_write = false
     394    m_block_requested_for_write.Unlock()
    384395    return eR_OK; 
    385396} 
     
    409420IpcRingBuffer::requestBlockForRead(void **block) 
    410421{ 
    411     if(m_block_requested_for_read) { 
     422    if(!m_block_requested_for_read.TryLock()) { 
    412423        debugError("Already a block requested for read\n"); 
    413424        return eR_Error; 
     
    423434            break; 
    424435        case PosixMessageQueue::eR_Again: 
     436            m_block_requested_for_read.Unlock(); 
    425437            return eR_Again; // non-blocking and no message 
    426438        case PosixMessageQueue::eR_Timeout: 
    427439            debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout\n"); 
     440            m_block_requested_for_read.Unlock(); 
    428441            return eR_Timeout; // blocking and no message on time 
    429442        default: 
    430443            debugError("Could not read from ping queue\n"); 
     444            m_block_requested_for_read.Unlock(); 
    431445            return eR_Error; 
    432446    } 
     
    449463        *block = m_memblock.requestBlock(offset, m_blocksize); 
    450464        if(*block) { 
    451             m_block_requested_for_read = true; 
     465            // keep the mutex locked, we expect the thread that grabbed the block to also return it 
    452466            return eR_OK; 
    453467        } else { 
     468            m_block_requested_for_read.Unlock(); 
    454469            return eR_Error; 
    455470        } 
    456471    } else { 
    457472        debugError("Invalid message received (type %d)\n", type); 
     473        m_block_requested_for_read.Unlock(); 
    458474        return eR_Error; 
    459475    } 
     
    463479IpcRingBuffer::releaseBlockForRead() 
    464480{ 
    465     if(!m_block_requested_for_read) { 
     481    if(!m_block_requested_for_read.isLocked()) { 
    466482        debugError("No block requested for read\n"); 
    467483        return eR_Error; 
     
    483499            break; 
    484500        case PosixMessageQueue::eR_Again: 
     501            m_block_requested_for_read.Unlock(); // FIXME: this is not very correct 
    485502            debugOutput(DEBUG_LEVEL_VERBOSE, "Again on ACK\n"); 
    486 //             return eR_Again; // non-blocking and no message 
     503            return eR_Again; // non-blocking and no message 
    487504        case PosixMessageQueue::eR_Timeout: 
     505            m_block_requested_for_read.Unlock(); 
    488506            debugOutput(DEBUG_LEVEL_VERBOSE, "Timeout on ACK\n"); 
    489 //             return eR_Timeout; // blocking and no message on time 
     507            return eR_Timeout; // blocking and no message on time 
    490508        default: 
    491509            debugError("Could not write to pong queue\n"); 
    492             m_block_requested_for_read = false
     510            m_block_requested_for_read.Unlock()
    493511            return eR_Error; 
    494512    } 
     
    500518    } 
    501519    m_idx = data->idx + 1; 
    502     m_block_requested_for_read = false; 
     520 
     521    m_block_requested_for_read.Unlock(); 
    503522    return eR_OK; 
    504523} 
  • trunk/libffado/src/libutil/IpcRingBuffer.h

    r1172 r1240  
    199199 
    200200    IpcMessage          m_LastDataMessageReceived; 
    201     bool                m_block_requested_for_read; 
     201    Mutex&              m_block_requested_for_read; 
    202202    IpcMessage          m_LastDataMessageSent; 
    203     bool                m_block_requested_for_write; 
     203    Mutex&              m_block_requested_for_write; 
    204204 
    205205protected: 
  • trunk/libffado/src/libutil/PosixMessageQueue.cpp

    r1172 r1240  
    179179    m_handle = MQ_INVALID_ID; 
    180180    return true; 
     181} 
     182 
     183enum PosixMessageQueue::eResult 
     184PosixMessageQueue::Clear() 
     185{ 
     186    debugOutput(DEBUG_LEVEL_VERBOSE,  
     187                "(%p, %s) clear\n", 
     188                this, m_name.c_str()); 
     189    if(m_direction == eD_WriteOnly) { 
     190        debugError("Cannot clear write-only queue\n"); 
     191        return eR_Error; 
     192    } 
     193 
     194    // ensure that we don't interfere with the notification handler 
     195    MutexLockHelper lock(m_notifyHandlerLock); 
     196    while(countMessages()) { 
     197        struct timespec timeout; 
     198        clock_gettime(CLOCK_REALTIME, &timeout); 
     199        timeout.tv_sec += m_timeout.tv_sec; 
     200        timeout.tv_nsec += m_timeout.tv_nsec; 
     201        if(timeout.tv_nsec >= 1000000000LL) { 
     202            timeout.tv_sec++; 
     203            timeout.tv_nsec -= 1000000000LL; 
     204        } 
     205     
     206        signed int len; 
     207        unsigned prio; 
     208        if((len = mq_timedreceive(m_handle, m_tmp_buffer, m_attr.mq_msgsize, &prio, &timeout)) < 0) { 
     209            switch(errno) { 
     210                case EAGAIN: 
     211                    debugOutput(DEBUG_LEVEL_VERBOSE, 
     212                                "(%p, %s) empty\n", 
     213                                this, m_name.c_str()); 
     214                    return eR_OK; 
     215                case ETIMEDOUT: 
     216                    debugOutput(DEBUG_LEVEL_VERBOSE, 
     217                                "(%p, %s) read timed out\n", 
     218                                this, m_name.c_str()); 
     219                    return eR_Timeout; 
     220                default: 
     221                    debugError("(%p, %s) could not receive: %s\n",  
     222                            this, m_name.c_str(), strerror(errno)); 
     223                    return eR_Error; 
     224            } 
     225        } 
     226    } 
     227    return eR_OK; 
    181228} 
    182229 
  • trunk/libffado/src/libutil/PosixMessageQueue.h

    r1172 r1240  
    8989    virtual enum eResult Receive(Message &m); 
    9090 
     91    virtual enum eResult Clear(); 
     92 
    9193    virtual int countMessages(); 
    9294    virtual bool canSend(); 
  • trunk/libffado/support/SConscript

    r1185 r1240  
    2626env = env.Clone() 
    2727 
    28 env.SConscript( dirs=["mixer","firmware","dbus"], exports="env" ) 
     28env.SConscript( dirs=["mixer","firmware","dbus", "alsa"], exports="env" ) 
    2929 
  • trunk/libffado/tests/streaming/test-ipcclient.cpp

    r1172 r1240  
    5151const char *argp_program_bug_address = "<ffado-devel@lists.sf.net>"; 
    5252static char doc[] = "test-avccmd -- test program to test the ipc ringbuffer class."; 
    53 static char args_doc[] = "DIRECTION"; 
     53static char args_doc[] = ""; 
    5454static struct argp_option options[] = { 
    5555    {"verbose",  'v', "level",    0,  "Produce verbose output" }, 
     
    146146        break; 
    147147    case ARGP_KEY_END: 
    148         if(arguments->nargs <= 0) { 
    149             printMessage("not enough arguments\n"); 
    150             return -1; 
    151         } 
     148//         if(arguments->nargs <= 0) { 
     149//             printMessage("not enough arguments\n"); 
     150//             return -1; 
     151//         } 
    152152        break; 
    153153    default: 
     
    168168    signal (SIGPIPE, sighandler); 
    169169 
     170    arguments.verbose           = 6; 
     171    arguments.period            = 1024; 
     172    arguments.nb_buffers        = 3; 
     173    arguments.playback          = 0; 
     174    arguments.capture           = 0; 
     175 
    170176    // arg parsing 
    171177    if ( argp_parse ( &argp, argc, argv, 0, 0, &arguments ) ) { 
     
    182188 
    183189    printMessage("Testing shared memory streaming IPC\n"); 
     190    printMessage(" period %d, nb_buffers %d, playback %d, capture %d\n", 
     191                 arguments.period, arguments.nb_buffers, 
     192                 arguments.playback, 
     193                 arguments.capture ); 
    184194 
    185195    // prepare the IPC buffers 
     
    188198    IpcRingBuffer* capturebuffer = NULL; 
    189199    IpcRingBuffer* playbackbuffer = NULL; 
    190     if(arguments.playback == 0) { 
     200    if(arguments.playback) { 
    191201        playbackbuffer = new IpcRingBuffer("playbackbuffer", 
    192202                              IpcRingBuffer::eBT_Slave, 
     
    231241 
    232242    int cnt = 0; 
    233  
     243    int pbkcnt = 0; 
    234244 
    235245    run=1; 
     
    237247        // write the data 
    238248        IpcRingBuffer::eResult res; 
    239         res = playbackbuffer->Write(playback_buff); 
    240         if(res != IpcRingBuffer::eR_OK && res != IpcRingBuffer::eR_Again) { 
    241             debugError("Could not write to segment\n"); 
    242             goto out_err; 
    243         } 
    244         if(res == IpcRingBuffer::eR_Again) { 
    245             printMessage(" Try playback again on %d...\n", cnt); 
    246         } 
    247  
    248         res = capturebuffer->Read(capture_buff); 
    249         if(res != IpcRingBuffer::eR_OK && res != IpcRingBuffer::eR_Again) { 
    250             debugError("Could not receive from queue\n"); 
    251             goto out_err; 
    252         } 
    253         if(res == IpcRingBuffer::eR_Again) { 
    254             printMessage(" Try again on %d...\n", cnt); 
    255         } else { 
    256             if(cnt%10==0) { 
    257                 uint32_t *tmp = (uint32_t *)capture_buff; 
    258                 for(int i=0;i<arguments.capture;i++) { 
    259                     printMessage(" channel %d: ", i); 
    260                     for(int j=0; j < 6;j+=1) { 
    261                         uint32_t tmp2 = tmp[j] << 8; 
    262                         int32_t *tmp3 = (int32_t *)&tmp2; 
    263                         printMessageShort("%10d ", *tmp3); 
     249        if(playbackbuffer) { 
     250            res = playbackbuffer->Write(playback_buff); 
     251            if(res != IpcRingBuffer::eR_OK && res != IpcRingBuffer::eR_Again) { 
     252                debugError("Could not write to segment\n"); 
     253                goto out_err; 
     254            } 
     255            if(res == IpcRingBuffer::eR_Again) { 
     256                printMessage(" Try playback again on %d...\n", cnt); 
     257            } else { 
     258                if(pbkcnt%100==0) { 
     259                    printMessage(" Period %d...\n", pbkcnt); 
     260                } 
     261                pbkcnt++; 
     262            } 
     263        } 
     264        // read data 
     265        if (capturebuffer) { 
     266            res = capturebuffer->Read(capture_buff); 
     267            if(res != IpcRingBuffer::eR_OK && res != IpcRingBuffer::eR_Again) { 
     268                debugError("Could not receive from queue\n"); 
     269                goto out_err; 
     270            } 
     271            if(res == IpcRingBuffer::eR_Again) { 
     272                printMessage(" Try again on %d...\n", cnt); 
     273            } else { 
     274                if(cnt%10==0) { 
     275                    uint32_t *tmp = (uint32_t *)capture_buff; 
     276                    for(int i=0;i<arguments.capture;i++) { 
     277                        printMessage(" channel %d: ", i); 
     278                        for(int j=0; j < 6;j+=1) { 
     279                            uint32_t tmp2 = tmp[j] << 8; 
     280                            int32_t *tmp3 = (int32_t *)&tmp2; 
     281                            printMessageShort("%10d ", *tmp3); 
     282                        } 
     283                        tmp += arguments.period; 
     284                        printMessageShort("\n"); 
    264285                    } 
    265                     tmp += arguments.period; 
    266                     printMessageShort("\n"); 
    267286                } 
    268             } 
    269             cnt++; 
     287                cnt++; 
     288            } 
    270289        } 
    271290    } 
  • trunk/libffado/tests/streaming/teststreaming-ipc.cpp

    r1172 r1240  
    310310 
    311311    debugOutput(DEBUG_LEVEL_NORMAL, "FFADO streaming test application (3)\n"); 
     312    printMessage(" period %d, nb_buffers %d, playback %d, capture %d\n", 
     313                 arguments.period, arguments.nb_buffers, 
     314                 arguments.playback, 
     315                 arguments.capture ); 
    312316 
    313317    signal (SIGINT, sighandler); 
     
    370374    } 
    371375 
    372     printMessage("Channel count: %d capture, %d playback\n", 
     376    printMessage("Device channel count: %d capture, %d playback\n", 
    373377                 nb_in_channels, nb_out_channels); 
     378    printMessage("Requested channel count: %d capture, %d playback\n", 
     379                 arguments.capture, arguments.playback); 
    374380 
    375381    if(arguments.playback > nb_out_channels) { 
     
    389395 
    390396    // allocate the IPC structures 
    391     #define NB_BUFFERS 4 
    392397    IpcRingBuffer* capturebuffer = NULL; 
    393398    IpcRingBuffer* playbackbuffer = NULL; 
     
    398403                                IpcRingBuffer::eD_Outward, 
    399404                                IpcRingBuffer::eB_NonBlocking, 
    400                                 NB_BUFFERS, dev_options.period_size * arguments.capture * 4); 
     405                                arguments.nb_buffers, 
     406                                dev_options.period_size * arguments.capture * 4); 
    401407        if(capturebuffer == NULL) { 
    402408            debugError("Could not create capture IPC buffer\n"); 
     
    421427                                IpcRingBuffer::eD_Inward, 
    422428                                IpcRingBuffer::eB_NonBlocking, 
    423                                 NB_BUFFERS, dev_options.period_size * arguments.playback * 4); 
     429                                arguments.nb_buffers, 
     430                                dev_options.period_size * arguments.playback * 4); 
    424431        if(playbackbuffer == NULL) { 
    425432            debugError("Could not create playback IPC buffer\n"); 
     
    505512                debugOutput(DEBUG_LEVEL_NORMAL, "CAP: missed period %d\n", nb_periods); 
    506513            } 
     514        } else { 
     515            need_silent=true; 
    507516        } 
    508517        if(need_silent) { 
     
    528537        ffado_streaming_transfer_capture_buffers(dev); 
    529538 
    530         if(!need_silent && msg_res == IpcRingBuffer::eR_OK) { 
     539        if(capturebuffer && !need_silent && msg_res == IpcRingBuffer::eR_OK) { 
    531540            // if we had a good block, release it 
    532541            // FIXME: we should check for errors here 
     
    592601        ffado_streaming_transfer_playback_buffers(dev); 
    593602 
    594         if(!need_silent && msg_res == IpcRingBuffer::eR_OK) { 
     603        if(playbackbuffer && !need_silent && msg_res == IpcRingBuffer::eR_OK) { 
    595604            // if we had a good block, release it 
    596605            // FIXME: we should check for errors here 
    597             capturebuffer->releaseBlockForRead(); 
     606            playbackbuffer->releaseBlockForRead(); 
    598607        } 
    599608