root/trunk/libffado/src/libstreaming/generic/StreamProcessor.h

Revision 1005, 18.5 kB (checked in by ppalmers, 13 years ago)

Improve thread synchronisation. Switch back to separate threads for transmit and
receive since it is not possible to statically schedule things properly. One
of the threads (i.e. the client thread) is out of our control, hence it's
execution can't be controlled. Using separate threads and correct priorities
will shift this problem to the OS. Note that the priority of the packet
receive thread should be lower than the client thread (such that the client
thread is woken ASAP), and the priority of the transmit thread should be
higher than the client thread (such that packets are queued ASAP).
Extra benefit: multi-cores are used.

Some other startup improvements.

Line 
1 /*
2  * Copyright (C) 2005-2008 by Pieter Palmers
3  *
4  * This file is part of FFADO
5  * FFADO = Free Firewire (pro-)audio drivers for linux
6  *
7  * FFADO is based upon FreeBoB.
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 2 of the License, or
12  * (at your option) version 3 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  */
23
24 #ifndef __FFADO_STREAMPROCESSOR__
25 #define __FFADO_STREAMPROCESSOR__
26
27 #include "ffadodevice.h"
28
29 #include "PortManager.h"
30
31 #include "libutil/StreamStatistics.h"
32 #include "libutil/TimestampedBuffer.h"
33 #include "libutil/OptionContainer.h"
34
35 #include "debugmodule/debugmodule.h"
36
37 #include <pthread.h>
38
39 class Ieee1394Service;
40 class IsoHandlerManager;
41
42 namespace Streaming {
43
44     class StreamProcessorManager;
45 /*!
46 \brief Class providing a generic interface for Stream Processors
47
48  A stream processor multiplexes or demultiplexes an ISO stream into a
49  collection of ports. This class should be subclassed, and the relevant
50  functions should be overloaded.
51
52 */
53 class StreamProcessor : public PortManager,
54                         public Util::TimestampedBufferClient,
55                         public Util::OptionContainer
56 {
57 public:
58     ///> the streamprocessor type
59     enum eProcessorType {
60         ePT_Receive,
61         ePT_Transmit
62     };
63     ///> returns the type of the streamprocessor
64     virtual enum eProcessorType getType() { return m_processor_type; };
65 private:
66     // this can only be set by the constructor
67     enum eProcessorType m_processor_type;
68     // pretty printing
69     const char *ePTToString(enum eProcessorType);
70 protected:
71     ///> the state the streamprocessor is in
72     enum eProcessorState {
73         ePS_Invalid,
74         ePS_Created,
75         ePS_Stopped,
76         ePS_WaitingForStream,
77         ePS_DryRunning,
78         ePS_WaitingForStreamEnable,
79         ePS_Running,
80         ePS_WaitingForStreamDisable,
81     };
82
83     ///> set the SP state to a specific value
84     void setState(enum eProcessorState);
85     ///> get the SP state
86     enum eProcessorState getState() {return m_state;};
87 private:
88     enum eProcessorState m_state;
89     // state switching
90     enum eProcessorState m_next_state;
91     unsigned int m_cycle_to_switch_state;
92     bool updateState();
93     // pretty printing
94     const char *ePSToString(enum eProcessorState);
95
96     bool doStop();
97     bool doWaitForRunningStream();
98     bool doDryRunning();
99     bool doWaitForStreamEnable();
100     bool doRunning();
101     bool doWaitForStreamDisable();
102
103     bool scheduleStateTransition(enum eProcessorState state, uint64_t time_instant);
104     bool waitForState(enum eProcessorState state, unsigned int timeout);
105
106 public: //--- state stuff
107     bool isRunning()
108             {return m_state == ePS_Running;};
109     bool isDryRunning()
110             {return m_state == ePS_DryRunning;};
111     bool isStopped()
112             {return m_state == ePS_Stopped;};
113     bool isWaitingForStream()
114             {return m_state == ePS_WaitingForStream;};
115
116     // these schedule and wait for the state transition
117     bool startDryRunning(int64_t time_to_start_at);
118     bool startRunning(int64_t time_to_start_at);
119     bool stopDryRunning(int64_t time_to_stop_at);
120     bool stopRunning(int64_t time_to_stop_at);
121
122     // these only schedule the transition
123     bool scheduleStartDryRunning(int64_t time_to_start_at);
124     bool scheduleStartRunning(int64_t time_to_start_at);
125     bool scheduleStopDryRunning(int64_t time_to_stop_at);
126     bool scheduleStopRunning(int64_t time_to_stop_at);
127
128     // the main difference between init and prepare is that when prepare is called,
129     // the SP is registered to a manager (FIXME: can't it be called by the manager?)
130     bool init();
131     bool prepare();
132
133     void handleBusReset();
134
135 public: // constructor/destructor
136     StreamProcessor(FFADODevice &parent, enum eProcessorType type);
137     virtual ~StreamProcessor();
138 protected:
139     FFADODevice&                m_Parent;
140     Ieee1394Service&            m_1394service;
141     IsoHandlerManager&          m_IsoHandlerManager;
142     StreamProcessorManager&     m_StreamProcessorManager;
143     unsigned int                m_local_node_id;
144
145 public: // the public receive/transmit functions
146     // the transmit interface accepts frames and provides packets
147     // implement these for a transmit SP
148     // leave default for a receive SP
149
150     // the receive interface accepts packets and provides frames
151     // these are implemented by the parent SP
152     enum raw1394_iso_disposition
153         putPacket(unsigned char *data, unsigned int length,
154                   unsigned char channel, unsigned char tag, unsigned char sy,
155                   unsigned int cycle, unsigned int dropped, unsigned int skipped);
156
157     enum raw1394_iso_disposition
158     getPacket(unsigned char *data, unsigned int *length,
159                 unsigned char *tag, unsigned char *sy,
160                 int cycle, unsigned int dropped, unsigned int skipped, unsigned int max_length);
161
162     bool getFrames(unsigned int nbframes, int64_t ts); ///< transfer the buffer contents to the client
163     bool putFrames(unsigned int nbframes, int64_t ts); ///< transfer the client contents to the buffer
164
165     bool canProducePacket();
166     bool canProducePeriod();
167     bool canProduce(unsigned int nframes);
168
169     bool canConsumePacket();
170     bool canConsumePeriod();
171     bool canConsume(unsigned int nframes);
172
173 public:
174     /**
175      * @brief drop nframes from the internal buffer as if they were transferred to the client side
176      *
177      * Gets nframes of frames from the buffer as done by getFrames(), but does not transfer them
178      * to the client side. Instead they are discarded.
179      *
180      * @param nframes number of frames
181      * @return true if the operation was successful
182      */
183     bool dropFrames(unsigned int nframes, int64_t ts);
184
185     /**
186      * @brief put silence frames into the internal buffer
187      *
188      * Puts nframes of frames into the buffer as done by putFrames(), but does not transfer them
189      * from the client side. Instead, silent frames are used.
190      *
191      * @param nframes number of frames
192      * @return true if the operation was successful
193      */
194     bool putSilenceFrames(unsigned int nbframes, int64_t ts);
195
196     /**
197      * @brief Shifts the stream with the specified number of frames
198      *
199      * Used to align several streams to each other. It comes down to
200      * making sure the head timestamp corresponds to the timestamp of
201      * one master stream
202      *
203      * @param nframes the number of frames to shift
204      * @return true if successful
205      */
206     bool shiftStream(int nframes);
207
208     /**
209      * @brief tries to fill/sink the stream as far as possible
210      */
211     void flush();
212
213 protected: // the helper receive/transmit functions
214     enum eChildReturnValue {
215         eCRV_OK,
216         eCRV_Invalid,
217         eCRV_Packet,
218         eCRV_EmptyPacket,
219         eCRV_XRun,
220         eCRV_Again,
221         eCRV_Defer,
222     };
223     // to be implemented by the children
224     // the following methods are to be implemented by receive SP subclasses
225     virtual enum eChildReturnValue processPacketHeader(unsigned char *data, unsigned int length,
226                                      unsigned char channel, unsigned char tag,
227                                      unsigned char sy, unsigned int cycle,
228                                      unsigned int dropped)
229         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
230     virtual enum eChildReturnValue processPacketData(unsigned char *data, unsigned int length,
231                                    unsigned char channel, unsigned char tag,
232                                    unsigned char sy, unsigned int cycle,
233                                    unsigned int dropped)
234         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
235     virtual bool processReadBlock(char *data, unsigned int nevents, unsigned int offset)
236         {debugWarning("call not allowed\n"); return false;};
237
238     // the following methods are to be implemented by transmit SP subclasses
239     virtual enum eChildReturnValue generatePacketHeader(unsigned char *data, unsigned int *length,
240                                       unsigned char *tag, unsigned char *sy,
241                                       int cycle, unsigned int dropped,
242                                       unsigned int max_length)
243         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
244     virtual enum eChildReturnValue generatePacketData(unsigned char *data, unsigned int *length,
245                                     unsigned char *tag, unsigned char *sy,
246                                     int cycle, unsigned int dropped,
247                                     unsigned int max_length)
248         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
249     virtual enum eChildReturnValue generateEmptyPacketHeader(unsigned char *data, unsigned int *length,
250                                             unsigned char *tag, unsigned char *sy,
251                                             int cycle, unsigned int dropped,
252                                             unsigned int max_length)
253         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
254     virtual enum eChildReturnValue generateEmptyPacketData(unsigned char *data, unsigned int *length,
255                                           unsigned char *tag, unsigned char *sy,
256                                           int cycle, unsigned int dropped,
257                                           unsigned int max_length)
258         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
259     virtual enum eChildReturnValue generateSilentPacketHeader(unsigned char *data, unsigned int *length,
260                                             unsigned char *tag, unsigned char *sy,
261                                             int cycle, unsigned int dropped,
262                                             unsigned int max_length)
263         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
264     virtual enum eChildReturnValue generateSilentPacketData(unsigned char *data, unsigned int *length,
265                                           unsigned char *tag, unsigned char *sy,
266                                           int cycle, unsigned int dropped,
267                                           unsigned int max_length)
268         {debugWarning("call not allowed\n"); return eCRV_Invalid;};
269     virtual bool processWriteBlock(char *data, unsigned int nevents, unsigned int offset)
270         {debugWarning("call not allowed\n"); return false;};
271     virtual bool transmitSilenceBlock(char *data, unsigned int nevents, unsigned int offset)
272         {debugWarning("call not allowed\n"); return false;};
273 protected: // some generic helpers
274     int provideSilenceToPort(Port *p, unsigned int offset, unsigned int nevents);
275     bool provideSilenceBlock(unsigned int nevents, unsigned int offset);
276
277 private:
278     bool getFramesDry(unsigned int nbframes, int64_t ts);
279     bool getFramesWet(unsigned int nbframes, int64_t ts);
280     bool putFramesDry(unsigned int nbframes, int64_t ts);
281     bool putFramesWet(unsigned int nbframes, int64_t ts);
282
283     bool transferSilence(unsigned int size);
284
285 public:
286     // move to private?
287     bool xrunOccurred() { return m_in_xrun; };
288     void handlerDied();
289
290 // the ISO interface (can we get rid of this?)
291 public:
292     int getChannel() {return m_channel;};
293     bool setChannel(int c)
294         {m_channel = c; return true;};
295
296     virtual unsigned int getNbPacketsIsoXmitBuffer();
297     virtual unsigned int getPacketsPerPeriod();
298     virtual unsigned int getMaxPacketSize() = 0;
299 private:
300     int m_channel;
301
302 protected: // FIXME: move to private
303     uint64_t m_dropped; /// FIXME:debug
304     uint64_t m_last_dropped; /// FIXME:debug
305     int m_last_good_cycle; /// FIXME:debug
306     uint64_t m_last_timestamp; /// last timestamp (in ticks)
307 private:
308     uint64_t m_last_timestamp2; /// last timestamp (in ticks)
309 protected:
310     bool m_correct_last_timestamp;
311     uint64_t m_last_timestamp_at_period_ticks; // FIXME: still used?
312
313 //--- data buffering and accounting
314 public:
315     void getBufferHeadTimestamp ( ffado_timestamp_t *ts, signed int *fc )
316         {m_data_buffer->getBufferHeadTimestamp(ts, fc);};
317     void getBufferTailTimestamp ( ffado_timestamp_t *ts, signed int *fc )
318         {m_data_buffer->getBufferTailTimestamp(ts, fc);};
319
320     void setBufferTailTimestamp ( ffado_timestamp_t new_timestamp )
321         {m_data_buffer->setBufferTailTimestamp(new_timestamp);};
322     void setBufferHeadTimestamp ( ffado_timestamp_t new_timestamp )
323         {m_data_buffer->setBufferHeadTimestamp(new_timestamp);};
324 protected:
325     Util::TimestampedBuffer *m_data_buffer;
326     // the scratch buffer is temporary buffer space that can be
327     // used by any function. It's pre-allocated when the SP is created.
328     // the purpose is to avoid allocation of memory (or heap/stack) in
329     // an RT context
330     byte_t*         m_scratch_buffer;
331     size_t          m_scratch_buffer_size_bytes;
332
333 protected:
334     // frame counter & sync stuff
335     public:
336         /**
337          * @brief Can this StreamProcessor handle a transfer of nframes frames?
338          *
339          * this function indicates if the streamprocessor can handle a transfer of
340          * nframes frames. It is used to detect underruns-to-be.
341          *
342          * @param nframes number of frames
343          * @return true if the StreamProcessor can handle this amount of frames
344          *         false if it can't
345          */
346         bool canClientTransferFrames(unsigned int nframes);
347
348         /**
349          * \brief return the time until the next period boundary should be signaled (in microseconds)
350          *
351          * Return the time until the next period boundary signal. If this StreamProcessor
352          * is the current synchronization source, this function is called to
353          * determine when a buffer transfer can be made. When this value is
354          * smaller than 0, a period boundary is assumed to be crossed, hence a
355          * transfer can be made.
356          *
357          * \return the time in usecs
358          */
359         int64_t getTimeUntilNextPeriodSignalUsecs();
360         /**
361          * \brief return the time of the next period boundary (in microseconds)
362          *
363          * Returns the time of the next period boundary, in microseconds. The
364          * goal of this function is to determine the exact point of the period
365          * boundary. This is assumed to be the point at which the buffer transfer should
366          * take place, meaning that it can be used as a reference timestamp for transmitting
367          * StreamProcessors
368          *
369          * \return the time in usecs
370          */
371         uint64_t getTimeAtPeriodUsecs();
372
373         /**
374          * \brief return the time of the next period boundary (in internal units)
375          *
376          * The same as getTimeAtPeriodUsecs() but in internal units.
377          *
378          * @return the time in internal units
379          */
380         uint64_t getTimeAtPeriod();
381
382         uint64_t getTimeNow(); // FIXME: should disappear
383
384
385         /**
386          * Returns the sync delay. This is the time a syncsource
387          * delays a period signal, e.g. to cope with buffering.
388          * @return the sync delay
389          */
390         unsigned int getSyncDelay() {return m_sync_delay;};
391         /**
392          * sets the sync delay
393          * @param d sync delay
394          */
395         void setSyncDelay(unsigned int d);
396
397         /**
398          * @brief get the maximal frame latency
399          *
400          * The maximum frame latency is the maximum time that will elapse
401          * between the frame being received by the 1394 stack, and the moment this
402          * frame is presented to the StreamProcessor.
403          *
404          * For transmit SP's this is the maximum time that a frame is requested by
405          * the handler ahead of the time the frame is intended to be transmitted.
406          *
407          * This is useful to figure out how longer than the actual reception time
408          * we have to wait before trying to read the frame from the SP.
409          *
410          * @return maximal frame latency
411          */
412         int getMaxFrameLatency();
413
414         float getTicksPerFrame();
415         void setTicksPerFrame(float tpf);
416
417         int getLastCycle() {return m_last_cycle;};
418
419         int getBufferFill();
420
421         // Child implementation interface
422         /**
423         * @brief prepare the child SP
424         * @return true if successful, false otherwise
425         * @pre the m_manager pointer points to a valid manager
426         * @post getEventsPerFrame() returns the correct value
427         * @post getEventSize() returns the correct value
428         * @post getUpdatePeriod() returns the correct value
429         * @post processPacketHeader(...) can be called
430         * @post processPacketData(...) can be called
431         */
432         virtual bool prepareChild() = 0;
433         /**
434          * @brief get the number of events contained in one frame
435          * @return the number of events contained in one frame
436          */
437         virtual unsigned int getEventsPerFrame() = 0;
438
439         /**
440          * @brief get the size of one frame in bytes
441          * @return the size of one frame in bytes
442          */
443         virtual unsigned int getEventSize() = 0;
444
445         /**
446          * @brief get the nominal number of frames in a packet
447          *
448          * This is the amount of frames that is nominally present
449          * in one packet. It is recommended that in the receive handler
450          * you write this amount of frames when a valid packet has
451          * been received. (although this is not mandatory)
452          *
453          * @return the nominal number of frames in a packet
454          */
455         virtual unsigned int getNominalFramesPerPacket() = 0;
456
457         /**
458          * @brief get the nominal number of packets needed for a certain amount of frames
459          * @return the nominal number of packet necessary
460          */
461         virtual unsigned int getNominalPacketsNeeded(unsigned int nframes);
462
463     protected:
464         float m_ticks_per_frame;
465         int m_last_cycle;
466         unsigned int m_sync_delay;
467     private:
468         bool m_in_xrun;
469
470 public:
471     // debug stuff
472     virtual void dumpInfo();
473     virtual void printBufferInfo();
474     virtual void setVerboseLevel(int l);
475     const char *getStateString()
476         {return ePSToString(getState());};
477     const char *getTypeString()
478         {return ePTToString(getType());};
479
480     int m_min_ahead; // DEBUG
481
482     DECLARE_DEBUG_MODULE;
483 };
484
485 }
486
487 #endif /* __FFADO_STREAMPROCESSOR__ */
488
489
Note: See TracBrowser for help on using the browser.