FFmpeg
thread_queue.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdint.h>
20 #include <string.h>
21 
22 #include "libavutil/avassert.h"
23 #include "libavutil/error.h"
24 #include "libavutil/fifo.h"
25 #include "libavutil/intreadwrite.h"
26 #include "libavutil/mem.h"
27 #include "libavutil/thread.h"
28 
29 #include "objpool.h"
30 #include "thread_queue.h"
31 
/* Per-stream state bits; OR-ed together in the entries of ThreadQueue.finished. */
enum {
    /* no further items will be accepted by tq_send() for the stream */
    FINISHED_SEND = 0x1,
    /* the stream is over on the receiving side; its queued items are dropped */
    FINISHED_RECV = 0x2,
};
36 
/* A single entry stored in the queue's FIFO. */
typedef struct FifoElem {
    /* payload object; tq_send() takes it from the queue's object pool */
    void        *obj;
    /* index of the stream the payload was sent for */
    unsigned int stream_idx;
} FifoElem;
41 
42 struct ThreadQueue {
43  int *finished;
44  unsigned int nb_streams;
45 
47 
49  void (*obj_move)(void *dst, void *src);
50 
53 };
54 
55 void tq_free(ThreadQueue **ptq)
56 {
57  ThreadQueue *tq = *ptq;
58 
59  if (!tq)
60  return;
61 
62  if (tq->fifo) {
63  FifoElem elem;
64  while (av_fifo_read(tq->fifo, &elem, 1) >= 0)
65  objpool_release(tq->obj_pool, &elem.obj);
66  }
67  av_fifo_freep2(&tq->fifo);
68 
69  objpool_free(&tq->obj_pool);
70 
71  av_freep(&tq->finished);
72 
75 
76  av_freep(ptq);
77 }
78 
79 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
80  ObjPool *obj_pool, void (*obj_move)(void *dst, void *src))
81 {
82  ThreadQueue *tq;
83  int ret;
84 
85  tq = av_mallocz(sizeof(*tq));
86  if (!tq)
87  return NULL;
88 
89  ret = pthread_cond_init(&tq->cond, NULL);
90  if (ret) {
91  av_freep(&tq);
92  return NULL;
93  }
94 
96  if (ret) {
98  av_freep(&tq);
99  return NULL;
100  }
101 
102  tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
103  if (!tq->finished)
104  goto fail;
105  tq->nb_streams = nb_streams;
106 
107  tq->fifo = av_fifo_alloc2(queue_size, sizeof(FifoElem), 0);
108  if (!tq->fifo)
109  goto fail;
110 
111  tq->obj_pool = obj_pool;
112  tq->obj_move = obj_move;
113 
114  return tq;
115 fail:
116  tq_free(&tq);
117  return NULL;
118 }
119 
120 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
121 {
122  int *finished;
123  int ret;
124 
125  av_assert0(stream_idx < tq->nb_streams);
126  finished = &tq->finished[stream_idx];
127 
128  pthread_mutex_lock(&tq->lock);
129 
130  if (*finished & FINISHED_SEND) {
131  ret = AVERROR(EINVAL);
132  goto finish;
133  }
134 
135  while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo))
136  pthread_cond_wait(&tq->cond, &tq->lock);
137 
138  if (*finished & FINISHED_RECV) {
139  ret = AVERROR_EOF;
140  *finished |= FINISHED_SEND;
141  } else {
142  FifoElem elem = { .stream_idx = stream_idx };
143 
144  ret = objpool_get(tq->obj_pool, &elem.obj);
145  if (ret < 0)
146  goto finish;
147 
148  tq->obj_move(elem.obj, data);
149 
150  ret = av_fifo_write(tq->fifo, &elem, 1);
151  av_assert0(ret >= 0);
153  }
154 
155 finish:
157 
158  return ret;
159 }
160 
161 static int receive_locked(ThreadQueue *tq, int *stream_idx,
162  void *data)
163 {
164  FifoElem elem;
165  unsigned int nb_finished = 0;
166 
167  while (av_fifo_read(tq->fifo, &elem, 1) >= 0) {
168  if (tq->finished[elem.stream_idx] & FINISHED_RECV) {
169  objpool_release(tq->obj_pool, &elem.obj);
170  continue;
171  }
172 
173  tq->obj_move(data, elem.obj);
174  objpool_release(tq->obj_pool, &elem.obj);
175  *stream_idx = elem.stream_idx;
176  return 0;
177  }
178 
179  for (unsigned int i = 0; i < tq->nb_streams; i++) {
180  if (!tq->finished[i])
181  continue;
182 
183  /* return EOF to the consumer at most once for each stream */
184  if (!(tq->finished[i] & FINISHED_RECV)) {
185  tq->finished[i] |= FINISHED_RECV;
186  *stream_idx = i;
187  return AVERROR_EOF;
188  }
189 
190  nb_finished++;
191  }
192 
193  return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
194 }
195 
196 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
197 {
198  int ret;
199 
200  *stream_idx = -1;
201 
202  pthread_mutex_lock(&tq->lock);
203 
204  while (1) {
205  size_t can_read = av_fifo_can_read(tq->fifo);
206 
207  ret = receive_locked(tq, stream_idx, data);
208 
209  // signal other threads if the fifo state changed
210  if (can_read != av_fifo_can_read(tq->fifo))
212 
213  if (ret == AVERROR(EAGAIN)) {
214  pthread_cond_wait(&tq->cond, &tq->lock);
215  continue;
216  }
217 
218  break;
219  }
220 
222 
223  return ret;
224 }
225 
226 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
227 {
228  av_assert0(stream_idx < tq->nb_streams);
229 
230  pthread_mutex_lock(&tq->lock);
231 
232  /* mark the stream as send-finished;
233  * next time the consumer thread tries to read this stream it will get
234  * an EOF and recv-finished flag will be set */
235  tq->finished[stream_idx] |= FINISHED_SEND;
237 
239 }
240 
241 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
242 {
243  av_assert0(stream_idx < tq->nb_streams);
244 
245  pthread_mutex_lock(&tq->lock);
246 
247  /* mark the stream as recv-finished;
248  * next time the producer thread tries to send for this stream, it will
249  * get an EOF and send-finished flag will be set */
250  tq->finished[stream_idx] |= FINISHED_RECV;
252 
254 }
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
FifoElem::obj
void * obj
Definition: thread_queue.c:38
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
FINISHED_RECV
@ FINISHED_RECV
Definition: thread_queue.c:34
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
ThreadQueue::lock
pthread_mutex_t lock
Definition: thread_queue.c:51
ThreadQueue::obj_move
void(* obj_move)(void *dst, void *src)
Definition: thread_queue.c:49
FifoElem::stream_idx
unsigned int stream_idx
Definition: thread_queue.c:39
ThreadQueue::cond
pthread_cond_t cond
Definition: thread_queue.c:52
data
const char data[16]
Definition: mxf.c:149
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
objpool.h
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
fail
#define fail()
Definition: checkasm.h:189
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
objpool_release
void objpool_release(ObjPool *op, void **obj)
Definition: objpool.c:78
avassert.h
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
FINISHED_SEND
@ FINISHED_SEND
Definition: thread_queue.c:33
intreadwrite.h
receive_locked
static int receive_locked(ThreadQueue *tq, int *stream_idx, void *data)
Definition: thread_queue.c:161
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
nb_streams
static int nb_streams
Definition: ffprobe.c:384
pthread_cond_broadcast
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:162
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:55
NULL
#define NULL
Definition: coverity.c:32
ThreadQueue::finished
int * finished
Definition: thread_queue.c:43
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:241
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
objpool_get
int objpool_get(ObjPool *op, void **obj)
Definition: objpool.c:67
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:82
error.h
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:120
AVFifo
Definition: fifo.c:35
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:83
ThreadQueue::fifo
AVFifo * fifo
Definition: thread_queue.c:46
ObjPool
Definition: objpool.c:30
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
Definition: thread_queue.c:79
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
ThreadQueue::obj_pool
ObjPool * obj_pool
Definition: thread_queue.c:48
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
pthread_cond_t
Definition: os2threads.h:58
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:196
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
FifoElem
Definition: thread_queue.c:37
ret
ret
Definition: filter_design.txt:187
ThreadQueue
Definition: thread_queue.c:42
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
thread_queue.h
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
src
#define src
Definition: vp8dsp.c:248
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:78
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:226
ThreadQueue::nb_streams
unsigned int nb_streams
Definition: thread_queue.c:44