[FFmpeg-devel] [PATCH 3/8] avcodec/frame_thread_encoder: Avoid allocations of AVPackets, fix deadlock
Andreas Rheinhardt
andreas.rheinhardt at gmail.com
Mon Feb 15 16:34:49 EET 2021
Andreas Rheinhardt:
> Up until now, when doing frame thread encoding, each worker thread
> tried to allocate an AVPacket for every AVFrame to be encoded; these
> packets were then handed back to the main thread, where the content
> of each packet was copied into the packet actually destined for output
> and the temporary AVPacket was then freed.
>
> Besides being wasteful, this also has another problem: there is a risk of
> deadlock if no AVPacket can be allocated at all. The user
> doesn't get any error in this case, and the worker threads
> simply keep trying to allocate a packet again and again. If the user has
> supplied enough frames, the user's thread blocks until a task has
> been completed, which never happens if no packet can ever be
> allocated.
>
> This patch instead modifies the code to allocate the packets once during
> init, so that an allocation failure makes init fail and is reported to
> the user; the packets are then reused again and again.
>
> Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt at gmail.com>
> ---
> libavcodec/frame_thread_encoder.c | 61 +++++++++++++++++++------------
> 1 file changed, 37 insertions(+), 24 deletions(-)
>
> diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c
> index 9ca34e7ffb..bcd3c94f8b 100644
> --- a/libavcodec/frame_thread_encoder.c
> +++ b/libavcodec/frame_thread_encoder.c
> @@ -32,13 +32,18 @@
> #include "thread.h"
>
> #define MAX_THREADS 64
> -#define BUFFER_SIZE (2*MAX_THREADS)
> +/* There can be as many as MAX_THREADS + 1 outstanding tasks.
> + * An additional + 1 is needed so that one can distinguish
> + * the case of zero and MAX_THREADS + 1 outstanding tasks modulo
> + * the number of buffers. */
> +#define BUFFER_SIZE (MAX_THREADS + 2)
>
> typedef struct{
> AVFrame *indata;
> AVPacket *outdata;
> int64_t return_code;
> unsigned index;
> + int finished;
> } Task;
>
> typedef struct{
> @@ -49,8 +54,9 @@ typedef struct{
> pthread_mutex_t task_fifo_mutex;
> pthread_cond_t task_fifo_cond;
>
> - Task finished_tasks[BUFFER_SIZE];
> - pthread_mutex_t finished_task_mutex;
> + unsigned max_tasks;
> + Task tasks[BUFFER_SIZE];
> + pthread_mutex_t finished_task_mutex; /* Guards tasks[i].finished */
> pthread_cond_t finished_task_cond;
>
> unsigned task_index;
> @@ -63,17 +69,13 @@ typedef struct{
> static void * attribute_align_arg worker(void *v){
> AVCodecContext *avctx = v;
> ThreadContext *c = avctx->internal->frame_thread_encoder;
> - AVPacket *pkt = NULL;
>
> while (!atomic_load(&c->exit)) {
> int got_packet = 0, ret;
> + AVPacket *pkt;
> AVFrame *frame;
> Task task;
>
> - if(!pkt) pkt = av_packet_alloc();
> - if(!pkt) continue;
> - av_init_packet(pkt);
> -
> pthread_mutex_lock(&c->task_fifo_mutex);
> while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
> if (atomic_load(&c->exit)) {
> @@ -84,7 +86,12 @@ static void * attribute_align_arg worker(void *v){
> }
> av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
> pthread_mutex_unlock(&c->task_fifo_mutex);
> + /* The main thread ensures that any two outstanding tasks have
> + * different indices, ergo each worker thread owns its element
> + * of c->tasks with the exception of finished, which is shared
> + * with the main thread and guarded by finished_task_mutex. */
> frame = task.indata;
> + pkt = c->tasks[task.index].outdata;
>
> ret = avctx->codec->encode2(avctx, pkt, frame, &got_packet);
> if(got_packet) {
> @@ -101,13 +108,12 @@ static void * attribute_align_arg worker(void *v){
> pthread_mutex_unlock(&c->buffer_mutex);
> av_frame_free(&frame);
> pthread_mutex_lock(&c->finished_task_mutex);
> - c->finished_tasks[task.index].outdata = pkt; pkt = NULL;
> - c->finished_tasks[task.index].return_code = ret;
> + c->tasks[task.index].return_code = ret;
> + c->tasks[task.index].finished = 1;
> pthread_cond_signal(&c->finished_task_cond);
> pthread_mutex_unlock(&c->finished_task_mutex);
> }
> end:
> - av_free(pkt);
> pthread_mutex_lock(&c->buffer_mutex);
> avcodec_close(avctx);
> pthread_mutex_unlock(&c->buffer_mutex);
> @@ -194,6 +200,12 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
> pthread_cond_init(&c->finished_task_cond, NULL);
> atomic_init(&c->exit, 0);
>
> + c->max_tasks = avctx->thread_count + 2;
> + for (unsigned i = 0; i < c->max_tasks; i++) {
> + if (!(c->tasks[i].outdata = av_packet_alloc()))
> + goto fail;
> + }
> +
> for(i=0; i<avctx->thread_count ; i++){
> AVDictionary *tmp = NULL;
> int ret;
> @@ -261,8 +273,8 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){
> av_frame_free(&task.indata);
> }
>
> - for (i=0; i<BUFFER_SIZE; i++) {
> - av_packet_free(&c->finished_tasks[i].outdata);
> + for (unsigned i = 0; i < c->max_tasks; i++) {
> + av_packet_free(&c->tasks[i].outdata);
> }
>
> pthread_mutex_destroy(&c->task_fifo_mutex);
> @@ -276,7 +288,7 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){
>
> int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){
> ThreadContext *c = avctx->internal->frame_thread_encoder;
> - Task task;
> + Task *outtask, task;
> int ret;
>
> av_assert1(!*got_packet_ptr);
> @@ -298,27 +310,28 @@ int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVF
> pthread_cond_signal(&c->task_fifo_cond);
> pthread_mutex_unlock(&c->task_fifo_mutex);
>
> - c->task_index = (c->task_index+1) % BUFFER_SIZE;
> + c->task_index = (c->task_index + 1) % c->max_tasks;
> }
>
> + outtask = &c->tasks[c->finished_task_index];
> pthread_mutex_lock(&c->finished_task_mutex);
> if (c->task_index == c->finished_task_index ||
> - (frame && !c->finished_tasks[c->finished_task_index].outdata &&
> - (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) {
> + (frame && !outtask->finished &&
> + (c->task_index - c->finished_task_index + c->max_tasks) % c->max_tasks <= avctx->thread_count)) {
> pthread_mutex_unlock(&c->finished_task_mutex);
> return 0;
> }
> -
> - while (!c->finished_tasks[c->finished_task_index].outdata) {
> + while (!outtask->finished) {
> pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
> }
> - task = c->finished_tasks[c->finished_task_index];
> - *pkt = *(AVPacket*)(task.outdata);
> + /* We now own outtask completely: No worker thread touches it any more,
> + * because there is no outstanding task with this index. */
> + outtask->finished = 0;
> + av_packet_move_ref(pkt, outtask->outdata);
> if(pkt->data)
> *got_packet_ptr = 1;
> - av_freep(&c->finished_tasks[c->finished_task_index].outdata);
> - c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE;
> + c->finished_task_index = (c->finished_task_index + 1) % c->max_tasks;
> pthread_mutex_unlock(&c->finished_task_mutex);
>
> - return task.return_code;
> + return outtask->return_code;
> }
>
Will apply this patchset tomorrow unless there are objections. Thanks to
Paul for looking at some of the patches.
- Andreas
More information about the ffmpeg-devel
mailing list