[FFmpeg-devel] [PATCH v2 1/3] avcodec: make a local copy of executor

Zhao Zhili quinkblack at foxmail.com
Tue Oct 1 13:15:44 EEST 2024


> On Oct 1, 2024, at 14:55, Nuo Mi <nuomi2021 at gmail.com> wrote:
> 
> We still need several refactors to improve the current VVC decoder's performance,
> which will frequently break the API/ABI. To mitigate this, we've copied the executor from
> avutil to avcodec. Once the API/ABI is stable, we will move this class back to avutil

This add new external functions to libavcodec, and these new functions have same
symbol names as in libavutil. This may break some build and introduce undefined
behavior to users. How about keep executor inside libavcodec by use ff_ prefix?

> ---
> libavcodec/Makefile     |   1 +
> libavcodec/executor.c   | 221 ++++++++++++++++++++++++++++++++++++++++
> libavcodec/executor.h   |  73 +++++++++++++
> libavcodec/vvc/thread.c |   2 +-
> 4 files changed, 296 insertions(+), 1 deletion(-)
> create mode 100644 libavcodec/executor.c
> create mode 100644 libavcodec/executor.h
> 
> diff --git a/libavcodec/Makefile b/libavcodec/Makefile
> index a4fcce3b42..da1a1aa945 100644
> --- a/libavcodec/Makefile
> +++ b/libavcodec/Makefile
> @@ -43,6 +43,7 @@ OBJS = ac3_parser.o                                                     \
>        dirac.o                                                          \
>        dv_profile.o                                                     \
>        encode.o                                                         \
> +       executor.o                                                       \
>        get_buffer.o                                                     \
>        imgconvert.o                                                     \
>        jni.o                                                            \
> diff --git a/libavcodec/executor.c b/libavcodec/executor.c
> new file mode 100644
> index 0000000000..db80d067ac
> --- /dev/null
> +++ b/libavcodec/executor.c
> @@ -0,0 +1,221 @@
> +/*
> + * Copyright (C) 2024 Nuo Mi
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include "config.h"
> +
> +#include <stdbool.h>
> +
> +#include "libavutil/mem.h"
> +#include "libavutil/thread.h"
> +
> +#include "executor.h"
> +
> +#if !HAVE_THREADS
> +
> +#define ExecutorThread  char
> +
> +#define executor_thread_create(t, a, s, ar)      0
> +#define executor_thread_join(t, r)               do {} while(0)
> +
> +#else
> +
> +#define ExecutorThread  pthread_t
> +
> +#define executor_thread_create(t, a, s, ar)      pthread_create(t, a, s, ar)
> +#define executor_thread_join(t, r)               pthread_join(t, r)
> +
> +#endif //!HAVE_THREADS
> +
> +typedef struct ThreadInfo {
> +    AVExecutor *e;
> +    ExecutorThread thread;
> +} ThreadInfo;
> +
> +struct AVExecutor {
> +    AVTaskCallbacks cb;
> +    int thread_count;
> +    bool recursive;
> +
> +    ThreadInfo *threads;
> +    uint8_t *local_contexts;
> +
> +    AVMutex lock;
> +    AVCond cond;
> +    int die;
> +
> +    AVTask *tasks;
> +};
> +
> +static AVTask* remove_task(AVTask **prev, AVTask *t)
> +{
> +    *prev  = t->next;
> +    t->next = NULL;
> +    return t;
> +}
> +
> +static void add_task(AVTask **prev, AVTask *t)
> +{
> +    t->next = *prev;
> +    *prev   = t;
> +}
> +
> +static int run_one_task(AVExecutor *e, void *lc)
> +{
> +    AVTaskCallbacks *cb = &e->cb;
> +    AVTask **prev;
> +
> +    for (prev = &e->tasks; *prev && !cb->ready(*prev, cb->user_data); prev = &(*prev)->next)
> +        /* nothing */;
> +    if (*prev) {
> +        AVTask *t = remove_task(prev, *prev);
> +        if (e->thread_count > 0)
> +            ff_mutex_unlock(&e->lock);
> +        cb->run(t, lc, cb->user_data);
> +        if (e->thread_count > 0)
> +            ff_mutex_lock(&e->lock);
> +        return 1;
> +    }
> +    return 0;
> +}
> +
> +#if HAVE_THREADS
> +static void *executor_worker_task(void *data)
> +{
> +    ThreadInfo *ti = (ThreadInfo*)data;
> +    AVExecutor *e  = ti->e;
> +    void *lc       = e->local_contexts + (ti - e->threads) * e->cb.local_context_size;
> +
> +    ff_mutex_lock(&e->lock);
> +    while (1) {
> +        if (e->die) break;
> +
> +        if (!run_one_task(e, lc)) {
> +            //no task in one loop
> +            ff_cond_wait(&e->cond, &e->lock);
> +        }
> +    }
> +    ff_mutex_unlock(&e->lock);
> +    return NULL;
> +}
> +#endif
> +
> +static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
> +{
> +    if (e->thread_count) {
> +        //signal die
> +        ff_mutex_lock(&e->lock);
> +        e->die = 1;
> +        ff_cond_broadcast(&e->cond);
> +        ff_mutex_unlock(&e->lock);
> +
> +        for (int i = 0; i < e->thread_count; i++)
> +            executor_thread_join(e->threads[i].thread, NULL);
> +    }
> +    if (has_cond)
> +        ff_cond_destroy(&e->cond);
> +    if (has_lock)
> +        ff_mutex_destroy(&e->lock);
> +
> +    av_free(e->threads);
> +    av_free(e->local_contexts);
> +
> +    av_free(e);
> +}
> +
> +AVExecutor* av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
> +{
> +    AVExecutor *e;
> +    int has_lock = 0, has_cond = 0;
> +    if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher)
> +        return NULL;
> +
> +    e = av_mallocz(sizeof(*e));
> +    if (!e)
> +        return NULL;
> +    e->cb = *cb;
> +
> +    e->local_contexts = av_calloc(FFMAX(thread_count, 1), e->cb.local_context_size);
> +    if (!e->local_contexts)
> +        goto free_executor;
> +
> +    e->threads = av_calloc(FFMAX(thread_count, 1), sizeof(*e->threads));
> +    if (!e->threads)
> +        goto free_executor;
> +
> +    if (!thread_count)
> +        return e;
> +
> +    has_lock = !ff_mutex_init(&e->lock, NULL);
> +    has_cond = !ff_cond_init(&e->cond, NULL);
> +
> +    if (!has_lock || !has_cond)
> +        goto free_executor;
> +
> +    for (/* nothing */; e->thread_count < thread_count; e->thread_count++) {
> +        ThreadInfo *ti = e->threads + e->thread_count;
> +        ti->e = e;
> +        if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti))
> +            goto free_executor;
> +    }
> +    return e;
> +
> +free_executor:
> +    executor_free(e, has_lock, has_cond);
> +    return NULL;
> +}
> +
> +void av_executor_free(AVExecutor **executor)
> +{
> +    int thread_count;
> +
> +    if (!executor || !*executor)
> +        return;
> +    thread_count = (*executor)->thread_count;
> +    executor_free(*executor, thread_count, thread_count);
> +    *executor = NULL;
> +}
> +
> +void av_executor_execute(AVExecutor *e, AVTask *t)
> +{
> +    AVTaskCallbacks *cb = &e->cb;
> +    AVTask **prev;
> +
> +    if (e->thread_count)
> +        ff_mutex_lock(&e->lock);
> +    if (t) {
> +        for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
> +            /* nothing */;
> +        add_task(prev, t);
> +    }
> +    if (e->thread_count) {
> +        ff_cond_signal(&e->cond);
> +        ff_mutex_unlock(&e->lock);
> +    }
> +
> +    if (!e->thread_count || !HAVE_THREADS) {
> +        if (e->recursive)
> +            return;
> +        e->recursive = true;
> +        // We are running in a single-threaded environment, so we must handle all tasks ourselves
> +        while (run_one_task(e, e->local_contexts))
> +            /* nothing */;
> +        e->recursive = false;
> +    }
> +}
> diff --git a/libavcodec/executor.h b/libavcodec/executor.h
> new file mode 100644
> index 0000000000..6083e41312
> --- /dev/null
> +++ b/libavcodec/executor.h
> @@ -0,0 +1,73 @@
> +/*
> + * Copyright (C) 2024 Nuo Mi
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +/*
> + * We still need several refactors to improve the current VVC decoder's performance,
> + * which will frequently break the API/ABI. To mitigate this, we've copied the executor from
> + * avutil to avcodec. Once the API/ABI is stable, we will move this class back to avutil
> + */
> +
> +#ifndef AVCODEC_EXECUTOR_H
> +#define AVCODEC_EXECUTOR_H
> +
> +typedef struct AVExecutor AVExecutor;
> +typedef struct AVTask AVTask;
> +
> +struct AVTask {
> +    AVTask *next;
> +};
> +
> +typedef struct AVTaskCallbacks {
> +    void *user_data;
> +
> +    int local_context_size;
> +
> +    // return 1 if a's priority > b's priority
> +    int (*priority_higher)(const AVTask *a, const AVTask *b);
> +
> +    // task is ready for run
> +    int (*ready)(const AVTask *t, void *user_data);
> +
> +    // run the task
> +    int (*run)(AVTask *t, void *local_context, void *user_data);
> +} AVTaskCallbacks;
> +
> +/**
> + * Alloc executor
> + * @param callbacks callback structure for executor
> + * @param thread_count worker thread number, 0 for run on caller's thread directly
> + * @return return the executor
> + */
> +AVExecutor* av_executor_alloc(const AVTaskCallbacks *callbacks, int thread_count);
> +
> +/**
> + * Free executor
> + * @param e  pointer to executor
> + */
> +void av_executor_free(AVExecutor **e);
> +
> +/**
> + * Add task to executor
> + * @param e pointer to executor
> + * @param t pointer to task. If NULL, it will wakeup one work thread
> + */
> +void av_executor_execute(AVExecutor *e, AVTask *t);
> +
> +#endif //AVCODEC_EXECUTOR_H
> diff --git a/libavcodec/vvc/thread.c b/libavcodec/vvc/thread.c
> index 86a7753c6a..a4d7b31c37 100644
> --- a/libavcodec/vvc/thread.c
> +++ b/libavcodec/vvc/thread.c
> @@ -22,7 +22,7 @@
> 
> #include <stdatomic.h>
> 
> -#include "libavutil/executor.h"
> +#include "libavcodec/executor.h"
> #include "libavutil/mem.h"
> #include "libavutil/thread.h"
> 
> -- 
> 2.34.1
> 
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> 
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".



More information about the ffmpeg-devel mailing list