FFmpeg
sync_queue.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdint.h>
20 #include <string.h>
21 
22 #include "libavutil/avassert.h"
23 #include "libavutil/error.h"
24 #include "libavutil/fifo.h"
25 #include "libavutil/mathematics.h"
26 #include "libavutil/mem.h"
27 
28 #include "objpool.h"
29 #include "sync_queue.h"
30 
31 typedef struct SyncQueueStream {
34 
35  /* stream head: largest timestamp seen */
36  int64_t head_ts;
37  int limiting;
38  /* no more frames will be sent for this stream */
39  int finished;
40 
41  uint64_t frames_sent;
42  uint64_t frames_max;
44 
45 struct SyncQueue {
47 
48  /* no more frames will be sent for any stream */
49  int finished;
50  /* sync head: the stream with the _smallest_ head timestamp
51  * this stream determines which frames can be output */
53  /* the finished stream with the smallest finish timestamp or -1 */
55 
56  // maximum buffering duration in microseconds
57  int64_t buf_size_us;
58 
60  unsigned int nb_streams;
61 
62  // pool of preallocated frames to avoid constant allocations
64 };
65 
66 static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
68 {
69  if (sq->type == SYNC_QUEUE_PACKETS)
70  av_packet_move_ref(dst.p, src.p);
71  else
72  av_frame_move_ref(dst.f, src.f);
73 }
74 
75 static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
76 {
77  return (sq->type == SYNC_QUEUE_PACKETS) ?
78  frame.p->pts + frame.p->duration :
79  frame.f->pts + frame.f->duration;
80 }
81 
82 static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
83 {
84  return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
85 }
86 
87 static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
88 {
89  SyncQueueStream *st = &sq->streams[stream_idx];
90 
91  st->finished = 1;
92 
93  if (st->limiting && st->head_ts != AV_NOPTS_VALUE) {
94  /* check if this stream is the new finished head */
95  if (sq->head_finished_stream < 0 ||
96  av_compare_ts(st->head_ts, st->tb,
98  sq->streams[sq->head_finished_stream].tb) < 0) {
99  sq->head_finished_stream = stream_idx;
100  }
101 
102  /* mark as finished all streams that should no longer receive new frames,
103  * due to them being ahead of some finished stream */
104  st = &sq->streams[sq->head_finished_stream];
105  for (unsigned int i = 0; i < sq->nb_streams; i++) {
106  SyncQueueStream *st1 = &sq->streams[i];
107  if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
108  av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
109  st1->finished = 1;
110  }
111  }
112 
113  /* mark the whole queue as finished if all streams are finished */
114  for (unsigned int i = 0; i < sq->nb_streams; i++) {
115  if (!sq->streams[i].finished)
116  return;
117  }
118  sq->finished = 1;
119 }
120 
121 static void queue_head_update(SyncQueue *sq)
122 {
123  if (sq->head_stream < 0) {
124  /* wait for one timestamp in each stream before determining
125  * the queue head */
126  for (unsigned int i = 0; i < sq->nb_streams; i++) {
127  SyncQueueStream *st = &sq->streams[i];
128  if (st->limiting && st->head_ts == AV_NOPTS_VALUE)
129  return;
130  }
131 
132  // placeholder value, correct one will be found below
133  sq->head_stream = 0;
134  }
135 
136  for (unsigned int i = 0; i < sq->nb_streams; i++) {
137  SyncQueueStream *st_head = &sq->streams[sq->head_stream];
138  SyncQueueStream *st_other = &sq->streams[i];
139  if (st_other->limiting && st_other->head_ts != AV_NOPTS_VALUE &&
140  av_compare_ts(st_other->head_ts, st_other->tb,
141  st_head->head_ts, st_head->tb) < 0)
142  sq->head_stream = i;
143  }
144 }
145 
146 /* update this stream's head timestamp */
147 static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
148 {
149  SyncQueueStream *st = &sq->streams[stream_idx];
150 
151  if (ts == AV_NOPTS_VALUE ||
152  (st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts))
153  return;
154 
155  st->head_ts = ts;
156 
157  /* if this stream is now ahead of some finished stream, then
158  * this stream is also finished */
159  if (sq->head_finished_stream >= 0 &&
162  ts, st->tb) <= 0)
163  finish_stream(sq, stream_idx);
164 
165  /* update the overall head timestamp if it could have changed */
166  if (st->limiting &&
167  (sq->head_stream < 0 || sq->head_stream == stream_idx))
168  queue_head_update(sq);
169 }
170 
171 /* If the queue for the given stream (or all streams when stream_idx=-1)
172  * is overflowing, trigger a fake heartbeat on lagging streams.
173  *
174  * @return 1 if heartbeat triggered, 0 otherwise
175  */
176 static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
177 {
178  SyncQueueStream *st;
180  int64_t tail_ts = AV_NOPTS_VALUE;
181 
182  /* if no stream specified, pick the one that is most ahead */
183  if (stream_idx < 0) {
184  int64_t ts = AV_NOPTS_VALUE;
185 
186  for (int i = 0; i < sq->nb_streams; i++) {
187  st = &sq->streams[i];
188  if (st->head_ts != AV_NOPTS_VALUE &&
189  (ts == AV_NOPTS_VALUE ||
190  av_compare_ts(ts, sq->streams[stream_idx].tb,
191  st->head_ts, st->tb) < 0)) {
192  ts = st->head_ts;
193  stream_idx = i;
194  }
195  }
196  /* no stream has a timestamp yet -> nothing to do */
197  if (stream_idx < 0)
198  return 0;
199  }
200 
201  st = &sq->streams[stream_idx];
202 
203  /* get the chosen stream's tail timestamp */
204  for (size_t i = 0; tail_ts == AV_NOPTS_VALUE &&
205  av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++)
206  tail_ts = frame_ts(sq, frame);
207 
208  /* overflow triggers when the tail is over specified duration behind the head */
209  if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts ||
210  av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us)
211  return 0;
212 
213  /* signal a fake timestamp for all streams that prevent tail_ts from being output */
214  tail_ts++;
215  for (unsigned int i = 0; i < sq->nb_streams; i++) {
216  SyncQueueStream *st1 = &sq->streams[i];
217  int64_t ts;
218 
219  if (st == st1 || st1->finished ||
220  (st1->head_ts != AV_NOPTS_VALUE &&
221  av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0))
222  continue;
223 
224  ts = av_rescale_q(tail_ts, st->tb, st1->tb);
225  if (st1->head_ts != AV_NOPTS_VALUE)
226  ts = FFMAX(st1->head_ts + 1, ts);
227 
228  stream_update_ts(sq, i, ts);
229  }
230 
231  return 1;
232 }
233 
234 int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
235 {
236  SyncQueueStream *st;
237  SyncQueueFrame dst;
238  int64_t ts;
239  int ret;
240 
241  av_assert0(stream_idx < sq->nb_streams);
242  st = &sq->streams[stream_idx];
243 
244  av_assert0(st->tb.num > 0 && st->tb.den > 0);
245 
246  if (frame_null(sq, frame)) {
247  finish_stream(sq, stream_idx);
248  return 0;
249  }
250  if (st->finished)
251  return AVERROR_EOF;
252 
253  ret = objpool_get(sq->pool, (void**)&dst);
254  if (ret < 0)
255  return ret;
256 
257  frame_move(sq, dst, frame);
258 
259  ts = frame_ts(sq, dst);
260 
261  ret = av_fifo_write(st->fifo, &dst, 1);
262  if (ret < 0) {
263  frame_move(sq, frame, dst);
264  objpool_release(sq->pool, (void**)&dst);
265  return ret;
266  }
267 
268  stream_update_ts(sq, stream_idx, ts);
269 
270  st->frames_sent++;
271  if (st->frames_sent >= st->frames_max)
272  finish_stream(sq, stream_idx);
273 
274  return 0;
275 }
276 
277 static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx,
279 {
280  SyncQueueStream *st_head = sq->head_stream >= 0 ?
281  &sq->streams[sq->head_stream] : NULL;
282  SyncQueueStream *st;
283 
284  av_assert0(stream_idx < sq->nb_streams);
285  st = &sq->streams[stream_idx];
286 
287  if (av_fifo_can_read(st->fifo)) {
289  int64_t ts;
290  int cmp = 1;
291 
292  av_fifo_peek(st->fifo, &peek, 1, 0);
293  ts = frame_ts(sq, peek);
294 
295  /* check if this stream's tail timestamp does not overtake
296  * the overall queue head */
297  if (ts != AV_NOPTS_VALUE && st_head)
298  cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
299 
300  /* We can release frames that do not end after the queue head.
301  * Frames with no timestamps are just passed through with no conditions.
302  */
303  if (cmp <= 0 || ts == AV_NOPTS_VALUE) {
304  frame_move(sq, frame, peek);
305  objpool_release(sq->pool, (void**)&peek);
306  av_fifo_drain2(st->fifo, 1);
307  return 0;
308  }
309  }
310 
311  return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ?
312  AVERROR_EOF : AVERROR(EAGAIN);
313 }
314 
315 static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
316 {
317  int nb_eof = 0;
318  int ret;
319 
320  /* read a frame for a specific stream */
321  if (stream_idx >= 0) {
322  ret = receive_for_stream(sq, stream_idx, frame);
323  return (ret < 0) ? ret : stream_idx;
324  }
325 
326  /* read a frame for any stream with available output */
327  for (unsigned int i = 0; i < sq->nb_streams; i++) {
328  ret = receive_for_stream(sq, i, frame);
329  if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
330  nb_eof += (ret == AVERROR_EOF);
331  continue;
332  }
333  return (ret < 0) ? ret : i;
334  }
335 
336  return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
337 }
338 
339 int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
340 {
341  int ret = receive_internal(sq, stream_idx, frame);
342 
343  /* try again if the queue overflowed and triggered a fake heartbeat
344  * for lagging streams */
345  if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx))
346  ret = receive_internal(sq, stream_idx, frame);
347 
348  return ret;
349 }
350 
351 int sq_add_stream(SyncQueue *sq, int limiting)
352 {
353  SyncQueueStream *tmp, *st;
354 
355  tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
356  if (!tmp)
357  return AVERROR(ENOMEM);
358  sq->streams = tmp;
359 
360  st = &sq->streams[sq->nb_streams];
361  memset(st, 0, sizeof(*st));
362 
364  if (!st->fifo)
365  return AVERROR(ENOMEM);
366 
367  /* we set a valid default, so that a pathological stream that never
368  * receives even a real timebase (and no frames) won't stall all other
369  * streams forever; cf. overflow_heartbeat() */
370  st->tb = (AVRational){ 1, 1 };
371  st->head_ts = AV_NOPTS_VALUE;
372  st->frames_max = UINT64_MAX;
373  st->limiting = limiting;
374 
375  return sq->nb_streams++;
376 }
377 
378 void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
379 {
380  SyncQueueStream *st;
381 
382  av_assert0(stream_idx < sq->nb_streams);
383  st = &sq->streams[stream_idx];
384 
386 
387  if (st->head_ts != AV_NOPTS_VALUE)
388  st->head_ts = av_rescale_q(st->head_ts, st->tb, tb);
389 
390  st->tb = tb;
391 }
392 
393 void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
394 {
395  SyncQueueStream *st;
396 
397  av_assert0(stream_idx < sq->nb_streams);
398  st = &sq->streams[stream_idx];
399 
400  st->frames_max = frames;
401  if (st->frames_sent >= st->frames_max)
402  finish_stream(sq, stream_idx);
403 }
404 
405 SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
406 {
407  SyncQueue *sq = av_mallocz(sizeof(*sq));
408 
409  if (!sq)
410  return NULL;
411 
412  sq->type = type;
413  sq->buf_size_us = buf_size_us;
414 
415  sq->head_stream = -1;
416  sq->head_finished_stream = -1;
417 
420  if (!sq->pool) {
421  av_freep(&sq);
422  return NULL;
423  }
424 
425  return sq;
426 }
427 
428 void sq_free(SyncQueue **psq)
429 {
430  SyncQueue *sq = *psq;
431 
432  if (!sq)
433  return;
434 
435  for (unsigned int i = 0; i < sq->nb_streams; i++) {
437  while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
438  objpool_release(sq->pool, (void**)&frame);
439 
440  av_fifo_freep2(&sq->streams[i].fifo);
441  }
442 
443  av_freep(&sq->streams);
444 
445  objpool_free(&sq->pool);
446 
447  av_freep(psq);
448 }
SYNC_QUEUE_PACKETS
@ SYNC_QUEUE_PACKETS
Definition: sync_queue.h:29
av_fifo_drain2
void av_fifo_drain2(AVFifo *f, size_t size)
Discard the specified amount of data from an AVFifo.
Definition: fifo.c:266
SyncQueueStream
Definition: sync_queue.c:31
stream_update_ts
static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
Definition: sync_queue.c:147
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SyncQueueFrame::f
AVFrame * f
Definition: sync_queue.h:34
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:393
SyncQueue::streams
SyncQueueStream * streams
Definition: sync_queue.c:59
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:260
SyncQueueType
SyncQueueType
Definition: sync_queue.h:28
tmp
static uint8_t tmp[11]
Definition: aes_ctr.c:28
sync_queue.h
SyncQueue::head_stream
int head_stream
Definition: sync_queue.c:52
overflow_heartbeat
static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
Definition: sync_queue.c:176
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
frame_ts
static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
Definition: sync_queue.c:75
receive_internal
static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Definition: sync_queue.c:315
mathematics.h
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
SyncQueueStream::finished
int finished
Definition: sync_queue.c:39
peek
static uint32_t BS_FUNC() peek(BSCTX *bc, unsigned int n)
Return n bits from the buffer but do not change the buffer state.
Definition: bitstream_template.h:336
objpool_alloc_packets
ObjPool * objpool_alloc_packets(void)
Definition: objpool.c:124
SyncQueueFrame::p
AVPacket * p
Definition: sync_queue.h:35
SyncQueue::head_finished_stream
int head_finished_stream
Definition: sync_queue.c:54
objpool.h
fifo.h
frame_null
static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
Definition: sync_queue.c:82
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
frames
if it could not because there are no more frames
Definition: filter_design.txt:266
objpool_release
void objpool_release(ObjPool *op, void **obj)
Definition: objpool.c:78
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:339
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
SyncQueueStream::frames_sent
uint64_t frames_sent
Definition: sync_queue.c:41
AVRational::num
int num
Numerator.
Definition: rational.h:59
avassert.h
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
av_realloc_array
void * av_realloc_array(void *ptr, size_t nmemb, size_t size)
Definition: mem.c:215
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
sq_set_tb
void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
Set the timebase for the stream with index stream_idx.
Definition: sync_queue.c:378
nb_streams
static int nb_streams
Definition: ffprobe.c:309
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
SyncQueueFrame
Definition: sync_queue.h:33
objpool_alloc_frames
ObjPool * objpool_alloc_frames(void)
Definition: objpool.c:128
cmp
static av_always_inline int cmp(MpegEncContext *s, const int x, const int y, const int subx, const int suby, const int size, const int h, int ref_index, int src_index, me_cmp_func cmp_func, me_cmp_func chroma_cmp_func, const int flags)
compares a block (either a full macroblock or a partition thereof) against a proposed motion-compensa...
Definition: motion_est.c:262
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:351
SyncQueueStream::head_ts
int64_t head_ts
Definition: sync_queue.c:36
NULL
#define NULL
Definition: coverity.c:32
SyncQueueStream::frames_max
uint64_t frames_max
Definition: sync_queue.c:42
SyncQueue::nb_streams
unsigned int nb_streams
Definition: sync_queue.c:60
AVRational
Rational number (pair of numerator and denominator).
Definition: rational.h:58
frame_move
static void frame_move(const SyncQueue *sq, SyncQueueFrame dst, SyncQueueFrame src)
Definition: sync_queue.c:66
SyncQueueStream::tb
AVRational tb
Definition: sync_queue.c:33
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SyncQueueStream::fifo
AVFifo * fifo
Definition: sync_queue.c:32
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: avpacket.c:479
objpool_get
int objpool_get(ObjPool *op, void **obj)
Definition: objpool.c:67
error.h
AVFifo
Definition: fifo.c:35
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:234
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:428
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
finish_stream
static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
Definition: sync_queue.c:87
ObjPool
Definition: objpool.c:30
av_fifo_peek
int av_fifo_peek(AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:269
SyncQueue::buf_size_us
int64_t buf_size_us
Definition: sync_queue.c:57
av_frame_move_ref
void av_frame_move_ref(AVFrame *dst, AVFrame *src)
Move everything contained in src to dst and reset src.
Definition: frame.c:507
tb
#define tb
Definition: regdef.h:68
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:254
ret
ret
Definition: filter_design.txt:187
SyncQueueStream::limiting
int limiting
Definition: sync_queue.c:37
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:264
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
SyncQueue
Definition: sync_queue.c:45
AVRational::den
int den
Denominator.
Definition: rational.h:60
SyncQueue::type
enum SyncQueueType type
Definition: sync_queue.c:46
receive_for_stream
static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Definition: sync_queue.c:277
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
src
INIT_CLIP pixel * src
Definition: h264pred_template.c:418
SyncQueue::pool
ObjPool * pool
Definition: sync_queue.c:63
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SyncQueue::finished
int finished
Definition: sync_queue.c:49
queue_head_update
static void queue_head_update(SyncQueue *sq)
Definition: sync_queue.c:121
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
Allocate a sync queue of the given type.
Definition: sync_queue.c:405
AV_FIFO_FLAG_AUTO_GROW
#define AV_FIFO_FLAG_AUTO_GROW
Automatically resize the FIFO on writes, so that the data fits.
Definition: fifo.h:67