root/trunk/libffado/src/libieee1394/IsoHandlerManager.cpp

Revision 1763, 41.0 kB (checked in by ppalmers, 11 years ago)

Merged revisions 1536,1541,1544-1546,1549,1554-1562,1571,1579-1581,1618,1632,1634-1635,1661,1677-1679,1703-1704,1715,1720-1723,1743-1745,1755 via svnmerge from
svn+ssh://ffadosvn@ffado.org/ffado/branches/libffado-2.0

Also fix remaining format string warnings.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #include "config.h"
25
26 #include "IsoHandlerManager.h"
27 #include "ieee1394service.h"
28 #include "cycletimer.h"
29 #include "libstreaming/generic/StreamProcessor.h"
30
31 #include "libutil/Atomic.h"
32 #include "libutil/PosixThread.h"
33 #include "libutil/SystemTimeSource.h"
34 #include "libutil/Watchdog.h"
35 #include "libutil/Configuration.h"
36
37 #include <cstring>
38 #include <assert.h>
39
40 IMPL_DEBUG_MODULE( IsoHandlerManager, IsoHandlerManager, DEBUG_LEVEL_NORMAL );
41 IMPL_DEBUG_MODULE( IsoTask, IsoTask, DEBUG_LEVEL_NORMAL );
42
43 using namespace Streaming;
44
45 // --- ISO Thread --- //
46
47 IsoTask::IsoTask(IsoHandlerManager& manager, enum IsoHandler::EHandlerType t)
48     : m_manager( manager )
49     , m_SyncIsoHandler ( NULL )
50     , m_handlerType( t )
51     , m_running( false )
52     , m_in_busreset( false )
53     , m_activity_wait_timeout_nsec (ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS * 1000LL)
54 {
55 }
56
57 IsoTask::~IsoTask()
58 {
59     sem_destroy(&m_activity_semaphore);
60 }
61
62 bool
63 IsoTask::Init()
64 {
65     request_update = 0;
66
67     int i;
68     for (i=0; i < ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT; i++) {
69         m_IsoHandler_map_shadow[i] = NULL;
70         m_poll_fds_shadow[i].events = 0;
71     }
72     m_poll_nfds_shadow = 0;
73
74     #ifdef DEBUG
75     m_last_loop_entry = 0;
76     m_successive_short_loops = 0;
77     #endif
78
79     sem_init(&m_activity_semaphore, 0, 0);
80     m_running = true;
81     return true;
82 }
83
84 void
85 IsoTask::requestShadowMapUpdate()
86 {
87     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) enter\n", this);
88     INC_ATOMIC(&request_update);
89
90     // get the thread going again
91     signalActivity();
92     debugOutput(DEBUG_LEVEL_VERBOSE, "(%p) exit\n", this);
93 }
94
95 bool
96 IsoTask::handleBusReset()
97 {
98     bool retval = true;
99     if(!m_running) {
100         // nothing to do here
101         return true;
102     }
103     m_in_busreset = true;
104     requestShadowMapUpdate();
105
106     unsigned int i, max;
107     max = m_manager.m_IsoHandlers.size();
108     for (i = 0; i < max; i++) {
109         IsoHandler *h = m_manager.m_IsoHandlers.at(i);
110         assert(h);
111
112         // skip the handlers not intended for us
113         if(h->getType() != m_handlerType) continue;
114
115         if (!h->handleBusReset()) {
116             debugWarning("Failed to handle busreset on %p\n", h);
117             retval = false;
118         }
119     }
120
121     // re-enable processing
122     m_in_busreset = false;
123     requestShadowMapUpdate();
124     return retval;
125 }
126
127 // updates the internal stream map
128 // note that this should be executed with the guarantee that
129 // nobody will modify the parent data structures
130 void
131 IsoTask::updateShadowMapHelper()
132 {
133     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updating shadow vars...\n", this);
134     // we are handling a busreset
135     if(m_in_busreset) {
136         m_poll_nfds_shadow = 0;
137         return;
138     }
139     unsigned int i, cnt, max;
140     max = m_manager.m_IsoHandlers.size();
141     m_SyncIsoHandler = NULL;
142     for (i = 0, cnt = 0; i < max; i++) {
143         IsoHandler *h = m_manager.m_IsoHandlers.at(i);
144         assert(h);
145
146         // skip the handlers not intended for us
147         if(h->getType() != m_handlerType) continue;
148
149         // update the state of the handler
150         // FIXME: maybe this is not the best place to do this
151         // it might be better to eliminate the 'requestShadowMapUpdate'
152         // entirely and replace it with a mechanism that implements all
153         // actions on the m_manager.m_IsoHandlers in the loop
154         h->updateState();
155
156         // rebuild the map
157         if (h->isEnabled()) {
158             m_IsoHandler_map_shadow[cnt] = h;
159             m_poll_fds_shadow[cnt].fd = h->getFileDescriptor();
160             m_poll_fds_shadow[cnt].revents = 0;
161             m_poll_fds_shadow[cnt].events = POLLIN;
162             cnt++;
163             // FIXME: need a more generic approach here
164             if(   m_SyncIsoHandler == NULL
165                && h->getType() == IsoHandler::eHT_Transmit) {
166                 m_SyncIsoHandler = h;
167             }
168
169             debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p added\n",
170                                               this, h->getTypeString(), h);
171         } else {
172             debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) %s handler %p skipped (disabled)\n",
173                                               this, h->getTypeString(), h);
174         }
175         if(cnt > ISOHANDLERMANAGER_MAX_ISO_HANDLERS_PER_PORT) {
176             debugWarning("Too much ISO Handlers in thread...\n");
177             break;
178         }
179     }
180
181     // FIXME: need a more generic approach here
182     // if there are no active transmit handlers,
183     // use the first receive handler
184     if(   m_SyncIsoHandler == NULL
185        && m_poll_nfds_shadow) {
186         m_SyncIsoHandler = m_IsoHandler_map_shadow[0];
187     }
188     m_poll_nfds_shadow = cnt;
189     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) updated shadow vars...\n", this);
190 }
191
192 bool
193 IsoTask::Execute()
194 {
195     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,
196                 "(%p, %s) Execute\n",
197                 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
198     int err;
199     unsigned int i;
200     unsigned int m_poll_timeout = 10;
201
202     #ifdef DEBUG
203     uint64_t now = Util::SystemTimeSource::getCurrentTimeAsUsecs();
204     int diff = now - m_last_loop_entry;
205     if(diff < 100) {
206         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
207                            "(%p, %s) short loop detected (%d usec), cnt: %d\n",
208                            this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"),
209                            diff, m_successive_short_loops);
210         m_successive_short_loops++;
211         if(m_successive_short_loops > 10000) {
212             debugError("Shutting down runaway thread\n");
213             m_running = false;
214             return false;
215         }
216     } else {
217         // reset the counter
218         m_successive_short_loops = 0;
219     }
220     m_last_loop_entry = now;
221     #endif
222
223     // if some other thread requested a shadow map update, do it
224     if(request_update) {
225         updateShadowMapHelper();
226         DEC_ATOMIC(&request_update); // ack the update
227         assert(request_update >= 0);
228     }
229
230     // bypass if no handlers are registered
231     if (m_poll_nfds_shadow == 0) {
232         debugOutputExtreme(DEBUG_LEVEL_VERY_VERBOSE,
233                            "(%p, %s) bypass iterate since no handlers to poll\n",
234                            this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
235         usleep(m_poll_timeout * 1000);
236         return true;
237     }
238
239     // FIXME: what can happen is that poll() returns, but not all clients are
240     // ready. there might be some busy waiting behavior that still has to be solved.
241
242     // setup the poll here
243     // we should prevent a poll() where no events are specified, since that will only time-out
244     bool no_one_to_poll = true;
245     while(no_one_to_poll) {
246         for (i = 0; i < m_poll_nfds_shadow; i++) {
247             short events = 0;
248             IsoHandler *h = m_IsoHandler_map_shadow[i];
249             // we should only poll on a transmit handler
250             // that has a client that is ready to send
251             // something. Otherwise it will end up in
252             // busy wait looping since the packet function
253             // will defer processing (also avoids the
254             // AGAIN problem)
255             if (h->canIterateClient()) {
256                 events = POLLIN | POLLPRI;
257                 no_one_to_poll = false;
258             }
259             m_poll_fds_shadow[i].events = events;
260         }
261
262         if(no_one_to_poll) {
263             debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
264                         "(%p, %s) No one to poll, waiting for something to happen\n",
265                         this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
266             // wait for something to happen
267             switch(waitForActivity()) {
268                 case IsoTask::eAR_Error:
269                     debugError("Error while waiting for activity\n");
270                     return false;
271                 case IsoTask::eAR_Interrupted:
272                     // FIXME: what to do here?
273                     debugWarning("Interrupted while waiting for activity\n");
274                     break;
275                 case IsoTask::eAR_Timeout:
276                     // FIXME: what to do here?
277                     debugWarning("Timeout while waiting for activity\n");
278                     no_one_to_poll = false; // exit the loop to be able to detect failing handlers
279                     break;
280                 case IsoTask::eAR_Activity:
281                     // do nothing
282                     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
283                                        "(%p, %s) something happened\n",
284                                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
285                     break;
286             }
287         }
288     }
289
290     // Use a shadow map of the fd's such that we don't have to update
291     // the fd map everytime we run poll().
292     err = poll (m_poll_fds_shadow, m_poll_nfds_shadow, m_poll_timeout);
293     uint32_t ctr_at_poll_return = m_manager.get1394Service().getCycleTimer();
294
295     if (err < 0) {
296         if (errno == EINTR) {
297             debugOutput(DEBUG_LEVEL_VERBOSE, "Ignoring poll return due to signal\n");
298             return true;
299         }
300         debugFatal("poll error: %s\n", strerror (errno));
301         m_running = false;
302         return false;
303     }
304
305     // find handlers that have died
306     uint64_t ctr_at_poll_return_ticks = CYCLE_TIMER_TO_TICKS(ctr_at_poll_return);
307     bool handler_died = false;
308     for (i = 0; i < m_poll_nfds_shadow; i++) {
309         // figure out if a handler has died
310
311         // this is the time of the last packet we saw in the iterate() handler
312         uint32_t last_packet_seen = m_IsoHandler_map_shadow[i]->getLastPacketTime();
313         if (last_packet_seen == 0xFFFFFFFF) {
314             // this was not iterated yet, so can't be dead
315             debugOutput(DEBUG_LEVEL_VERY_VERBOSE,
316                         "(%p, %s) handler %d didn't see any packets yet\n",
317                         this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"), i);
318             continue;
319         }
320
321         uint64_t last_packet_seen_ticks = CYCLE_TIMER_TO_TICKS(last_packet_seen);
322         // we use a relatively large value to distinguish between "death" and xrun
323         int64_t max_diff_ticks = TICKS_PER_SECOND * 2;
324         int64_t measured_diff_ticks = diffTicks(ctr_at_poll_return_ticks, last_packet_seen_ticks);
325
326         debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
327                            "(%p, %s) check handler %d: diff = %"PRId64", max = %"PRId64", now: %08X, last: %08X\n",
328                            this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"),
329                            i, measured_diff_ticks, max_diff_ticks, ctr_at_poll_return, last_packet_seen);
330         if(measured_diff_ticks > max_diff_ticks) {
331             debugFatal("(%p, %s) Handler died: now: %08X, last: %08X, diff: %"PRId64" (max: %"PRId64")\n",
332                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"),
333                        ctr_at_poll_return, last_packet_seen, measured_diff_ticks, max_diff_ticks);
334             m_IsoHandler_map_shadow[i]->notifyOfDeath();
335             handler_died = true;
336         }
337     }
338
339     if(handler_died) {
340         m_running = false;
341         return false; // one or more handlers have died
342     }
343
344     // iterate the handlers
345     for (i = 0; i < m_poll_nfds_shadow; i++) {
346         #ifdef DEBUG
347         if(m_poll_fds_shadow[i].revents) {
348             debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
349                         "(%p, %s) received events: %08X for (%d/%d, %p, %s)\n",
350                         this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"),
351                         m_poll_fds_shadow[i].revents,
352                         i, m_poll_nfds_shadow,
353                         m_IsoHandler_map_shadow[i],
354                         m_IsoHandler_map_shadow[i]->getTypeString());
355         }
356         #endif
357
358         // if we get here, it means two things:
359         // 1) the kernel can accept or provide packets (poll returned POLLIN)
360         // 2) the client can provide or accept packets (since we enabled polling)
361         if(m_poll_fds_shadow[i].revents & (POLLIN)) {
362             m_IsoHandler_map_shadow[i]->iterate(ctr_at_poll_return);
363         } else {
364             // there might be some error condition
365             if (m_poll_fds_shadow[i].revents & POLLERR) {
366                 debugWarning("(%p) error on fd for %d\n", this, i);
367             }
368             if (m_poll_fds_shadow[i].revents & POLLHUP) {
369                 debugWarning("(%p) hangup on fd for %d\n", this, i);
370             }
371         }
372     }
373     return true;
374 }
375
376 enum IsoTask::eActivityResult
377 IsoTask::waitForActivity()
378 {
379     debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
380                        "(%p, %s) waiting for activity\n",
381                        this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
382     struct timespec ts;
383     int result;
384
385     if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
386         debugError("clock_gettime failed\n");
387         return eAR_Error;
388     }
389
390     ts.tv_nsec += m_activity_wait_timeout_nsec;
391     while(ts.tv_nsec >= 1000000000LL) {
392         ts.tv_sec += 1;
393         ts.tv_nsec -= 1000000000LL;
394     }
395
396     result = sem_timedwait(&m_activity_semaphore, &ts);
397
398     if(result != 0) {
399         if (errno == ETIMEDOUT) {
400             debugOutput(DEBUG_LEVEL_VERBOSE,
401                         "(%p) sem_timedwait() timed out (result=%d)\n",
402                         this, result);
403             return eAR_Timeout;
404         } else if (errno == EINTR) {
405             debugOutput(DEBUG_LEVEL_VERBOSE,
406                         "(%p) sem_timedwait() interrupted by signal (result=%d)\n",
407                         this, result);
408             return eAR_Interrupted;
409         } else if (errno == EINVAL) {
410             debugError("(%p) sem_timedwait error (result=%d errno=EINVAL)\n",
411                         this, result);
412             debugError("(%p) timeout_nsec=%lld ts.sec=%"PRId64" ts.nsec=%"PRId64"\n",
413                        this, m_activity_wait_timeout_nsec,
414                        (int64_t)ts.tv_sec, (int64_t)ts.tv_nsec);
415             return eAR_Error;
416         } else {
417             debugError("(%p) sem_timedwait error (result=%d errno=%d)\n",
418                         this, result, errno);
419             debugError("(%p) timeout_nsec=%lld ts.sec=%"PRId64" ts.nsec=%"PRId64"\n",
420                        this, m_activity_wait_timeout_nsec,
421                        (int64_t)ts.tv_sec, (int64_t)ts.tv_nsec);
422             return eAR_Error;
423         }
424     }
425
426     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,
427                 "(%p, %s) got activity\n",
428                 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
429     return eAR_Activity;
430 }
431
432 void
433 IsoTask::signalActivity()
434 {
435     // signal the activity cond var
436     sem_post(&m_activity_semaphore);
437     debugOutput(DEBUG_LEVEL_ULTRA_VERBOSE,
438                 "(%p, %s) activity\n",
439                 this, (m_handlerType == IsoHandler::eHT_Transmit? "Transmit": "Receive"));
440 }
441
442 void IsoTask::setVerboseLevel(int i) {
443     setDebugLevel(i);
444     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", i );
445 }
446
447 // -- the ISO handler manager -- //
448 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service)
449    : m_State(E_Created)
450    , m_service( service )
451    , m_realtime(false), m_priority(0)
452    , m_IsoThreadTransmit ( NULL )
453    , m_IsoTaskTransmit ( NULL )
454    , m_IsoThreadReceive ( NULL )
455    , m_IsoTaskReceive ( NULL )
456 {
457 }
458
459 IsoHandlerManager::IsoHandlerManager(Ieee1394Service& service, bool run_rt, int rt_prio)
460    : m_State(E_Created)
461    , m_service( service )
462    , m_realtime(run_rt), m_priority(rt_prio)
463    , m_IsoThreadTransmit ( NULL )
464    , m_IsoTaskTransmit ( NULL )
465    , m_IsoThreadReceive ( NULL )
466    , m_IsoTaskReceive ( NULL )
467 {
468 }
469
470 IsoHandlerManager::~IsoHandlerManager()
471 {
472     stopHandlers();
473     pruneHandlers();
474     if(m_IsoHandlers.size() > 0) {
475         debugError("Still some handlers in use\n");
476     }
477     if (m_IsoThreadTransmit) {
478         m_IsoThreadTransmit->Stop();
479         delete m_IsoThreadTransmit;
480     }
481     if (m_IsoThreadReceive) {
482         m_IsoThreadReceive->Stop();
483         delete m_IsoThreadReceive;
484     }
485     if (m_IsoTaskTransmit) {
486         delete m_IsoTaskTransmit;
487     }
488     if (m_IsoTaskReceive) {
489         delete m_IsoTaskReceive;
490     }
491 }
492
493 bool
494 IsoHandlerManager::handleBusReset()
495 {
496     debugOutput( DEBUG_LEVEL_NORMAL, "bus reset...\n");
497     // A few things can happen on bus reset:
498     // 1) no devices added/removed => streams are still valid, but might have to be restarted
499     // 2) a device was removed => some streams become invalid
500     // 3) a device was added => same as 1, new device is ignored
501     if (!m_IsoTaskTransmit) {
502         debugError("No xmit task\n");
503         return false;
504     }
505     if (!m_IsoTaskReceive) {
506         debugError("No receive task\n");
507         return false;
508     }
509     if (!m_IsoTaskTransmit->handleBusReset()) {
510         debugWarning("could no handle busreset on xmit\n");
511     }
512     if (!m_IsoTaskReceive->handleBusReset()) {
513         debugWarning("could no handle busreset on recv\n");
514     }
515     return true;
516 }
517
518 void
519 IsoHandlerManager::requestShadowMapUpdate()
520 {
521     if(m_IsoTaskTransmit) m_IsoTaskTransmit->requestShadowMapUpdate();
522     if(m_IsoTaskReceive) m_IsoTaskReceive->requestShadowMapUpdate();
523 }
524
525 bool
526 IsoHandlerManager::setThreadParameters(bool rt, int priority) {
527     debugOutput( DEBUG_LEVEL_VERBOSE, "(%p) switch to: (rt=%d, prio=%d)...\n", this, rt, priority);
528     if (priority > THREAD_MAX_RTPRIO) priority = THREAD_MAX_RTPRIO; // cap the priority
529     if (priority < THREAD_MIN_RTPRIO) priority = THREAD_MIN_RTPRIO; // cap the priority
530     m_realtime = rt;
531     m_priority = priority;
532
533     // grab the options from the parent
534     Util::Configuration *config = m_service.getConfiguration();
535     int ihm_iso_prio_increase = ISOHANDLERMANAGER_ISO_PRIO_INCREASE;
536     int ihm_iso_prio_increase_xmit = ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT;
537     int ihm_iso_prio_increase_recv = ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV;
538     if(config) {
539         config->getValueForSetting("ieee1394.isomanager.prio_increase", ihm_iso_prio_increase);
540         config->getValueForSetting("ieee1394.isomanager.prio_increase_xmit", ihm_iso_prio_increase_xmit);
541         config->getValueForSetting("ieee1394.isomanager.prio_increase_recv", ihm_iso_prio_increase_recv);
542     }
543
544     if (m_IsoThreadTransmit) {
545         if (m_realtime) {
546             m_IsoThreadTransmit->AcquireRealTime(m_priority
547                                                  + ihm_iso_prio_increase
548                                                  + ihm_iso_prio_increase_xmit);
549         } else {
550             m_IsoThreadTransmit->DropRealTime();
551         }
552     }
553     if (m_IsoThreadReceive) {
554         if (m_realtime) {
555             m_IsoThreadReceive->AcquireRealTime(m_priority
556                                                 + ihm_iso_prio_increase
557                                                 + ihm_iso_prio_increase_recv);
558         } else {
559             m_IsoThreadReceive->DropRealTime();
560         }
561     }
562
563     return true;
564 }
565
566 bool IsoHandlerManager::init()
567 {
568     debugOutput( DEBUG_LEVEL_VERBOSE, "Initializing ISO manager %p...\n", this);
569     // check state
570     if(m_State != E_Created) {
571         debugError("Manager already initialized...\n");
572         return false;
573     }
574
575     // grab the options from the parent
576     Util::Configuration *config = m_service.getConfiguration();
577     int ihm_iso_prio_increase = ISOHANDLERMANAGER_ISO_PRIO_INCREASE;
578     int ihm_iso_prio_increase_xmit = ISOHANDLERMANAGER_ISO_PRIO_INCREASE_XMIT;
579     int ihm_iso_prio_increase_recv = ISOHANDLERMANAGER_ISO_PRIO_INCREASE_RECV;
580     int64_t isotask_activity_timeout_usecs = ISOHANDLERMANAGER_ISO_TASK_WAIT_TIMEOUT_USECS;
581     if(config) {
582         config->getValueForSetting("ieee1394.isomanager.prio_increase", ihm_iso_prio_increase);
583         config->getValueForSetting("ieee1394.isomanager.prio_increase_xmit", ihm_iso_prio_increase_xmit);
584         config->getValueForSetting("ieee1394.isomanager.prio_increase_recv", ihm_iso_prio_increase_recv);
585         config->getValueForSetting("ieee1394.isomanager.isotask_activity_timeout_usecs", isotask_activity_timeout_usecs);
586     }
587
588     // create threads to iterate our ISO handlers
589     debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p transmit...\n", this);
590     m_IsoTaskTransmit = new IsoTask( *this, IsoHandler::eHT_Transmit );
591     if(!m_IsoTaskTransmit) {
592         debugFatal("No task\n");
593         return false;
594     }
595     m_IsoTaskTransmit->setVerboseLevel(getDebugLevel());
596     m_IsoTaskTransmit->m_activity_wait_timeout_nsec = isotask_activity_timeout_usecs * 1000LL;
597     m_IsoThreadTransmit = new Util::PosixThread(m_IsoTaskTransmit, "ISOXMT", m_realtime,
598                                                 m_priority + ihm_iso_prio_increase
599                                                 + ihm_iso_prio_increase_xmit,
600                                                 PTHREAD_CANCEL_DEFERRED);
601
602     if(!m_IsoThreadTransmit) {
603         debugFatal("No thread\n");
604         return false;
605     }
606     m_IsoThreadTransmit->setVerboseLevel(getDebugLevel());
607
608     debugOutput( DEBUG_LEVEL_VERBOSE, "Create iso thread for %p receive...\n", this);
609     m_IsoTaskReceive = new IsoTask( *this, IsoHandler::eHT_Receive );
610     if(!m_IsoTaskReceive) {
611         debugFatal("No task\n");
612         return false;
613     }
614     m_IsoTaskReceive->setVerboseLevel(getDebugLevel());
615     m_IsoThreadReceive = new Util::PosixThread(m_IsoTaskReceive, "ISORCV", m_realtime,
616                                                m_priority + ihm_iso_prio_increase
617                                                + ihm_iso_prio_increase_recv,
618                                                PTHREAD_CANCEL_DEFERRED);
619
620     if(!m_IsoThreadReceive) {
621         debugFatal("No thread\n");
622         return false;
623     }
624     m_IsoThreadReceive->setVerboseLevel(getDebugLevel());
625     // register the thread with the RT watchdog
626     Util::Watchdog *watchdog = m_service.getWatchdog();
627     if(watchdog) {
628         if(!watchdog->registerThread(m_IsoThreadTransmit)) {
629             debugWarning("could not register iso transmit thread with watchdog\n");
630         }
631         if(!watchdog->registerThread(m_IsoThreadReceive)) {
632             debugWarning("could not register iso receive thread with watchdog\n");
633         }
634     } else {
635         debugWarning("could not find valid watchdog\n");
636     }
637
638     if (m_IsoThreadTransmit->Start() != 0) {
639         debugFatal("Could not start ISO Transmit thread\n");
640         return false;
641     }
642     if (m_IsoThreadReceive->Start() != 0) {
643         debugFatal("Could not start ISO Receive thread\n");
644         return false;
645     }
646
647     m_State=E_Running;
648     return true;
649 }
650
651 void
652 IsoHandlerManager::signalActivityTransmit()
653 {
654     assert(m_IsoTaskTransmit);
655     m_IsoTaskTransmit->signalActivity();
656 }
657
658 void
659 IsoHandlerManager::signalActivityReceive()
660 {
661     assert(m_IsoTaskReceive);
662     m_IsoTaskReceive->signalActivity();
663 }
664
665 bool IsoHandlerManager::registerHandler(IsoHandler *handler)
666 {
667     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
668     assert(handler);
669     handler->setVerboseLevel(getDebugLevel());
670     m_IsoHandlers.push_back(handler);
671     requestShadowMapUpdate();
672     return true;
673 }
674
675 bool IsoHandlerManager::unregisterHandler(IsoHandler *handler)
676 {
677     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
678     assert(handler);
679
680     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
681       it != m_IsoHandlers.end();
682       ++it )
683     {
684         if ( *it == handler ) {
685             m_IsoHandlers.erase(it);
686             requestShadowMapUpdate();
687             return true;
688         }
689     }
690     debugFatal("Could not find handler (%p)\n", handler);
691     return false; //not found
692 }
693
694 /**
695  * Registers an StreamProcessor with the IsoHandlerManager.
696  *
697  * If nescessary, an IsoHandler is created to handle this stream.
698  * Once an StreamProcessor is registered to the handler, it will be included
699  * in the ISO streaming cycle (i.e. receive/transmit of it will occur).
700  *
701  * @param stream the stream to register
702  * @return true if registration succeeds
703  *
704  * \todo : currently there is a one-to-one mapping
705  *        between streams and handlers, this is not ok for
706  *        multichannel receive
707  */
708 bool IsoHandlerManager::registerStream(StreamProcessor *stream)
709 {
710     debugOutput( DEBUG_LEVEL_VERBOSE, "Registering %s stream %p\n", stream->getTypeString(), stream);
711     assert(stream);
712
713     IsoHandler* h = NULL;
714
715     // make sure the stream isn't already attached to a handler
716     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
717       it != m_IsoHandlers.end();
718       ++it )
719     {
720         if((*it)->isStreamRegistered(stream)) {
721             debugError( "stream already registered!\n");
722             return false;
723         }
724     }
725
726     // clean up all handlers that aren't used
727     pruneHandlers();
728
729     // allocate a handler for this stream
730     if (stream->getType()==StreamProcessor::ePT_Receive) {
731         // grab the options from the parent
732         Util::Configuration *config = m_service.getConfiguration();
733         int receive_mode_setting = DEFAULT_ISO_RECEIVE_MODE;
734         int bufferfill_mode_threshold = BUFFERFILL_MODE_THRESHOLD;
735         int min_interrupts_per_period = MINIMUM_INTERRUPTS_PER_PERIOD;
736         int max_nb_buffers_recv = MAX_RECV_NB_BUFFERS;
737         int min_packetsize_recv = MIN_RECV_PACKET_SIZE;
738         if(config) {
739             config->getValueForSetting("ieee1394.isomanager.iso_receive_mode", receive_mode_setting);
740             config->getValueForSetting("ieee1394.isomanager.bufferfill_mode_threshold", bufferfill_mode_threshold);
741             config->getValueForSetting("ieee1394.isomanager.min_interrupts_per_period", min_interrupts_per_period);
742             config->getValueForSetting("ieee1394.isomanager.max_nb_buffers_recv", max_nb_buffers_recv);
743             config->getValueForSetting("ieee1394.isomanager.min_packetsize_recv", min_packetsize_recv);
744         }
745
746         // setup the optimal parameters for the raw1394 ISO buffering
747         unsigned int packets_per_period = stream->getPacketsPerPeriod();
748         // reserve space for the 1394 header too (might not be necessary)
749         unsigned int max_packet_size = stream->getMaxPacketSize() + 8;
750         unsigned int page_size = getpagesize();
751
752         enum raw1394_iso_dma_recv_mode receive_mode;
753         switch(receive_mode_setting) {
754             case 0:
755                 if(packets_per_period < (unsigned)bufferfill_mode_threshold) {
756                     debugOutput( DEBUG_LEVEL_VERBOSE, "Using packet-per-buffer mode (auto) [%d, %d]\n",
757                                  packets_per_period, bufferfill_mode_threshold);
758                     receive_mode = RAW1394_DMA_PACKET_PER_BUFFER;
759                 } else {
760                     debugOutput( DEBUG_LEVEL_VERBOSE, "Using bufferfill mode (auto) [%d, %d]\n",
761                                  packets_per_period, bufferfill_mode_threshold);
762                     receive_mode = RAW1394_DMA_BUFFERFILL;
763                 }
764                 break;
765             case 1:
766                 debugOutput( DEBUG_LEVEL_VERBOSE, "Using packet-per-buffer mode (config)\n");
767                 receive_mode = RAW1394_DMA_PACKET_PER_BUFFER;
768                 break;
769             case 2:
770                 debugOutput( DEBUG_LEVEL_VERBOSE, "Using bufferfill mode (config)\n");
771                 receive_mode = RAW1394_DMA_BUFFERFILL;
772                 break;
773             default: debugWarning("Bogus receive mode setting in config: %d\n", receive_mode_setting);
774         }
775
776         // Ensure we don't request a packet size bigger than the
777         // kernel-enforced maximum which is currently 1 page.
778         // NOTE: PP: this is not really true AFAICT
779         if (max_packet_size > page_size) {
780             debugError("max packet size (%u) > page size (%u)\n", max_packet_size, page_size);
781             return false;
782         }
783         if (max_packet_size < (unsigned)min_packetsize_recv) {
784             debugError("min packet size (%u) < MIN_RECV_PACKET_SIZE (%u), using min value\n",
785                        max_packet_size, min_packetsize_recv);
786             max_packet_size = min_packetsize_recv;
787         }
788
789         // apparently a too small value causes issues too
790         if(max_packet_size < 200) max_packet_size = 200;
791
792         // the interrupt/wakeup interval prediction of raw1394 is a mess...
793         int irq_interval = (packets_per_period-1) / min_interrupts_per_period;
794         if(irq_interval <= 0) irq_interval=1;
795
796         // the receive buffer size doesn't matter for the latency,
797         // it does seem to be confined to a certain region for correct
798         // operation. However it is not clear how many.
799         int buffers = max_nb_buffers_recv;
800
801         // ensure at least 2 hardware interrupts per ISO buffer wraparound
802         if(irq_interval > buffers/2) {
803             irq_interval = buffers/2;
804         }
805
806         // create the actual handler
807         debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoRecvHandler\n");
808         h = new IsoHandler(*this, IsoHandler::eHT_Receive,
809                            buffers, max_packet_size, irq_interval);
810
811         if(!h) {
812             debugFatal("Could not create IsoRecvHandler\n");
813             return false;
814         }
815
816         h->setReceiveMode(receive_mode);
817
818     } else if (stream->getType()==StreamProcessor::ePT_Transmit) {
819         // grab the options from the parent
820         Util::Configuration *config = m_service.getConfiguration();
821         int min_interrupts_per_period = MINIMUM_INTERRUPTS_PER_PERIOD;
822         int max_nb_buffers_xmit = MAX_XMIT_NB_BUFFERS;
823         int max_packetsize_xmit = MAX_XMIT_PACKET_SIZE;
824         int min_packetsize_xmit = MIN_XMIT_PACKET_SIZE;
825         if(config) {
826             config->getValueForSetting("ieee1394.isomanager.min_interrupts_per_period", min_interrupts_per_period);
827             config->getValueForSetting("ieee1394.isomanager.max_nb_buffers_xmit", max_nb_buffers_xmit);
828             config->getValueForSetting("ieee1394.isomanager.max_packetsize_xmit", max_packetsize_xmit);
829             config->getValueForSetting("ieee1394.isomanager.min_packetsize_xmit", min_packetsize_xmit);
830         }
831
832         // setup the optimal parameters for the raw1394 ISO buffering
833         // reserve space for the 1394 header too (might not be necessary)
834         unsigned int max_packet_size = stream->getMaxPacketSize() + 8;
835
836         if (max_packet_size > (unsigned)max_packetsize_xmit) {
837             debugError("max packet size (%u) > MAX_XMIT_PACKET_SIZE (%u)\n",
838                        max_packet_size, max_packetsize_xmit);
839             return false;
840         }
841         if (max_packet_size < (unsigned)min_packetsize_xmit) {
842             debugError("min packet size (%u) < MIN_XMIT_PACKET_SIZE (%u), using min value\n",
843                        max_packet_size, min_packetsize_xmit);
844             max_packet_size = min_packetsize_xmit;
845         }
846
847         int buffers = max_nb_buffers_xmit;
848         unsigned int packets_per_period = stream->getPacketsPerPeriod();
849
850         int irq_interval = (packets_per_period-1) / min_interrupts_per_period;
851         if(irq_interval <= 0) irq_interval=1;
852         // ensure at least 2 hardware interrupts per ISO buffer wraparound
853         if(irq_interval > buffers/2) {
854             irq_interval = buffers/2;
855         }
856
857         debugOutput( DEBUG_LEVEL_VERBOSE, " creating IsoXmitHandler\n");
858
859         // create the actual handler
860         h = new IsoHandler(*this, IsoHandler::eHT_Transmit,
861                            buffers, max_packet_size, irq_interval);
862
863         if(!h) {
864             debugFatal("Could not create IsoXmitHandler\n");
865             return false;
866         }
867
868     } else {
869         debugFatal("Bad stream type\n");
870         return false;
871     }
872
873     h->setVerboseLevel(getDebugLevel());
874
875     // register the stream with the handler
876     if(!h->registerStream(stream)) {
877         debugFatal("Could not register receive stream with handler\n");
878         return false;
879     }
880
881     // register the handler with the manager
882     if(!registerHandler(h)) {
883         debugFatal("Could not register receive handler with manager\n");
884         return false;
885     }
886     debugOutput( DEBUG_LEVEL_VERBOSE, " registered stream (%p) with handler (%p)\n", stream, h);
887
888     m_StreamProcessors.push_back(stream);
889     debugOutput( DEBUG_LEVEL_VERBOSE, " %zd streams, %zd handlers registered\n",
890                                       m_StreamProcessors.size(), m_IsoHandlers.size());
891     return true;
892 }
893
894 bool IsoHandlerManager::unregisterStream(StreamProcessor *stream)
895 {
896     debugOutput( DEBUG_LEVEL_VERBOSE, "Unregistering %s stream %p\n", stream->getTypeString(), stream);
897     assert(stream);
898
899     // make sure the stream isn't attached to a handler anymore
900     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
901       it != m_IsoHandlers.end();
902       ++it )
903     {
904         if((*it)->isStreamRegistered(stream)) {
905             if(!(*it)->unregisterStream(stream)) {
906                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not unregister stream (%p) from handler (%p)...\n",stream,*it);
907                 return false;
908             }
909             debugOutput( DEBUG_LEVEL_VERBOSE, " unregistered stream (%p) from handler (%p)...\n",stream,*it);
910         }
911     }
912
913     // clean up all handlers that aren't used
914     pruneHandlers();
915
916     // remove the stream from the registered streams list
917     for ( StreamProcessorVectorIterator it = m_StreamProcessors.begin();
918       it != m_StreamProcessors.end();
919       ++it )
920     {
921         if ( *it == stream ) {
922             m_StreamProcessors.erase(it);
923             debugOutput( DEBUG_LEVEL_VERBOSE, " deleted stream (%p) from list...\n", *it);
924             return true;
925         }
926     }
927     return false; //not found
928 }
929
930 /**
931  * @brief unregister a handler from the manager
932  * @note called without the lock held.
933  */
934 void IsoHandlerManager::pruneHandlers() {
935     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
936     IsoHandlerVector toUnregister;
937
938     // find all handlers that are not in use
939     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
940           it != m_IsoHandlers.end();
941           ++it )
942     {
943         if(!((*it)->inUse())) {
944             debugOutput( DEBUG_LEVEL_VERBOSE, " handler (%p) not in use\n",*it);
945             toUnregister.push_back(*it);
946         }
947     }
948     // delete them
949     for ( IsoHandlerVectorIterator it = toUnregister.begin();
950           it != toUnregister.end();
951           ++it )
952     {
953         unregisterHandler(*it);
954
955         debugOutput( DEBUG_LEVEL_VERBOSE, " deleting handler (%p)\n",*it);
956
957         // Now the handler's been unregistered it won't be reused
958         // again.  Therefore it really needs to be formally deleted
959         // to free up the raw1394 handle.  Otherwise things fall
960         // apart after several xrun recoveries as the system runs
961         // out of resources to support all the disused but still
962         // allocated raw1394 handles.  At least this is the current
963         // theory as to why we end up with "memory allocation"
964         // failures after several Xrun recoveries.
965         delete *it;
966     }
967 }
968
969 int
970 IsoHandlerManager::getPacketLatencyForStream(Streaming::StreamProcessor *stream) {
971     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
972       it != m_IsoHandlers.end();
973       ++it )
974     {
975         if((*it)->isStreamRegistered(stream)) {
976             return (*it)->getIrqInterval();
977         }
978     }
979     debugError("Stream %p has no attached handler\n", stream);
980     return 0;
981 }
982
983 IsoHandler *
984 IsoHandlerManager::getHandlerForStream(Streaming::StreamProcessor *stream) {
985     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
986       it != m_IsoHandlers.end();
987       ++it )
988     {
989         if((*it)->isStreamRegistered(stream)) {
990             return (*it);
991         }
992     }
993     debugError("Stream %p has no attached handler\n", stream);
994     return NULL;
995 }
996
997 bool
998 IsoHandlerManager::startHandlerForStream(Streaming::StreamProcessor *stream) {
999     return startHandlerForStream(stream, -1);
1000 }
1001
1002 bool
1003 IsoHandlerManager::startHandlerForStream(Streaming::StreamProcessor *stream, int cycle) {
1004     // check state
1005     if(m_State != E_Running) {
1006         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
1007         return false;
1008     }
1009     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
1010       it != m_IsoHandlers.end();
1011       ++it )
1012     {
1013         if((*it)->isStreamRegistered(stream)) {
1014             debugOutput( DEBUG_LEVEL_VERBOSE, " starting handler %p for stream %p\n", *it, stream);
1015             if(!(*it)->requestEnable(cycle)) {
1016                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not request enable for handler %p)\n",*it);
1017                 return false;
1018             }
1019
1020             if((*it)->getType() == IsoHandler::eHT_Transmit) {
1021                 m_IsoTaskTransmit->requestShadowMapUpdate();
1022             } else {
1023                 m_IsoTaskReceive->requestShadowMapUpdate();
1024             }
1025
1026             debugOutput(DEBUG_LEVEL_VERY_VERBOSE, " requested enable for handler %p\n", *it);
1027             return true;
1028         }
1029     }
1030     debugError("Stream %p has no attached handler\n", stream);
1031     return false;
1032 }
1033
1034 bool
1035 IsoHandlerManager::stopHandlerForStream(Streaming::StreamProcessor *stream) {
1036     // check state
1037     if(m_State != E_Running) {
1038         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
1039         return false;
1040     }
1041     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
1042       it != m_IsoHandlers.end();
1043       ++it )
1044     {
1045         if((*it)->isStreamRegistered(stream)) {
1046             debugOutput( DEBUG_LEVEL_VERBOSE, " stopping handler %p for stream %p\n", *it, stream);
1047             if(!(*it)->requestDisable()) {
1048                 debugOutput( DEBUG_LEVEL_VERBOSE, " could not request disable for handler %p\n",*it);
1049                 return false;
1050             }
1051
1052             if((*it)->getType() == IsoHandler::eHT_Transmit) {
1053                 m_IsoTaskTransmit->requestShadowMapUpdate();
1054             } else {
1055                 m_IsoTaskReceive->requestShadowMapUpdate();
1056             }
1057
1058             debugOutput(DEBUG_LEVEL_VERBOSE, " requested disable for handler %p\n", *it);
1059             return true;
1060         }
1061     }
1062     debugError("Stream %p has no attached handler\n", stream);
1063     return false;
1064 }
1065
1066 bool IsoHandlerManager::stopHandlers() {
1067     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
1068
1069     // check state
1070     if(m_State != E_Running) {
1071         debugError("Incorrect state, expected E_Running, got %s\n", eHSToString(m_State));
1072         return false;
1073     }
1074
1075     bool retval=true;
1076
1077     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
1078         it != m_IsoHandlers.end();
1079         ++it )
1080     {
1081         debugOutput( DEBUG_LEVEL_VERBOSE, "Stopping handler (%p)\n",*it);
1082
1083         if(!(*it)->requestDisable()) {
1084             debugOutput( DEBUG_LEVEL_VERBOSE, " could not request disable for handler %p\n",*it);
1085             return false;
1086         }
1087
1088         if((*it)->getType() == IsoHandler::eHT_Transmit) {
1089             m_IsoTaskTransmit->requestShadowMapUpdate();
1090         } else {
1091             m_IsoTaskReceive->requestShadowMapUpdate();
1092         }
1093
1094         debugOutput(DEBUG_LEVEL_VERBOSE, " requested disable for handler %p\n", *it);
1095     }
1096
1097     if (retval) {
1098         m_State=E_Prepared;
1099     } else {
1100         m_State=E_Error;
1101     }
1102     return retval;
1103 }
1104
1105 bool IsoHandlerManager::reset() {
1106     debugOutput( DEBUG_LEVEL_VERBOSE, "enter...\n");
1107     // check state
1108     if(m_State == E_Error) {
1109         debugFatal("Resetting from error condition not yet supported...\n");
1110         return false;
1111     }
1112     // if not in an error condition, reset means stop the handlers
1113     return stopHandlers();
1114 }
1115
1116 void IsoHandlerManager::setVerboseLevel(int i) {
1117     setDebugLevel(i);
1118     // propagate the debug level
1119     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
1120           it != m_IsoHandlers.end();
1121           ++it )
1122     {
1123         (*it)->setVerboseLevel(i);
1124     }
1125     if(m_IsoThreadTransmit) m_IsoThreadTransmit->setVerboseLevel(i);
1126     if(m_IsoTaskTransmit)   m_IsoTaskTransmit->setVerboseLevel(i);
1127     if(m_IsoThreadReceive)  m_IsoThreadReceive->setVerboseLevel(i);
1128     if(m_IsoTaskReceive)    m_IsoTaskReceive->setVerboseLevel(i);
1129     setDebugLevel(i);
1130     debugOutput( DEBUG_LEVEL_VERBOSE, "Setting verbose level to %d...\n", i );
1131 }
1132
1133 void IsoHandlerManager::dumpInfo() {
1134     #ifdef DEBUG
1135     unsigned int i=0;
1136     debugOutputShort( DEBUG_LEVEL_NORMAL, "Dumping IsoHandlerManager Stream handler information...\n");
1137     debugOutputShort( DEBUG_LEVEL_NORMAL, " State: %d\n",(int)m_State);
1138
1139     for ( IsoHandlerVectorIterator it = m_IsoHandlers.begin();
1140           it != m_IsoHandlers.end();
1141           ++it )
1142     {
1143         debugOutputShort( DEBUG_LEVEL_NORMAL, " IsoHandler %d (%p)\n",i++,*it);
1144         (*it)->dumpInfo();
1145     }
1146     #endif
1147 }
1148
1149 const char *
1150 IsoHandlerManager::eHSToString(enum eHandlerStates s) {
1151     switch (s) {
1152         default: return "Invalid";
1153         case E_Created: return "Created";
1154         case E_Prepared: return "Prepared";
1155         case E_Running: return "Running";
1156         case E_Error: return "Error";
1157     }
1158 }
Note: See TracBrowser for help on using the browser.