root/trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

Revision 1005, 33.1 kB (checked in by ppalmers, 16 years ago)

Improve thread synchronisation. Switch back to separate threads for transmit and
receive since it is not possible to statically schedule things properly. One
of the threads (i.e. the client thread) is out of our control, hence its
execution can't be controlled. Using separate threads and correct priorities
will shift this problem to the OS. Note that the priority of the packet
receive thread should be lower than the client thread (such that the client
thread is woken ASAP), and the priority of the transmit thread should be
higher than the client thread (such that packets are queued ASAP).
Extra benefit: multi-cores are used.

Some other startup improvements.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25 #include "IsoHandlerManager.h"
26 #include "ieee1394service.h"
27 #include "libstreaming/generic/StreamProcessor.h"
28
29 #include "libutil/Atomic.h"
30 #include "libutil/PosixThread.h"
31 #include "libutil/SystemTimeSource.h"
32 #include "libutil/Watchdog.h"
33
34 #include <assert.h>
35
36 IMPL_DEBUG_MODULE( IsoHandlerManager, IsoHandlerManager, DEBUG_LEVEL_NORMAL );
37 IMPL_DEBUG_MODULE( IsoTask, IsoTask, DEBUG_LEVEL_NORMAL );
38
39 using namespace Streaming;
40
41 // --- ISO Thread --- //
42
43 IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType t)
44     : m_manager( manager )
45     , m_SyncIsoHandler ( NULL )
46     , m_handlerType( t )
47 {
48 }
49
50 IsoTask::~IsoTask()
51 {
52     sem_destroy(&m_activity_semaphore);
53 }
54
55 bool
56 IsoTask::Init()
57 {
58     request_update = 0;
59
60     int i;
61     for (i=0; i < ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT; i++) {
62         m_IsoHandler_map_shadow[i] = NULL;
63         m_poll_fds_shadow[i].events = 0;
64     }
65     m_poll_nfds_shadow = 0;
66
67     #ifdef DEBUG
68     m_last_loop_entry = 0;
69     m_successive_short_loops = 0;
70     #endif
71
72     sem_init(&m_activity_semaphore, 0, 0);
73     return true;
74 }
75
76 bool
77 IsoTask::requestShadowMapUpdate()
78 {
79     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) enter\n", this);
80     INC_ATOMIC(&request_update);
81     return true;
82 }
83
84 // updates the internal stream map
85 // note that this should be executed with the guarantee that
86 // nobody will modify the parent data structures
87 void
88 IsoTask::updateShadowMapHelper()
89 {
90     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updating shadow vars...\n", this);
91     unsigned int i, cnt, max;
92     max = m_manager.m_IsoHandlers.size();
93     m_SyncIsoHandler = NULL;
94     for (i = 0, cnt = 0; i < max; i++) {
95         IsoHandler *h = m_manager.m_IsoHandlers.at(i);
96         assert(h);
97
98         // skip the handlers not intended for us
99         if(h->getType() != m_handlerType) continue;
100
101         if (h->isEnabled()) {
102             m_IsoHandler_map_shadow[cnt] = h;
103             m_poll_fds_shadow[cnt].fd = h->getFileDescriptor();
104             m_poll_fds_shadow[cnt].revents = 0;
105             m_poll_fds_shadow[cnt].events = POLLIN;
106             cnt++;
107             // FIXME: need a more generic approach here
108             if(   m_SyncIsoHandler == NULL
109                && h->getType() == IsoHandler::eHT_Transmit) {
110                 m_SyncIsoHandler = h;
111             }
112
113             debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p added\n",
114                                               this, h->getTypeString(), h);
115         } else {
116             debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p skipped (disabled)\n",
117                                               this, h->getTypeString(), h);
118         }
119         if(cnt > ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT) {
120             debugWarning("Too much ISO Handlers in thread...\n");
121             break;
122         }
123     }
124
125     // FIXME: need a more generic approach here
126     // if there are no active transmit handlers,
127     // use the first receive handler
128     if(   m_SyncIsoHandler == NULL
129        && m_poll_nfds_shadow) {
130         m_SyncIsoHandler = m_IsoHandler_map_shadow[0];
131     }
132     m_poll_nfds_shadow = cnt;
133     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updated shadow vars...\n", this);
134 }
135
136 bool
137 IsoTask::Execute()
138 {
139     debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
140                        "(%p, %s) Execute\n",
141                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
142     int err;
143     unsigned int i;
144     unsigned int m_poll_timeout = 10;
145
146     #ifdef DEBUG
147     uint64_t now = m_manager.get1394Service().getCurrentTimeAsUsecs();
148     int diff = now - m_last_loop_entry;
149     if(diff < 100) {
150         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
151                            "(%p, %s) short loop detected (%d usec), cnt: %d\n",
152                            this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"),
153                            diff, m_successive_short_loops);
154         m_successive_short_loops++;
155         if(m_successive_short_loops > 10000) {
156             debugError("Shutting down runaway thread\n");
157             return false;
158         }
159     } else {
160         // reset the counter
161         m_successive_short_loops = 0;
162     }
163     m_last_loop_entry = now;
164     #endif
165
166     // if some other thread requested a shadow map update, do it
167     if(request_update) {
168         updateShadowMapHelper();
169         DEC_ATOMIC(&request_update); // ack the update
170         assert(request_update >= 0);
171     }
172
173     // bypass if no handlers are registered
174     if (m_poll_nfds_shadow == 0) {
175         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
176                            "(%p, %s) bypass iterate since no handlers to poll\n",
177                            this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
178         usleep(m_poll_timeout * 1000);
179         return true;
180     }
181
182     // FIXME: what can happen is that poll() returns, but not all clients are
183     // ready. there might be some busy waiting behavior that still has to be solved.
184
185     // setup the poll here
186     // we should prevent a poll() where no events are specified, since that will only time-out
187     bool no_one_to_poll = true;
188     while(no_one_to_poll) {
189         for (i = 0; i < m_poll_nfds_shadow; i++) {
190             short events = 0;
191             IsoHandler *h = m_IsoHandler_map_shadow[i];
192             // we should only poll on a transmit handler
193             // that has a client that is ready to send
194             // something. Otherwise it will end up in
195             // busy wait looping since the packet function
196             // will defer processing (also avoids the
197             // AGAIN problem)
198             if (h->canIterateClient()) {
199                 events = POLLIN | POLLPRI;
200                 no_one_to_poll = false;
201                 // if we are going to poll() it, let's ensure
202                 // it can run until someone wants it to exit
203                 h->allowIterateLoop();
204             }
205             m_poll_fds_shadow[i].events = events;
206         }
207
208         if(no_one_to_poll) {
209             debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
210                                "(%p, %s) No one to poll, waiting for something to happen\n",
211                                this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
212             // wait for something to happen
213             switch(waitForActivity()) {
214                 case IsoTask::eAR_Error:
215                     debugError("Error while waiting for activity\n");
216                     return false;
217                 case IsoTask::eAR_Interrupted:
218                     // FIXME: what to do here?
219                     debugWarning("Interrupted while waiting for activity\n");
220                     break;
221                 case IsoTask::eAR_Timeout:
222                     // FIXME: what to do here?
223                     debugWarning("Timeout while waiting for activity\n");
224                     break;
225                 case IsoTask::eAR_Activity:
226                     // do nothing
227                     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
228                                        "(%p, %s) something happened\n",
229                                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
230                     break;
231             }
232         }
233     }
234
235     // Use a shadow map of the fd's such that we don't have to update
236     // the fd map everytime we run poll().
237     err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout);
238
239     if (err < 0) {
240         if (errno == EINTR) {
241             debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n");
242             return true;
243         }
244         debugFatal("poll error: %s\n", strerror (errno));
245         return false;
246     }
247
248     for (i = 0; i < m_poll_nfds_shadow; i++) {
249         #ifdef DEBUG
250         if(m_poll_fds_shadow[i].revents) {
251             debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,
252                         "(%p, %s) received events: %08X for (%d/%d, %p, %s)\n",
253                         this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"),
254                         m_poll_fds_shadow[i].revents,
255                         i, m_poll_nfds_shadow,
256                         m_IsoHandler_map_shadow[i],
257                         m_IsoHandler_map_shadow[i]->getTypeString());
258         }
259         #endif
260
261         // if we get here, it means two things:
262         // 1) the kernel can accept or provide packets (poll returned POLLIN)
263         // 2) the client can provide or accept packets (since we enabled polling)
264         if(m_poll_fds_shadow[i].revents & (POLLIN)) {
265             m_IsoHandler_map_shadow[i]->iterate();
266         } else {
267             // there might be some error condition
268             if (m_poll_fds_shadow[i].revents & POLLERR) {
269                 debugWarning("(%p) error on fd for %d\n", this, i);
270             }
271             if (m_poll_fds_shadow[i].revents & POLLHUP) {
272                 debugWarning("(%p) hangup on fd for %d\n", this, i);
273             }
274         }
275
276 //         #ifdef DEBUG
277 //         // check if the handler is still alive
278 //         if(m_IsoHandler_map_shadow[i]->isDead()) {
279 //             debugError("Iso handler (%p, %s) is dead!\n",
280 //                        m_IsoHandler_map_shadow[i],
281 //                        m_IsoHandler_map_shadow[i]->getTypeString());
282 //             return false; // shutdown the system
283 //         }
284 //         #endif
285
286     }
287     return true;
288 }
289
290 enum IsoTask::eActivityResult
291 IsoTask::waitForActivity()
292 {
293     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
294                        "(%p, %s) waiting for activity\n",
295                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
296     struct timespec ts;
297     int result;
298
299     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
300         debugError("clock_gettime failed\n");
301         return eAR_Error;
302     }
303     long long int timeout_nsec=0;
304     int timeout_sec = 0;
305
306     timeout_nsec = ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS * 1000LL;
307     timeout_sec = 0;
308     while(timeout_nsec >= 1000000000LL) {
309         timeout_sec += 1;
310         timeout_nsec -= 1000000000LL;
311     }
312     ts.tv_nsec += timeout_nsec;
313     ts.tv_sec += timeout_sec;
314
315     result = sem_timedwait(&m_activity_semaphore, &ts);
316
317     if(result != 0) {
318         if (result == ETIMEDOUT) {
319             debugOutput(DEBUG_LEVEL_VERBOSE,
320                         "(%p) pthread_cond_timedwait() timed out (result=%d)\n",
321                         this, result);
322             return eAR_Timeout;
323         } else if (result == EINTR) {
324             debugOutput(DEBUG_LEVEL_VERBOSE,
325                         "(%p) pthread_cond_[timed]wait() interrupted by signal (result=%d)\n",
326                         this, result);
327             return eAR_Interrupted;
328         } else {
329             debugError("(%p) pthread_cond_[timed]wait error (result=%d)\n",
330                         this, result);
331             debugError("(%p) timeout_sec=%d timeout_nsec=%lld ts.sec=%d ts.nsec=%lld\n",
332                        this, timeout_sec, timeout_nsec, ts.tv_sec, ts.tv_nsec);
333             return eAR_Error;
334         }
335     }
336
337     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
338                        "(%p, %s) got activity\n",
339                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
340     return eAR_Activity;
341 }
342
343 void
344 IsoTask::signalActivity()
345 {
346     // signal the activity cond var
347     sem_post(&m_activity_semaphore);
348     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
349                        "(%p, %s) activity\n",
350                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
351 }
352
353 void IsoTask::setVerboseLevel(int i) {
354     setDebugLevel(i);
355 }
356
357 // -- the ISO handler manager -- //
358 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service)
359    : m_State(E_Created)
360    , m_service( service )
361    , m_realtime(false), m_priority(0)
362    , m_IsoThreadTransmit ( NULL )
363    , m_IsoTaskTransmit ( NULL )
364    , m_IsoThreadReceive ( NULL )
365    , m_IsoTaskReceive ( NULL )
366 {
367 }
368
369 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio)
370    : m_State(E_Created)
371    , m_service( service )
372    , m_realtime(run_rt), m_priority(rt_prio)
373    , m_IsoThreadTransmit ( NULL )
374    , m_IsoTaskTransmit ( NULL )
375    , m_IsoThreadReceive ( NULL )
376    , m_IsoTaskReceive ( NULL )
377 {
378 }
379
380 IsoHandlerManager::~IsoHandlerManager()
381 {
382     stopHandlers();
383     pruneHandlers();
384     if(m_IsoHandlers.size() > 0) {
385         debugError("Still some handlers in use\n");
386     }
387     if (m_IsoThreadTransmit) {
388         m_IsoThreadTransmit->Stop();
389         delete m_IsoThreadTransmit;
390     }
391     if (m_IsoThreadReceive) {
392         m_IsoThreadReceive->Stop();
393         delete m_IsoThreadReceive;
394     }
395     if (m_IsoTaskTransmit) {
396         delete m_IsoTaskTransmit;
397     }
398     if (m_IsoTaskReceive) {
399         delete m_IsoTaskReceive;
400     }
401 }
402
403 void
404 IsoHandlerManager::requestShadowMapUpdate()
405 {
406     if(m_IsoTaskTransmit) m_IsoTaskTransmit->requestShadowMapUpdate();
407     if(m_IsoTaskReceive) m_IsoTaskReceive->requestShadowMapUpdate();
408 }
409
410 bool
411 IsoHandlerManager::setThreadParameters(bool rt, int priority) {
412     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority);
413     if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority
414     m_realtime = rt;
415     m_priority = priority;
416
417     if (m_IsoThreadTransmit) {
418         if (m_realtime) {
419             m_IsoThreadTransmit->AcquireRealTime(m_priority
420                                                  + ISOHANDLERMANAGER_ISO_PRIO_INCREASE
421                                                  + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT);
422         } else {
423             m_IsoThreadTransmit->DropRealTime();
424         }
425     }
426     if (m_IsoThreadReceive) {
427         if (m_realtime) {
428             m_IsoThreadReceive->AcquireRealTime(m_priority
429                                                 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE
430                                                 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV);
431         } else {
432             m_IsoThreadReceive->DropRealTime();
433         }
434     }
435
436     return true;
437 }
438
439 bool IsoHandlerManager::init()
440 {
441     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing ISO manager %p...\n", this);
442     // check state
443     if(m_State != E_Created) {
444         debugError("Manager already initialized...\n");
445         return false;
446     }
447
448     // create threads to iterate our ISO handlers
449     debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p transmit...\n", this);
450     m_IsoTaskTransmit = new IsoTask( *this, IsoHandler::eHT_Transmit );
451     if(!m_IsoTaskTransmit) {
452         debugFatal("No task\n");
453         return false;
454     }
455     m_IsoThreadTransmit = new Util::PosixThread(m_IsoTaskTransmit, m_realtime,
456                                                 m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE
457                                                 + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT,
458                                                 PTHREAD_CANCEL_DEFERRED);
459
460     if(!m_IsoThreadTransmit) {
461         debugFatal("No thread\n");
462         return false;
463     }
464
465     debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p receive...\n", this);
466     m_IsoTaskReceive = new IsoTask( *this, IsoHandler::eHT_Receive );
467     if(!m_IsoTaskReceive) {
468         debugFatal("No task\n");
469         return false;
470     }
471     m_IsoThreadReceive = new Util::PosixThread(m_IsoTaskReceive, m_realtime,
472                                                m_priority + ISOHANDLERMANAGER_ISO_PRIO_INCREASE
473                                                + ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV,
474                                                PTHREAD_CANCEL_DEFERRED);
475
476     if(!m_IsoThreadReceive) {
477         debugFatal("No thread\n");
478         return false;
479     }
480     // register the thread with the RT watchdog
481     Util::Watchdog *watchdog = m_service.getWatchdog();
482     if(watchdog) {
483         if(!watchdog->registerThread(m_IsoThreadTransmit)) {
484             debugWarning("could not register iso transmit thread with watchdog\n");
485         }
486         if(!watchdog->registerThread(m_IsoThreadReceive)) {
487             debugWarning("could not register iso receive thread with watchdog\n");
488         }
489     } else {
490         debugWarning("could not find valid watchdog\n");
491     }
492
493     if (m_IsoThreadTransmit->Start() != 0) {
494         debugFatal("Could not start ISO Transmit thread\n");
495         return false;
496     }
497     if (m_IsoThreadReceive->Start() != 0) {
498         debugFatal("Could not start ISO Receive thread\n");
499         return false;
500     }
501
502     m_State=E_Running;
503     return true;
504 }
505
506 bool
507 IsoHandlerManager::disable(IsoHandler *h) {
508     bool result;
509     int i=0;
510     debugOutput(DEBUG_LEVEL_VERBOSE, "Disable on IsoHandler %p\n", h);
511     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
512         it != m_IsoHandlers.end();
513         ++it )
514     {
515         if ((*it) == h) {
516             result = h->disable();
517             if(h->getType() == IsoHandler::eHT_Transmit) {
518                 result &= m_IsoTaskTransmit->requestShadowMapUpdate();
519             } else {
520                 result &= m_IsoTaskReceive->requestShadowMapUpdate();
521             }
522             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " disabled\n");
523             return result;
524         }
525         i++;
526     }
527     debugError("Handler not found\n");
528     return false;
529 }
530
531 bool
532 IsoHandlerManager::enable(IsoHandler *h) {
533     bool result;
534     int i=0;
535     debugOutput(DEBUG_LEVEL_VERBOSE, "Enable on IsoHandler %p\n", h);
536     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
537         it != m_IsoHandlers.end();
538         ++it )
539     {
540         if ((*it) == h) {
541             result = h->enable();
542             if(h->getType() == IsoHandler::eHT_Transmit) {
543                 result &= m_IsoTaskTransmit->requestShadowMapUpdate();
544             } else {
545                 result &= m_IsoTaskReceive->requestShadowMapUpdate();
546             }
547             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " enabled\n");
548             return result;
549         }
550         i++;
551     }
552     debugError("Handler not found\n");
553     return false;
554 }
555
556 void
557 IsoHandlerManager::signalActivityTransmit()
558 {
559     assert(m_IsoTaskTransmit);
560     m_IsoTaskTransmit->signalActivity();
561 }
562
563 void
564 IsoHandlerManager::signalActivityReceive()
565 {
566     assert(m_IsoTaskReceive);
567     m_IsoTaskReceive->signalActivity();
568 }
569
570 bool IsoHandlerManager::registerHandler(IsoHandler *handler)
571 {
572     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
573     assert(handler);
574     handler->setVerboseLevel(getDebugLevel());
575     m_IsoHandlers.push_back(handler);
576     requestShadowMapUpdate();
577     return true;
578 }
579
580 bool IsoHandlerManager::unregisterHandler(IsoHandler *handler)
581 {
582     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
583     assert(handler);
584
585     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
586       it != m_IsoHandlers.end();
587       ++it )
588     {
589         if ( *it == handler ) {
590             m_IsoHandlers.erase(it);
591             requestShadowMapUpdate();
592             return true;
593         }
594     }
595     debugFatal("Could not find handler (%p)\n", handler);
596     return false; //not found
597 }
598
599 /**
600  * Registers an StreamProcessor with the IsoHandlerManager.
601  *
602  * If nescessary, an IsoHandler is created to handle this stream.
603  * Once an StreamProcessor is registered to the handler, it will be included
604  * in the ISO streaming cycle (i.e. receive/transmit of it will occur).
605  *
606  * @param stream the stream to register
607  * @return true if registration succeeds
608  *
609  * \todo : currently there is a one-to-one mapping
610  *        between streams and handlers, this is not ok for
611  *        multichannel receive
612  */
613 bool IsoHandlerManager::registerStream(StreamProcessor *stream)
614 {
615     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering stream %p\n",stream);
616     assert(stream);
617
618     IsoHandler* h = NULL;
619
620     // make sure the stream isn't already attached to a handler
621     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
622       it != m_IsoHandlers.end();
623       ++it )
624     {
625         if((*it)->isStreamRegistered(stream)) {
626             debugError( "stream already registered!\n");
627             return false;
628         }
629     }
630
631     // clean up all handlers that aren't used
632     pruneHandlers();
633
634     // allocate a handler for this stream
635     if (stream->getType()==StreamProcessor::ePT_Receive) {
636         // setup the optimal parameters for the raw1394 ISO buffering
637         unsigned int packets_per_period = stream->getPacketsPerPeriod();
638         unsigned int max_packet_size = stream->getMaxPacketSize();
639         unsigned int page_size = getpagesize() - 2; // for one reason or another this is necessary
640
641         // Ensure we don't request a packet size bigger than the
642         // kernel-enforced maximum which is currently 1 page.
643         if (max_packet_size > page_size) {
644             debugError("max packet size (%u) > page size (%u)\n", max_packet_size, page_size);
645             return false;
646         }
647
648         unsigned int irq_interval = packets_per_period / MINIMUM_INTERRUPTS_PER_PERIOD;
649         if(irq_interval <= 0) irq_interval=1;
650        
651         // the receive buffer size doesn't matter for the latency,
652         // but it has a minimal value in order for libraw to operate correctly (300)
653         int buffers=400;
654
655         // create the actual handler
656         h = new IsoHandler(*this, IsoHandler::eHT_Receive,
657                            buffers, max_packet_size, irq_interval);
658
659         debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoRecvHandler\n");
660
661         if(!h) {
662             debugFatal("Could not create IsoRecvHandler\n");
663             return false;
664         }
665
666     } else if (stream->getType()==StreamProcessor::ePT_Transmit) {
667         // setup the optimal parameters for the raw1394 ISO buffering
668 //        unsigned int packets_per_period = stream->getPacketsPerPeriod();
669         unsigned int max_packet_size = stream->getMaxPacketSize();
670 //         unsigned int page_size = getpagesize();
671
672         // Ensure we don't request a packet size bigger than the
673         // kernel-enforced maximum which is currently 1 page.
674 //         if (max_packet_size > page_size) {
675 //             debugError("max packet size (%u) > page size (%u)\n", max_packet_size, page_size);
676 //             return false;
677 //         }
678         if (max_packet_size > MAX_XMIT_PACKET_SIZE) {
679             debugError("max packet size (%u) > MAX_XMIT_PACKET_SIZE (%u)\n",
680                        max_packet_size, MAX_XMIT_PACKET_SIZE);
681             return false;
682         }
683
684         // the SP specifies how many packets to ISO-buffer
685         int buffers = stream->getNbPacketsIsoXmitBuffer();
686         if (buffers > MAX_XMIT_NB_BUFFERS) {
687             debugOutput(DEBUG_LEVEL_VERBOSE,
688                         "nb buffers (%u) > MAX_XMIT_NB_BUFFERS (%u)\n",
689                         buffers, MAX_XMIT_NB_BUFFERS);
690             buffers = MAX_XMIT_NB_BUFFERS;
691         }
692         unsigned int irq_interval = buffers / MINIMUM_INTERRUPTS_PER_PERIOD;
693         if(irq_interval <= 0) irq_interval=1;
694
695         debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoXmitHandler\n");
696
697         // create the actual handler
698         h = new IsoHandler(*this, IsoHandler::eHT_Transmit,
699                            buffers, max_packet_size, irq_interval);
700
701         if(!h) {
702             debugFatal("Could not create IsoXmitHandler\n");
703             return false;
704         }
705
706     } else {
707         debugFatal("Bad stream type\n");
708         return false;
709     }
710
711     h->setVerboseLevel(getDebugLevel());
712
713     // init the handler
714     if(!h->init()) {
715         debugFatal("Could not initialize receive handler\n");
716         return false;
717     }
718
719     // register the stream with the handler
720     if(!h->registerStream(stream)) {
721         debugFatal("Could not register receive stream with handler\n");
722         return false;
723     }
724
725     // register the handler with the manager
726     if(!registerHandler(h)) {
727         debugFatal("Could not register receive handler with manager\n");
728         return false;
729     }
730     debugOutput( DEBUG_LEVEL_VERBOSE, " registered stream (%p) with handler (%p)\n", stream, h);
731
732     m_StreamProcessors.push_back(stream);
733     debugOutput( DEBUG_LEVEL_VERBOSE, " %d streams, %d handlers registered\n",
734                                       m_StreamProcessors.size(), m_IsoHandlers.size());
735     return true;
736 }
737
738 bool IsoHandlerManager::unregisterStream(StreamProcessor *stream)
739 {
740     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering stream %p\n",stream);
741     assert(stream);
742
743     // make sure the stream isn't attached to a handler anymore
744     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
745       it != m_IsoHandlers.end();
746       ++it )
747     {
748         if((*it)->isStreamRegistered(stream)) {
749             if(!(*it)->unregisterStream(stream)) {
750                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not unregister stream (%p) from handler (%p)...\n",stream,*it);
751                 return false;
752             }
753             debugOutput( DEBUG_LEVEL_VERBOSE, " unregistered stream (%p) from handler (%p)...\n",stream,*it);
754         }
755     }
756
757     // clean up all handlers that aren't used
758     pruneHandlers();
759
760     // remove the stream from the registered streams list
761     for ( StreamProcessorVectorIterator it = m_StreamProcessors.begin();
762       it != m_StreamProcessors.end();
763       ++it )
764     {
765         if ( *it == stream ) {
766             m_StreamProcessors.erase(it);
767             debugOutput( DEBUG_LEVEL_VERBOSE, " deleted stream (%p) from list...\n", *it);
768             return true;
769         }
770     }
771     return false; //not found
772 }
773
774 /**
775  * @brief unregister a handler from the manager
776  * @note called without the lock held.
777  */
778 void IsoHandlerManager::pruneHandlers() {
779     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
780     IsoHandlerVector toUnregister;
781
782     // find all handlers that are not in use
783     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
784           it != m_IsoHandlers.end();
785           ++it )
786     {
787         if(!((*it)->inUse())) {
788             debugOutput( DEBUG_LEVEL_VERBOSE, " handler (%p) not in use\n",*it);
789             toUnregister.push_back(*it);
790         }
791     }
792     // delete them
793     for ( IsoHandlerVectorIterator it = toUnregister.begin();
794           it != toUnregister.end();
795           ++it )
796     {
797         unregisterHandler(*it);
798
799         debugOutput( DEBUG_LEVEL_VERBOSE, " deleting handler (%p)\n",*it);
800
801         // Now the handler's been unregistered it won't be reused
802         // again.  Therefore it really needs to be formally deleted
803         // to free up the raw1394 handle.  Otherwise things fall
804         // apart after several xrun recoveries as the system runs
805         // out of resources to support all the disused but still
806         // allocated raw1394 handles.  At least this is the current
807         // theory as to why we end up with "memory allocation"
808         // failures after several Xrun recoveries.
809         delete *it;
810     }
811 }
812
813 bool
814 IsoHandlerManager::stopHandlerForStream(Streaming::StreamProcessor *stream) {
815     // check state
816     if(m_State != E_Running) {
817         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
818         return false;
819     }
820     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
821       it != m_IsoHandlers.end();
822       ++it )
823     {
824         if((*it)->isStreamRegistered(stream)) {
825             debugOutput( DEBUG_LEVEL_VERBOSE, " stopping handler %p for stream %p\n", *it, stream);
826             if(!(*it)->disable()) {
827                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not disable handler (%p)\n",*it);
828                 return false;
829             }
830             bool result;
831             if((*it)->getType() == IsoHandler::eHT_Transmit) {
832                 result = m_IsoTaskTransmit->requestShadowMapUpdate();
833             } else {
834                 result = m_IsoTaskReceive->requestShadowMapUpdate();
835             }
836             if(!result) {
837                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it);
838                 return false;
839             }
840             return true;
841         }
842     }
843     debugError("Stream %p has no attached handler\n", stream);
844     return false;
845 }
846
847 int
848 IsoHandlerManager::getPacketLatencyForStream(Streaming::StreamProcessor *stream) {
849     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
850       it != m_IsoHandlers.end();
851       ++it )
852     {
853         if((*it)->isStreamRegistered(stream)) {
854             return (*it)->getPacketLatency();
855         }
856     }
857     debugError("Stream %p has no attached handler\n", stream);
858     return 0;
859 }
860
861 void
862 IsoHandlerManager::flushHandlerForStream(Streaming::StreamProcessor *stream) {
863     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
864       it != m_IsoHandlers.end();
865       ++it )
866     {
867         if((*it)->isStreamRegistered(stream)) {
868             return (*it)->flush();
869         }
870     }
871     debugError("Stream %p has no attached handler\n", stream);
872     return;
873 }
874
875 bool
876 IsoHandlerManager::startHandlerForStream(Streaming::StreamProcessor *stream) {
877     return startHandlerForStream(stream, -1);
878 }
879
880 bool
881 IsoHandlerManager::startHandlerForStream(Streaming::StreamProcessor *stream, int cycle) {
882     // check state
883     if(m_State != E_Running) {
884         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
885         return false;
886     }
887     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
888       it != m_IsoHandlers.end();
889       ++it )
890     {
891         if((*it)->isStreamRegistered(stream)) {
892             debugOutput( DEBUG_LEVEL_VERBOSE, " starting handler %p for stream %p\n", *it, stream);
893             if(!(*it)->enable(cycle)) {
894                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not enable handler (%p)\n",*it);
895                 return false;
896             }
897             bool result;
898             if((*it)->getType() == IsoHandler::eHT_Transmit) {
899                 result = m_IsoTaskTransmit->requestShadowMapUpdate();
900             } else {
901                 result = m_IsoTaskReceive->requestShadowMapUpdate();
902             }
903             if(!result) {
904                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it);
905                 return false;
906             }
907             return true;
908         }
909     }
910     debugError("Stream %p has no attached handler\n", stream);
911     return false;
912 }
913
914 bool IsoHandlerManager::stopHandlers() {
915     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
916
917     // check state
918     if(m_State != E_Running) {
919         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
920         return false;
921     }
922
923     bool retval=true;
924
925     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
926         it != m_IsoHandlers.end();
927         ++it )
928     {
929         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handler (%p)\n",*it);
930         if(!(*it)->disable()){
931             debugOutput( DEBUG_LEVEL_VERBOSE, " could not stop handler (%p)\n",*it);
932             retval=false;
933         }
934         bool result;
935         if((*it)->getType() == IsoHandler::eHT_Transmit) {
936             result = m_IsoTaskTransmit->requestShadowMapUpdate();
937         } else {
938             result = m_IsoTaskReceive->requestShadowMapUpdate();
939         }
940         if(!result) {
941             debugOutput( DEBUG_LEVEL_VERBOSE, " could not update shadow map for handler (%p)\n",*it);
942             return false;
943         }
944     }
945
946     if (retval) {
947         m_State=E_Prepared;
948     } else {
949         m_State=E_Error;
950     }
951     return retval;
952 }
953
954 bool IsoHandlerManager::reset() {
955     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
956     // check state
957     if(m_State == E_Error) {
958         debugFatal("Resetting from error condition not yet supported...\n");
959         return false;
960     }
961     // if not in an error condition, reset means stop the handlers
962     return stopHandlers();
963 }
964
965 void IsoHandlerManager::setVerboseLevel(int i) {
966     setDebugLevel(i);
967     // propagate the debug level
968     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
969           it != m_IsoHandlers.end();
970           ++it )
971     {
972         (*it)->setVerboseLevel(i);
973     }
974     if(m_IsoThreadTransmit) m_IsoThreadTransmit->setVerboseLevel(i);
975     if(m_IsoTaskTransmit)   m_IsoTaskTransmit->setVerboseLevel(i);
976     if(m_IsoThreadReceive)  m_IsoThreadReceive->setVerboseLevel(i);
977     if(m_IsoTaskReceive)    m_IsoTaskReceive->setVerboseLevel(i);
978 }
979
980 void IsoHandlerManager::dumpInfo() {
981     #ifdef DEBUG
982     unsigned int i=0;
983     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping IsoHandlerManager Stream handler information...\n");
984     debugOutputShort( DEBUG_LEVEL_NORMAL, " State: %d\n",(int)m_State);
985
986     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
987           it != m_IsoHandlers.end();
988           ++it )
989     {
990         debugOutputShort( DEBUG_LEVEL_NORMAL, " IsoHandler %d (%p)\n",i++,*it);
991         (*it)->dumpInfo();
992     }
993     #endif
994 }
995
996 const char *
997 IsoHandlerManager::eHSToString(enum eHandlerStates s) {
998     switch (s) {
999         default: return "Invalid";
1000         case E_Created: return "Created";
1001         case E_Prepared: return "Prepared";
1002         case E_Running: return "Running";
1003         case E_Error: return "Error";
1004     }
1005 }
Note: See TracBrowser for help on using the browser.