[FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes
Anton Khirnov
anton at khirnov.net
Wed Jan 24 21:29:16 EET 2024
Raise the default inter-thread queue size from 1 to 8 packets/frames;
the larger queues seem to provide better throughput.
Allow -thread_queue_size to set the muxer queue size manually again
(when the option is left unset, the default of 8 is used).
---
fftools/ffmpeg_mux.h | 2 --
fftools/ffmpeg_mux_init.c | 3 +--
fftools/ffmpeg_opt.c | 2 +-
fftools/ffmpeg_sched.c | 15 ++++++++++-----
fftools/ffmpeg_sched.h | 4 +++-
tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat | 5 -----
6 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index d0be8a51ea..e1b44142cf 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -94,8 +94,6 @@ typedef struct Muxer {
AVDictionary *opts;
- int thread_queue_size;
-
/* filesize limit expressed in bytes */
int64_t limit_filesize;
atomic_int_least64_t last_filesize;
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 6b5e4f8b3c..8ada837555 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
of->start_time = o->start_time;
of->shortest = o->shortest;
- mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
mux->limit_filesize = o->limit_filesize;
av_dict_copy(&mux->opts, o->g->format_opts, 0);
@@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
}
err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
- !strcmp(oc->oformat->name, "rtp"));
+ !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
if (err < 0)
return err;
mux->sch = sch;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 304c493dcf..7505b0cf90 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
o->limit_filesize = INT64_MAX;
o->chapters_input_file = INT_MAX;
o->accurate_seek = 1;
- o->thread_queue_size = -1;
+ o->thread_queue_size = 0;
o->input_sync_ref = -1;
o->find_stream_info = 1;
o->shortest_buf_duration = 10.f;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4fc5a33941..62a40c6057 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -218,6 +218,7 @@ typedef struct SchMux {
*/
atomic_int mux_started;
ThreadQueue *queue;
+ unsigned queue_size;
AVPacket *sub_heartbeat_pkt;
} SchMux;
@@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
ThreadQueue *tq;
ObjPool *op;
+ queue_size = queue_size > 0 ? queue_size : 8;
+
op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
objpool_alloc_frames();
if (!op)
@@ -653,7 +656,7 @@ static const AVClass sch_mux_class = {
};
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
- void *arg, int sdp_auto)
+ void *arg, int sdp_auto, unsigned thread_queue_size)
{
const unsigned idx = sch->nb_mux;
@@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
mux = &sch->mux[idx];
mux->class = &sch_mux_class;
mux->init = init;
+ mux->queue_size = thread_queue_size;
task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
@@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
if (!dec->send_frame)
return AVERROR(ENOMEM);
- ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
+ ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
if (ret < 0)
return ret;
@@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
- ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
+ ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
if (ret < 0)
return ret;
@@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
if (ret < 0)
return ret;
- ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
+ ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
if (ret < 0)
return ret;
@@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch)
}
}
- ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
+ ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
+ QUEUE_PACKETS);
if (ret < 0)
return ret;
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index b167d8d158..d12affa69d 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
* streams in the muxer.
* @param ctx Muxer state; will be passed to func/init and used for logging.
* @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
+ * @param thread_queue_size number of packets that can be buffered before
+ * sending to the muxer blocks
*
* @retval ">=0" Index of the newly-created muxer.
* @retval "<0" Error code.
*/
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
- void *ctx, int sdp_auto);
+ void *ctx, int sdp_auto, unsigned thread_queue_size);
/**
* Add a muxed stream for a previously added muxer.
*
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index bc9b833799..3a3ec96637 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -33,8 +33,3 @@
<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety remains our numb</font>
-9
-00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
->> Safety remains our number one</font>
-
--
2.42.0
More information about the ffmpeg-devel
mailing list