FFmpeg
thread_queue.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdint.h>
20 #include <string.h>
21 
22 #include "libavutil/avassert.h"
23 #include "libavutil/error.h"
24 #include "libavutil/fifo.h"
25 #include "libavutil/intreadwrite.h"
26 #include "libavutil/mem.h"
27 #include "libavutil/thread.h"
28 
29 #include "objpool.h"
30 #include "thread_queue.h"
31 
/* Per-stream state bits stored in ThreadQueue.finished[]. */
enum {
    /* No more items will be sent for the stream: set by tq_send_finish(),
     * or by tq_send() once it sees the stream is recv-finished. */
    FINISHED_SEND = 0x1,
    /* The consumer is done with the stream: set by tq_receive_finish(),
     * or when EOF for the stream has been returned to the consumer. */
    FINISHED_RECV = 0x2,
};
36 
/* One entry of the internal FIFO: a queued object tagged with its stream. */
typedef struct FifoElem {
    /* object obtained from the queue's object pool, holding the payload */
    void    *obj;
    /* index of the stream this object was sent on */
    unsigned stream_idx;
} FifoElem;
41 
42 struct ThreadQueue {
43  int *finished;
44  unsigned int nb_streams;
45 
47 
49  void (*obj_move)(void *dst, void *src);
50 
53 };
54 
55 void tq_free(ThreadQueue **ptq)
56 {
57  ThreadQueue *tq = *ptq;
58 
59  if (!tq)
60  return;
61 
62  if (tq->fifo) {
63  FifoElem elem;
64  while (av_fifo_read(tq->fifo, &elem, 1) >= 0)
65  objpool_release(tq->obj_pool, &elem.obj);
66  }
67  av_fifo_freep2(&tq->fifo);
68 
69  objpool_free(&tq->obj_pool);
70 
71  av_freep(&tq->finished);
72 
75 
76  av_freep(ptq);
77 }
78 
79 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
80  ObjPool *obj_pool, void (*obj_move)(void *dst, void *src))
81 {
82  ThreadQueue *tq;
83  int ret;
84 
85  tq = av_mallocz(sizeof(*tq));
86  if (!tq)
87  return NULL;
88 
89  ret = pthread_cond_init(&tq->cond, NULL);
90  if (ret) {
91  av_freep(&tq);
92  return NULL;
93  }
94 
96  if (ret) {
98  av_freep(&tq);
99  return NULL;
100  }
101 
102  tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
103  if (!tq->finished)
104  goto fail;
105  tq->nb_streams = nb_streams;
106 
107  tq->fifo = av_fifo_alloc2(queue_size, sizeof(FifoElem), 0);
108  if (!tq->fifo)
109  goto fail;
110 
111  tq->obj_pool = obj_pool;
112  tq->obj_move = obj_move;
113 
114  return tq;
115 fail:
116  tq_free(&tq);
117  return NULL;
118 }
119 
120 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
121 {
122  int *finished;
123  int ret;
124 
125  av_assert0(stream_idx < tq->nb_streams);
126  finished = &tq->finished[stream_idx];
127 
128  pthread_mutex_lock(&tq->lock);
129 
130  if (*finished & FINISHED_SEND) {
131  ret = AVERROR(EINVAL);
132  goto finish;
133  }
134 
135  while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo))
136  pthread_cond_wait(&tq->cond, &tq->lock);
137 
138  if (*finished & FINISHED_RECV) {
139  ret = AVERROR_EOF;
140  *finished |= FINISHED_SEND;
141  } else {
142  FifoElem elem = { .stream_idx = stream_idx };
143 
144  ret = objpool_get(tq->obj_pool, &elem.obj);
145  if (ret < 0)
146  goto finish;
147 
148  tq->obj_move(elem.obj, data);
149 
150  ret = av_fifo_write(tq->fifo, &elem, 1);
151  av_assert0(ret >= 0);
153  }
154 
155 finish:
157 
158  return ret;
159 }
160 
161 static int receive_locked(ThreadQueue *tq, int *stream_idx,
162  void *data)
163 {
164  FifoElem elem;
165  unsigned int nb_finished = 0;
166 
167  if (av_fifo_read(tq->fifo, &elem, 1) >= 0) {
168  tq->obj_move(data, elem.obj);
169  objpool_release(tq->obj_pool, &elem.obj);
170  *stream_idx = elem.stream_idx;
171  return 0;
172  }
173 
174  for (unsigned int i = 0; i < tq->nb_streams; i++) {
175  if (!(tq->finished[i] & FINISHED_SEND))
176  continue;
177 
178  /* return EOF to the consumer at most once for each stream */
179  if (!(tq->finished[i] & FINISHED_RECV)) {
180  tq->finished[i] |= FINISHED_RECV;
181  *stream_idx = i;
182  return AVERROR_EOF;
183  }
184 
185  nb_finished++;
186  }
187 
188  return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
189 }
190 
191 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
192 {
193  int ret;
194 
195  *stream_idx = -1;
196 
197  pthread_mutex_lock(&tq->lock);
198 
199  while (1) {
200  ret = receive_locked(tq, stream_idx, data);
201  if (ret == AVERROR(EAGAIN)) {
202  pthread_cond_wait(&tq->cond, &tq->lock);
203  continue;
204  }
205 
206  break;
207  }
208 
209  if (ret == 0)
211 
213 
214  return ret;
215 }
216 
217 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
218 {
219  av_assert0(stream_idx < tq->nb_streams);
220 
221  pthread_mutex_lock(&tq->lock);
222 
223  /* mark the stream as send-finished;
224  * next time the consumer thread tries to read this stream it will get
225  * an EOF and recv-finished flag will be set */
226  tq->finished[stream_idx] |= FINISHED_SEND;
228 
230 }
231 
232 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
233 {
234  av_assert0(stream_idx < tq->nb_streams);
235 
236  pthread_mutex_lock(&tq->lock);
237 
238  /* mark the stream as recv-finished;
239  * next time the producer thread tries to send for this stream, it will
240  * get an EOF and send-finished flag will be set */
241  tq->finished[stream_idx] |= FINISHED_RECV;
243 
245 }
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
FifoElem::obj
void * obj
Definition: thread_queue.c:38
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
ThreadQueue::lock
pthread_mutex_t lock
Definition: thread_queue.c:51
ThreadQueue::obj_move
void(* obj_move)(void *dst, void *src)
Definition: thread_queue.c:49
FifoElem::stream_idx
unsigned int stream_idx
Definition: thread_queue.c:39
ThreadQueue::cond
pthread_cond_t cond
Definition: thread_queue.c:52
data
const char data[16]
Definition: mxf.c:146
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
objpool.h
fifo.h
finish
static void finish(void)
Definition: movenc.c:342
fail
#define fail()
Definition: checkasm.h:134
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
objpool_release
void objpool_release(ObjPool *op, void **obj)
Definition: objpool.c:78
avassert.h
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
intreadwrite.h
FINISHED_SEND
@ FINISHED_SEND
Definition: thread_queue.c:33
receive_locked
static int receive_locked(ThreadQueue *tq, int *stream_idx, void *data)
Definition: thread_queue.c:161
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
nb_streams
static int nb_streams
Definition: ffprobe.c:309
pthread_cond_broadcast
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:162
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:55
NULL
#define NULL
Definition: coverity.c:32
ThreadQueue::finished
int * finished
Definition: thread_queue.c:43
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:232
objpool_get
int objpool_get(ObjPool *op, void **obj)
Definition: objpool.c:67
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:79
error.h
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:120
AVFifo
Definition: fifo.c:35
ThreadQueue::fifo
AVFifo * fifo
Definition: thread_queue.c:46
ObjPool
Definition: objpool.c:30
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
Definition: thread_queue.c:79
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
ThreadQueue::obj_pool
ObjPool * obj_pool
Definition: thread_queue.c:48
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:269
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:254
pthread_cond_t
Definition: os2threads.h:58
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:191
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:262
FifoElem
Definition: thread_queue.c:37
ret
ret
Definition: filter_design.txt:187
ThreadQueue
Definition: thread_queue.c:42
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
FINISHED_RECV
@ FINISHED_RECV
Definition: thread_queue.c:34
thread_queue.h
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
src
INIT_CLIP pixel * src
Definition: h264pred_template.c:418
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:75
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:217
ThreadQueue::nb_streams
unsigned int nb_streams
Definition: thread_queue.c:44