[FFmpeg-devel] [PATCH v2] avcodec/pthread_slice: rewrite implementation
wm4
nfxjfg at googlemail.com
Mon Jul 10 12:25:23 EEST 2017
On Sun, 9 Jul 2017 23:26:54 +0700
Muhammad Faiz <mfcc64 at gmail.com> wrote:
> Avoid pthread_cond_broadcast, which wakes up all workers. Make each worker
> use a distinct mutex/cond. Also let the main thread help run jobs, but still
> allocate thread_count workers. The last worker is currently unused; it is
> emulated by the main thread.
> Similar to 'avfilter/pthread: rewrite implementation'
>
> Benchmark on x86_64 with 4 cpus (2 cores x 2 hyperthread)
> ./ffmpeg -threads $threads -thread_type slice -i 10slices.mp4 -f rawvideo -y /dev/null
> threads=2:
> old: 1m15.888s
> new: 1m5.710s
> threads=3:
> old: 1m6.480s
> new: 1m5.260s
> threads=4:
> old: 1m2.292s
> new: 59.677s
> threads=5:
> old: 58.939s
> new: 55.166s
>
> Signed-off-by: Muhammad Faiz <mfcc64 at gmail.com>
> ---
> libavcodec/pthread_slice.c | 219 +++++++++++++++++++++++++++++----------------
> 1 file changed, 142 insertions(+), 77 deletions(-)
>
> diff --git a/libavcodec/pthread_slice.c b/libavcodec/pthread_slice.c
> index 60f5b78..7223205 100644
> --- a/libavcodec/pthread_slice.c
> +++ b/libavcodec/pthread_slice.c
> @@ -22,6 +22,7 @@
> * @see doc/multithreading.txt
> */
>
> +#include <stdatomic.h>
> #include "config.h"
>
> #include "avcodec.h"
> @@ -38,21 +39,26 @@
> typedef int (action_func)(AVCodecContext *c, void *arg);
> typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
>
> +typedef struct WorkerContext WorkerContext;
> +
> typedef struct SliceThreadContext {
> - pthread_t *workers;
> + AVCodecContext *avctx;
> + WorkerContext *workers;
> action_func *func;
> action_func2 *func2;
> void *args;
> int *rets;
> - int job_count;
> - int job_size;
> -
> - pthread_cond_t last_job_cond;
> - pthread_cond_t current_job_cond;
> - pthread_mutex_t current_job_lock;
> - unsigned current_execute;
> - int current_job;
> + unsigned job_count;
> + unsigned job_size;
> +
> + pthread_mutex_t mutex_user;
> + pthread_mutex_t mutex_done;
> + pthread_cond_t cond_done;
> + atomic_uint first_job;
> + atomic_uint current_job;
> + atomic_uint nb_finished_jobs;
> int done;
> + int worker_done;
>
> int *entries;
> int entries_count;
> @@ -61,42 +67,55 @@ typedef struct SliceThreadContext {
> pthread_mutex_t *progress_mutex;
> } SliceThreadContext;
>
> -static void* attribute_align_arg worker(void *v)
> +struct WorkerContext {
> + SliceThreadContext *ctx;
> + pthread_t thread;
> + pthread_mutex_t mutex;
> + pthread_cond_t cond;
> + int done;
> +};
> +
> +static unsigned run_jobs(SliceThreadContext *c)
> {
> - AVCodecContext *avctx = v;
> - SliceThreadContext *c = avctx->internal->thread_ctx;
> - unsigned last_execute = 0;
> - int our_job = c->job_count;
> - int thread_count = avctx->thread_count;
> - int self_id;
> -
> - pthread_mutex_lock(&c->current_job_lock);
> - self_id = c->current_job++;
> - for (;;){
> - int ret;
> - while (our_job >= c->job_count) {
> - if (c->current_job == thread_count + c->job_count)
> - pthread_cond_signal(&c->last_job_cond);
> -
> - while (last_execute == c->current_execute && !c->done)
> - pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
> - last_execute = c->current_execute;
> - our_job = self_id;
> -
> - if (c->done) {
> - pthread_mutex_unlock(&c->current_job_lock);
> - return NULL;
> - }
> - }
> - pthread_mutex_unlock(&c->current_job_lock);
> + unsigned current_job, first_job, nb_finished_jobs;
> +
> + current_job = first_job = atomic_fetch_add_explicit(&c->first_job, 1, memory_order_acq_rel);
>
> - ret = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
> - c->func2(avctx, c->args, our_job, self_id);
> + do {
> + int ret = c->func ? c->func(c->avctx, (char *)c->args + current_job * (size_t) c->job_size)
> + : c->func2(c->avctx, c->args, current_job, first_job);
> if (c->rets)
> - c->rets[our_job%c->job_count] = ret;
> + c->rets[current_job] = ret;
> + nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1, memory_order_relaxed) + 1;
> + } while ((current_job = atomic_fetch_add_explicit(&c->current_job, 1, memory_order_acq_rel)) < c->job_count);
>
> - pthread_mutex_lock(&c->current_job_lock);
> - our_job = c->current_job++;
> + return nb_finished_jobs;
> +}
> +
> +static void* attribute_align_arg worker(void *v)
> +{
> + WorkerContext *w = v;
> + SliceThreadContext *c = w->ctx;
> +
> + pthread_mutex_lock(&w->mutex);
> + pthread_cond_signal(&w->cond);
> +
> + while (1) {
> + w->done = 1;
> + while (w->done)
> + pthread_cond_wait(&w->cond, &w->mutex);
> +
> + if (c->done) {
> + pthread_mutex_unlock(&w->mutex);
> + return NULL;
> + }
> +
> + if (run_jobs(c) == c->job_count) {
> + pthread_mutex_lock(&c->mutex_done);
> + c->worker_done = 1;
> + pthread_cond_signal(&c->cond_done);
> + pthread_mutex_unlock(&c->mutex_done);
> + }
> }
> }
>
> @@ -105,24 +124,36 @@ void ff_slice_thread_free(AVCodecContext *avctx)
> SliceThreadContext *c = avctx->internal->thread_ctx;
> int i;
>
> - pthread_mutex_lock(&c->current_job_lock);
> + for (i = 0; i < avctx->thread_count; i++)
> + pthread_mutex_lock(&c->workers[i].mutex);
> +
> c->done = 1;
> - pthread_cond_broadcast(&c->current_job_cond);
> +
> for (i = 0; i < c->thread_count; i++)
> pthread_cond_broadcast(&c->progress_cond[i]);
> - pthread_mutex_unlock(&c->current_job_lock);
>
> - for (i=0; i<avctx->thread_count; i++)
> - pthread_join(c->workers[i], NULL);
> + for (i = 0; i < avctx->thread_count; i++) {
> + WorkerContext *w = &c->workers[i];
> + w->done = 0;
> + pthread_cond_signal(&w->cond);
> + pthread_mutex_unlock(&w->mutex);
> + }
> +
> + for (i = 0; i < avctx->thread_count; i++) {
> + WorkerContext *w = &c->workers[i];
> + pthread_join(w->thread, NULL);
> + pthread_cond_destroy(&w->cond);
> + pthread_mutex_destroy(&w->mutex);
> + }
>
> for (i = 0; i < c->thread_count; i++) {
> pthread_mutex_destroy(&c->progress_mutex[i]);
> pthread_cond_destroy(&c->progress_cond[i]);
> }
>
> - pthread_mutex_destroy(&c->current_job_lock);
> - pthread_cond_destroy(&c->current_job_cond);
> - pthread_cond_destroy(&c->last_job_cond);
> + pthread_cond_destroy(&c->cond_done);
> + pthread_mutex_destroy(&c->mutex_done);
> + pthread_mutex_lock(&c->mutex_user);
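This looks suspicious. Does it really acquire the lock and keep it
locked after leaving this deinit function? mutex_user is initialized in
ff_slice_thread_init() and not used anywhere else in this patch, so
presumably pthread_mutex_destroy() was intended here.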
>
> av_freep(&c->entries);
> av_freep(&c->progress_mutex);
> @@ -132,16 +163,11 @@ void ff_slice_thread_free(AVCodecContext *avctx)
> av_freep(&avctx->internal->thread_ctx);
> }
>
> -static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count)
> -{
> - while (c->current_job != thread_count + c->job_count)
> - pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
> - pthread_mutex_unlock(&c->current_job_lock);
> -}
> -
> -static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
> +static int thread_execute_internal(AVCodecContext *avctx, action_func *func, action_func2 *func2,
> + void *arg, int *ret, int job_count, int job_size)
> {
> SliceThreadContext *c = avctx->internal->thread_ctx;
> + int i, nb_workers;
>
> if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
> return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
> @@ -149,27 +175,49 @@ static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, i
> if (job_count <= 0)
> return 0;
>
> - pthread_mutex_lock(&c->current_job_lock);
> + // last worker is unused
> + nb_workers = FFMIN(job_count - 1, avctx->thread_count - 1);
> +
> + for (i = 0; i < nb_workers; i++)
> + pthread_mutex_lock(&c->workers[i].mutex);
This looks suspicious: does it lock every worker's mutex on each
execute() call, i.e. in the "hot" path? Wouldn't this cause a lot of
contention? And why mix these locks with atomic accesses to the job
counters?
>
> - c->current_job = avctx->thread_count;
> + atomic_store_explicit(&c->first_job, 0, memory_order_relaxed);
> + atomic_store_explicit(&c->current_job, avctx->thread_count, memory_order_relaxed);
> + atomic_store_explicit(&c->nb_finished_jobs, 0, memory_order_relaxed);
> c->job_count = job_count;
> c->job_size = job_size;
> c->args = arg;
> c->func = func;
> + c->func2 = func2;
> c->rets = ret;
> - c->current_execute++;
> - pthread_cond_broadcast(&c->current_job_cond);
>
> - thread_park_workers(c, avctx->thread_count);
> + for (i = 0; i < nb_workers; i++) {
> + WorkerContext *w = &c->workers[i];
> + w->done = 0;
> + pthread_cond_signal(&w->cond);
> + pthread_mutex_unlock(&w->mutex);
> + }
> +
> + // emulate the last worker, no need to wait if all jobs is complete
> + if (run_jobs(c) != c->job_count) {
> + pthread_mutex_lock(&c->mutex_done);
> + while (!c->worker_done)
> + pthread_cond_wait(&c->cond_done, &c->mutex_done);
> + c->worker_done = 0;
> + pthread_mutex_unlock(&c->mutex_done);
> + }
>
> return 0;
> }
>
> +static int thread_execute(AVCodecContext *avctx, action_func *func, void *arg, int *ret, int job_count, int job_size)
> +{
> + return thread_execute_internal(avctx, func, NULL, arg, ret, job_count, job_size);
> +}
> +
> static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
> {
> - SliceThreadContext *c = avctx->internal->thread_ctx;
> - c->func2 = func2;
> - return thread_execute(avctx, NULL, arg, ret, job_count, 0);
> + return thread_execute_internal(avctx, NULL, func2, arg, ret, job_count, 0);
> }
>
> int ff_slice_thread_init(AVCodecContext *avctx)
> @@ -208,31 +256,48 @@ int ff_slice_thread_init(AVCodecContext *avctx)
> if (!c)
> return -1;
>
> - c->workers = av_mallocz_array(thread_count, sizeof(pthread_t));
> + // allocate thread_count workers, but currently last worker is unused, emulated by main thread
> + // anticipate when main thread needs to do something
> + c->workers = av_mallocz_array(thread_count, sizeof(*c->workers));
> if (!c->workers) {
> av_free(c);
> return -1;
> }
>
> avctx->internal->thread_ctx = c;
> - c->current_job = 0;
> + c->avctx = avctx;
> + pthread_mutex_init(&c->mutex_user, NULL);
> + pthread_mutex_init(&c->mutex_done, NULL);
> + pthread_cond_init(&c->cond_done, NULL);
> + atomic_init(&c->first_job, 0);
> + atomic_init(&c->current_job, 0);
> + atomic_init(&c->nb_finished_jobs, 0);
> c->job_count = 0;
> c->job_size = 0;
> c->done = 0;
> - pthread_cond_init(&c->current_job_cond, NULL);
> - pthread_cond_init(&c->last_job_cond, NULL);
> - pthread_mutex_init(&c->current_job_lock, NULL);
> - pthread_mutex_lock(&c->current_job_lock);
> - for (i=0; i<thread_count; i++) {
> - if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
> - avctx->thread_count = i;
> - pthread_mutex_unlock(&c->current_job_lock);
> + c->worker_done = 0;
> +
> + for (i = 0; i < thread_count; i++) {
> + WorkerContext *w = &c->workers[i];
> +
> + w->ctx = c;
> + pthread_mutex_init(&w->mutex, NULL);
> + pthread_cond_init(&w->cond, NULL);
> + pthread_mutex_lock(&w->mutex);
> + w->done = 0;
> + if (pthread_create(&w->thread, NULL, worker, w)) {
> + avctx->thread_count = i + 1;
> + pthread_mutex_unlock(&w->mutex);
> + pthread_cond_destroy(&w->cond);
> + pthread_mutex_destroy(&w->mutex);
> ff_thread_free(avctx);
> return -1;
> }
> - }
>
> - thread_park_workers(c, thread_count);
> + while (!w->done)
> + pthread_cond_wait(&w->cond, &w->mutex);
> + pthread_mutex_unlock(&w->mutex);
> + }
>
> avctx->execute = thread_execute;
> avctx->execute2 = thread_execute2;
More information about the ffmpeg-devel
mailing list