root/trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

Revision 803, 20.4 kB (checked in by ppalmers, 13 years ago)

more reliable streaming. hackish, but a start for a better implementation

Line 
1 /*
2  * Copyright (C) 2005-2007 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25 #include "IsoHandlerManager.h"
26 #include "ieee1394service.h"
27 #include "IsoHandler.h"
28 #include "libstreaming/generic/StreamProcessor.h"
29
30 #include "libutil/Atomic.h"
31
32 #include "libutil/PosixThread.h"
33
34 #include <assert.h>
35
36 IMPL_DEBUG_MODULE( IsoHandlerManager, IsoHandlerManager, DEBUG_LEVEL_NORMAL );
37
38 using namespace Streaming;
39
40 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service)
41    : m_State(E_Created)
42    , m_service( service )
43    , m_realtime(false), m_priority(0)
44    , m_Thread ( NULL )
45 {}
46
47 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio)
48    : m_State(E_Created)
49    , m_service( service )
50    , m_realtime(run_rt), m_priority(rt_prio)
51    , m_Thread ( NULL )
52 {}
53
54 IsoHandlerManager::~IsoHandlerManager()
55 {
56     stopHandlers();
57     pruneHandlers();
58     if(m_IsoHandlers.size() > 0) {
59         debugError("Still some handlers in use\n");
60     }
61     if (m_Thread) {
62         m_Thread->Stop();
63         delete m_Thread;
64     }
65 }
66
67 bool
68 IsoHandlerManager::setThreadParameters(bool rt, int priority) {
69     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority);
70     if (priority > 98) priority = 98; // cap the priority
71     m_realtime = rt;
72     m_priority = priority;
73     bool result = true;
74     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
75         it != m_IsoHandlers.end();
76         ++it )
77     {
78         result &= (*it)->setThreadParameters(m_realtime, m_priority);
79     }
80
81     if (m_Thread) {
82         if (m_realtime) {
83             m_Thread->AcquireRealTime(m_priority);
84         } else {
85             m_Thread->DropRealTime();
86         }
87     }
88
89     return result;
90 }
91
92 /**
93  * Update the shadow variables. Should only be called from
94  * the iso handler iteration thread
95  */
96 void
97 IsoHandlerManager::updateShadowVars()
98 {
99     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "updating shadow vars...\n");
100     unsigned int i;
101     m_poll_nfds_shadow = m_IsoHandlers.size();
102     if(m_poll_nfds_shadow > ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT) {
103         debugWarning("Too much ISO Handlers in manager...\n");
104         m_poll_nfds_shadow = ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT;
105     }
106     for (i = 0; i < m_poll_nfds_shadow; i++) {
107         IsoHandler *h = m_IsoHandlers.at(i);
108         assert(h);
109         m_IsoHandler_map_shadow[i] = h;
110
111         m_poll_fds_shadow[i].fd = h->getFileDescriptor();
112         m_poll_fds_shadow[i].revents = 0;
113         if (h->isEnabled()) {
114             m_poll_fds_shadow[i].events = POLLIN;
115         } else {
116             m_poll_fds_shadow[i].events = 0;
117         }
118     }
119     debugOutput( DEBUG_LEVEL_VERY_VERBOSE, " updated shadow vars...\n");
120 }
121
122 bool
123 IsoHandlerManager::Init() {
124     debugOutput( DEBUG_LEVEL_VERBOSE, "%p: Init thread...\n", this);
125     bool result = true;
126     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
127         it != m_IsoHandlers.end();
128         ++it )
129     {
130         result &= (*it)->Init();
131     }
132     return result;
133 }
134
135 bool
136 IsoHandlerManager::Execute() {
137     int err;
138     unsigned int i;
139
140     unsigned int m_poll_timeout = 100;
141
142     updateShadowVars();
143     // bypass if no handlers are registered
144     if (m_poll_nfds_shadow == 0) {
145         debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "bypass iterate since no handlers registered\n");
146         usleep(m_poll_timeout * 1000);
147         return true;
148     }
149
150     // Use a shadow map of the fd's such that the poll call is not in a critical section
151     uint64_t poll_enter = m_service.getCurrentTimeAsUsecs();
152     err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout);
153     uint64_t poll_exit = m_service.getCurrentTimeAsUsecs();
154
155     if (err == -1) {
156         if (errno == EINTR) {
157             return true;
158         }
159         debugFatal("poll error: %s\n", strerror (errno));
160         return false;
161     }
162
163     int nb_rcv = 0;
164     int nb_xmit = 0;
165     uint64_t iter_enter = m_service.getCurrentTimeAsUsecs();
166     for (i = 0; i < m_poll_nfds_shadow; i++) {
167         if(m_poll_fds_shadow[i].revents) {
168             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "received events: %08X for (%p)\n",
169                 m_poll_fds_shadow[i].revents, m_IsoHandler_map_shadow[i]);
170         }
171         if (m_poll_fds_shadow[i].revents & POLLERR) {
172             debugWarning("error on fd for %d\n",i);
173         }
174
175         if (m_poll_fds_shadow[i].revents & POLLHUP) {
176             debugWarning("hangup on fd for %d\n",i);
177         }
178
179         if(m_poll_fds_shadow[i].revents & (POLLIN)) {
180             if (m_IsoHandler_map_shadow[i]->getType() == IsoHandler::eHT_Receive) {
181                 m_IsoHandler_map_shadow[i]->iterate();
182                 nb_rcv++;
183             } else {
184                 // only iterate the xmit handler if it makes sense
185                 if(m_IsoHandler_map_shadow[i]->tryWaitForClient()) {
186                     m_IsoHandler_map_shadow[i]->iterate();
187                     nb_xmit++;
188                 }
189             }
190         }
191     }
192     uint64_t iter_exit = m_service.getCurrentTimeAsUsecs();
193
194     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " poll took %6lldus, iterate took %6lldus, iterated (R: %2d, X: %2d) handlers\n",
195                 poll_exit-poll_enter, iter_exit-iter_enter,
196                 nb_rcv, nb_xmit);
197
198     return true;
199 }
200
201 bool IsoHandlerManager::init()
202 {
203     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing ISO manager %p...\n", this);
204     // check state
205     if(m_State != E_Created) {
206         debugError("Manager already initialized...\n");
207         return false;
208     }
209
210 #if ISOHANDLER_PER_HANDLER_THREAD
211     // the IsoHandlers will create their own thread.
212 #else
213     // create a thread to iterate our handlers
214     debugOutput( DEBUG_LEVEL_VERBOSE, "Start thread for %p...\n", this);
215     m_Thread = new Util::PosixThread(this, m_realtime, m_priority,
216                                      PTHREAD_CANCEL_DEFERRED);
217     if(!m_Thread) {
218         debugFatal("No thread\n");
219         return false;
220     }
221     if (m_Thread->Start() != 0) {
222         debugFatal("Could not start update thread\n");
223         return false;
224     }
225 #endif
226
227     m_State=E_Running;
228     return true;
229 }
230
231 bool
232 IsoHandlerManager::disable(IsoHandler *h) {
233     bool result;
234     int i=0;
235     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Disable on IsoHandler %p\n", h);
236     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
237         it != m_IsoHandlers.end();
238         ++it )
239     {
240         if ((*it) == h) {
241             result = h->disable();
242             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n");
243             return result;
244         }
245         i++;
246     }
247     debugError("Handler not found\n");
248     return false;
249 }
250
251 bool
252 IsoHandlerManager::enable(IsoHandler *h) {
253     bool result;
254     int i=0;
255     debugOutput(DEBUG_LEVEL_VERY_VERBOSE, "Enable on IsoHandler %p\n", h);
256     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
257         it != m_IsoHandlers.end();
258         ++it )
259     {
260         if ((*it) == h) {
261             result = h->enable();
262             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n");
263             return result;
264         }
265         i++;
266     }
267     debugError("Handler not found\n");
268     return false;
269 }
270
271 bool IsoHandlerManager::registerHandler(IsoHandler *handler)
272 {
273     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
274     assert(handler);
275     handler->setVerboseLevel(getDebugLevel());
276     m_IsoHandlers.push_back(handler);
277     updateShadowVars();
278     return true;
279 }
280
281 bool IsoHandlerManager::unregisterHandler(IsoHandler *handler)
282 {
283     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
284     assert(handler);
285
286     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
287       it != m_IsoHandlers.end();
288       ++it )
289     {
290         if ( *it == handler ) {
291             m_IsoHandlers.erase(it);
292             updateShadowVars();
293             return true;
294         }
295     }
296     debugFatal("Could not find handler (%p)\n", handler);
297     return false; //not found
298 }
299
300 /**
301  * Registers an StreamProcessor with the IsoHandlerManager.
302  *
303  * If nescessary, an IsoHandler is created to handle this stream.
304  * Once an StreamProcessor is registered to the handler, it will be included
305  * in the ISO streaming cycle (i.e. receive/transmit of it will occur).
306  *
307  * @param stream the stream to register
308  * @return true if registration succeeds
309  *
310  * \todo : currently there is a one-to-one mapping
311  *        between streams and handlers, this is not ok for
312  *        multichannel receive
313  */
314 bool IsoHandlerManager::registerStream(StreamProcessor *stream)
315 {
316     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering stream %p\n",stream);
317     assert(stream);
318
319     IsoHandler* h = NULL;
320
321     // make sure the stream isn't already attached to a handler
322     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
323       it != m_IsoHandlers.end();
324       ++it )
325     {
326         if((*it)->isStreamRegistered(stream)) {
327             debugError( "stream already registered!\n");
328             return false;
329         }
330     }
331
332     // clean up all handlers that aren't used
333     pruneHandlers();
334
335     // allocate a handler for this stream
336     if (stream->getType()==StreamProcessor::ePT_Receive) {
337         // setup the optimal parameters for the raw1394 ISO buffering
338         unsigned int packets_per_period = stream->getPacketsPerPeriod();
339         unsigned int max_packet_size = stream->getMaxPacketSize();
340         unsigned int page_size = getpagesize() - 2; // for one reason or another this is necessary
341
342         // Ensure we don't request a packet size bigger than the
343         // kernel-enforced maximum which is currently 1 page.
344         if (max_packet_size > page_size) {
345             debugError("max packet size (%u) > page size (%u)\n", max_packet_size, page_size);
346             return false;
347         }
348
349         unsigned int irq_interval = packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD;
350         if(irq_interval <= 0) irq_interval=1;
351        
352         // the receive buffer size doesn't matter for the latency,
353         // but it has a minimal value in order for libraw to operate correctly (300)
354         int buffers=400;
355
356         // create the actual handler
357         h = new IsoHandler(*this, IsoHandler::eHT_Receive,
358                            buffers, max_packet_size, irq_interval);
359
360         debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoRecvHandler\n");
361
362         if(!h) {
363             debugFatal("Could not create IsoRecvHandler\n");
364             return false;
365         }
366
367     } else if (stream->getType()==StreamProcessor::ePT_Transmit) {
368         // setup the optimal parameters for the raw1394 ISO buffering
369         unsigned int packets_per_period = stream->getPacketsPerPeriod();
370         unsigned int max_packet_size = stream->getMaxPacketSize();
371         unsigned int page_size = getpagesize();
372
373         // Ensure we don't request a packet size bigger than the
374         // kernel-enforced maximum which is currently 1 page.
375         if (max_packet_size > page_size) {
376             debugError("max packet size (%u) > page size (%u)\n", max_packet_size, page_size);
377             return false;
378         }
379
380         max_packet_size = page_size;
381         unsigned int irq_interval = packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD;
382         if(irq_interval <= 0) irq_interval=1;
383
384         // the SP specifies how many packets to ISO-buffer
385         int buffers = stream->getNbPacketsIsoXmitBuffer();
386
387         debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoXmitHandler\n");
388
389         // create the actual handler
390         h = new IsoHandler(*this, IsoHandler::eHT_Transmit,
391                            buffers, max_packet_size, irq_interval);
392
393         if(!h) {
394             debugFatal("Could not create IsoXmitHandler\n");
395             return false;
396         }
397     } else {
398         debugFatal("Bad stream type\n");
399         return false;
400     }
401
402     h->setVerboseLevel(getDebugLevel());
403
404     // init the handler
405     if(!h->init()) {
406         debugFatal("Could not initialize receive handler\n");
407         return false;
408     }
409
410     // set the handler's thread parameters
411     if(!h->setThreadParameters(m_realtime, m_priority)) {
412         debugFatal("Could not set handler thread parameters\n");
413         return false;
414     }
415
416     // register the stream with the handler
417     if(!h->registerStream(stream)) {
418         debugFatal("Could not register receive stream with handler\n");
419         return false;
420     }
421
422     // register the handler with the manager
423     if(!registerHandler(h)) {
424         debugFatal("Could not register receive handler with manager\n");
425         return false;
426     }
427     debugOutput( DEBUG_LEVEL_VERBOSE, " registered stream (%p) with handler (%p)\n", stream, h);
428
429     m_StreamProcessors.push_back(stream);
430     debugOutput( DEBUG_LEVEL_VERBOSE, " %d streams, %d handlers registered\n",
431                                       m_StreamProcessors.size(), m_IsoHandlers.size());
432     return true;
433 }
434
435 bool IsoHandlerManager::unregisterStream(StreamProcessor *stream)
436 {
437     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering stream %p\n",stream);
438     assert(stream);
439
440     // make sure the stream isn't attached to a handler anymore
441     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
442       it != m_IsoHandlers.end();
443       ++it )
444     {
445         if((*it)->isStreamRegistered(stream)) {
446             if(!(*it)->unregisterStream(stream)) {
447                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not unregister stream (%p) from handler (%p)...\n",stream,*it);
448                 return false;
449             }
450             debugOutput( DEBUG_LEVEL_VERBOSE, " unregistered stream (%p) from handler (%p)...\n",stream,*it);
451         }
452     }
453
454     // clean up all handlers that aren't used
455     pruneHandlers();
456
457     // remove the stream from the registered streams list
458     for ( StreamProcessorVectorIterator it = m_StreamProcessors.begin();
459       it != m_StreamProcessors.end();
460       ++it )
461     {
462         if ( *it == stream ) {
463             m_StreamProcessors.erase(it);
464             debugOutput( DEBUG_LEVEL_VERBOSE, " deleted stream (%p) from list...\n", *it);
465             return true;
466         }
467     }
468     return false; //not found
469 }
470
471 /**
472  * @brief unregister a handler from the manager
473  * @note called without the lock held.
474  */
475 void IsoHandlerManager::pruneHandlers() {
476     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
477     IsoHandlerVector toUnregister;
478
479     // find all handlers that are not in use
480     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
481           it != m_IsoHandlers.end();
482           ++it )
483     {
484         if(!((*it)->inUse())) {
485             debugOutput( DEBUG_LEVEL_VERBOSE, " handler (%p) not in use\n",*it);
486             toUnregister.push_back(*it);
487         }
488     }
489     // delete them
490     for ( IsoHandlerVectorIterator it = toUnregister.begin();
491           it != toUnregister.end();
492           ++it )
493     {
494         unregisterHandler(*it);
495
496         debugOutput( DEBUG_LEVEL_VERBOSE, " deleting handler (%p)\n",*it);
497
498         // Now the handler's been unregistered it won't be reused
499         // again.  Therefore it really needs to be formally deleted
500         // to free up the raw1394 handle.  Otherwise things fall
501         // apart after several xrun recoveries as the system runs
502         // out of resources to support all the disused but still
503         // allocated raw1394 handles.  At least this is the current
504         // theory as to why we end up with "memory allocation"
505         // failures after several Xrun recoveries.
506         delete *it;
507     }
508 }
509
510 bool
511 IsoHandlerManager::stopHandlerForStream(Streaming::StreamProcessor *stream) {
512     // check state
513     if(m_State != E_Running) {
514         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
515         return false;
516     }
517     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
518       it != m_IsoHandlers.end();
519       ++it )
520     {
521         if((*it)->isStreamRegistered(stream)) {
522             bool result;
523             debugOutput( DEBUG_LEVEL_VERBOSE, " stopping handler %p for stream %p\n", *it, stream);
524             result = (*it)->disable();
525             if(!result) {
526                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not disable handler (%p)\n",*it);
527                 return false;
528             }
529             return true;
530         }
531     }
532     debugError("Stream %p has no attached handler\n", stream);
533     return false;
534 }
535
536 int
537 IsoHandlerManager::getPacketLatencyForStream(Streaming::StreamProcessor *stream) {
538     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
539       it != m_IsoHandlers.end();
540       ++it )
541     {
542         if((*it)->isStreamRegistered(stream)) {
543             return (*it)->getPacketLatency();
544         }
545     }
546     debugError("Stream %p has no attached handler\n", stream);
547     return 0;
548 }
549
550 void
551 IsoHandlerManager::flushHandlerForStream(Streaming::StreamProcessor *stream) {
552     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
553       it != m_IsoHandlers.end();
554       ++it )
555     {
556         if((*it)->isStreamRegistered(stream)) {
557             return (*it)->flush();
558         }
559     }
560     debugError("Stream %p has no attached handler\n", stream);
561     return;
562 }
563
564 bool
565 IsoHandlerManager::startHandlerForStream(Streaming::StreamProcessor *stream) {
566     return startHandlerForStream(stream, -1);
567 }
568
569 bool
570 IsoHandlerManager::startHandlerForStream(Streaming::StreamProcessor *stream, int cycle) {
571     // check state
572     if(m_State != E_Running) {
573         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
574         return false;
575     }
576     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
577       it != m_IsoHandlers.end();
578       ++it )
579     {
580         if((*it)->isStreamRegistered(stream)) {
581             bool result;
582             debugOutput( DEBUG_LEVEL_VERBOSE, " starting handler %p for stream %p\n", *it, stream);
583             result = (*it)->enable(cycle);
584             if(!result) {
585                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not enable handler (%p)\n",*it);
586                 return false;
587             }
588             return true;
589         }
590     }
591     debugError("Stream %p has no attached handler\n", stream);
592     return false;
593 }
594
595 bool IsoHandlerManager::stopHandlers() {
596     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
597
598     // check state
599     if(m_State != E_Running) {
600         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
601         return false;
602     }
603
604     bool retval=true;
605
606     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
607         it != m_IsoHandlers.end();
608         ++it )
609     {
610         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handler (%p)\n",*it);
611         if(!(*it)->disable()){
612             debugOutput( DEBUG_LEVEL_VERBOSE, " could not stop handler (%p)\n",*it);
613             retval=false;
614         }
615     }
616
617     if (retval) {
618         m_State=E_Prepared;
619     } else {
620         m_State=E_Error;
621     }
622     return retval;
623 }
624
625 bool IsoHandlerManager::reset() {
626     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
627     // check state
628     if(m_State == E_Error) {
629         debugFatal("Resetting from error condition not yet supported...\n");
630         return false;
631     }
632     // if not in an error condition, reset means stop the handlers
633     return stopHandlers();
634 }
635
636 void IsoHandlerManager::setVerboseLevel(int i) {
637     setDebugLevel(i);
638     // propagate the debug level
639     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
640           it != m_IsoHandlers.end();
641           ++it )
642     {
643         (*it)->setVerboseLevel(i);
644     }
645 }
646
647 void IsoHandlerManager::dumpInfo() {
648     int i=0;
649     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping IsoHandlerManager Stream handler information...\n");
650     debugOutputShort( DEBUG_LEVEL_NORMAL, " State: %d\n",(int)m_State);
651
652     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
653           it != m_IsoHandlers.end();
654           ++it )
655     {
656         debugOutputShort( DEBUG_LEVEL_NORMAL, " IsoHandler %d (%p)\n",i++,*it);
657         (*it)->dumpInfo();
658     }
659 }
660
661 const char *
662 IsoHandlerManager::eHSToString(enum eHandlerStates s) {
663     switch (s) {
664         default: return "Invalid";
665         case E_Created: return "Created";
666         case E_Prepared: return "Prepared";
667         case E_Running: return "Running";
668         case E_Error: return "Error";
669     }
670 }
Note: See TracBrowser for help on using the browser.