FFmpeg
slicethread.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdatomic.h>
20 #include "cpu.h"
21 #include "internal.h"
22 #include "slicethread.h"
23 #include "mem.h"
24 #include "thread.h"
25 #include "avassert.h"
26 
27 #define MAX_AUTO_THREADS 16
28 
29 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
30 
31 typedef struct WorkerContext {
35  pthread_t thread;
36  int done;
37 } WorkerContext;
38 
39 struct AVSliceThread {
40  WorkerContext *workers;
41  int nb_threads;
42  int nb_active_threads;
43  int nb_jobs;
44 
45  atomic_uint first_job;
46  atomic_uint current_job;
47  pthread_mutex_t done_mutex;
48  pthread_cond_t done_cond;
49  int done;
50  int finished;
51 
52  void *priv;
53  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
54  void (*main_func)(void *priv);
55 };
56 
57 static int run_jobs(AVSliceThread *ctx)
58 {
59  unsigned nb_jobs = ctx->nb_jobs;
60  unsigned nb_active_threads = ctx->nb_active_threads;
61  unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
62  unsigned current_job = first_job;
63 
64  do {
65  ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
66  } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
67 
68  return current_job == nb_jobs + nb_active_threads - 1;
69 }
70 
71 static void *attribute_align_arg thread_worker(void *v)
72 {
73  WorkerContext *w = v;
74  AVSliceThread *ctx = w->ctx;
75 
76  pthread_mutex_lock(&w->mutex);
77  pthread_cond_signal(&w->cond);
78 
79  while (1) {
80  w->done = 1;
81  while (w->done)
82  pthread_cond_wait(&w->cond, &w->mutex);
83 
84  if (ctx->finished) {
85  pthread_mutex_unlock(&w->mutex);
86  return NULL;
87  }
88 
89  if (run_jobs(ctx)) {
90  pthread_mutex_lock(&ctx->done_mutex);
91  ctx->done = 1;
92  pthread_cond_signal(&ctx->done_cond);
93  pthread_mutex_unlock(&ctx->done_mutex);
94  }
95  }
96 }
97 
98 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
99  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
100  void (*main_func)(void *priv),
101  int nb_threads)
102 {
104  int nb_workers, i;
105 
106  av_assert0(nb_threads >= 0);
107  if (!nb_threads) {
108  int nb_cpus = av_cpu_count();
109  if (nb_cpus > 1)
110  nb_threads = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
111  else
112  nb_threads = 1;
113  }
114 
115  nb_workers = nb_threads;
116  if (!main_func)
117  nb_workers--;
118 
119  *pctx = ctx = av_mallocz(sizeof(*ctx));
120  if (!ctx)
121  return AVERROR(ENOMEM);
122 
123  if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
124  av_freep(pctx);
125  return AVERROR(ENOMEM);
126  }
127 
128  ctx->priv = priv;
129  ctx->worker_func = worker_func;
130  ctx->main_func = main_func;
131  ctx->nb_threads = nb_threads;
132  ctx->nb_active_threads = 0;
133  ctx->nb_jobs = 0;
134  ctx->finished = 0;
135 
136  atomic_init(&ctx->first_job, 0);
137  atomic_init(&ctx->current_job, 0);
138  pthread_mutex_init(&ctx->done_mutex, NULL);
139  pthread_cond_init(&ctx->done_cond, NULL);
140  ctx->done = 0;
141 
142  for (i = 0; i < nb_workers; i++) {
143  WorkerContext *w = &ctx->workers[i];
144  int ret;
145  w->ctx = ctx;
146  pthread_mutex_init(&w->mutex, NULL);
147  pthread_cond_init(&w->cond, NULL);
148  pthread_mutex_lock(&w->mutex);
149  w->done = 0;
150 
151  if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
152  ctx->nb_threads = main_func ? i : i + 1;
153  pthread_mutex_unlock(&w->mutex);
154  pthread_cond_destroy(&w->cond);
155  pthread_mutex_destroy(&w->mutex);
157  return AVERROR(ret);
158  }
159 
160  while (!w->done)
161  pthread_cond_wait(&w->cond, &w->mutex);
162  pthread_mutex_unlock(&w->mutex);
163  }
164 
165  return nb_threads;
166 }
167 
168 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
169 {
170  int nb_workers, i, is_last = 0;
171 
172  av_assert0(nb_jobs > 0);
173  ctx->nb_jobs = nb_jobs;
174  ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
175  atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
176  atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
177  nb_workers = ctx->nb_active_threads;
178  if (!ctx->main_func || !execute_main)
179  nb_workers--;
180 
181  for (i = 0; i < nb_workers; i++) {
182  WorkerContext *w = &ctx->workers[i];
183  pthread_mutex_lock(&w->mutex);
184  w->done = 0;
185  pthread_cond_signal(&w->cond);
186  pthread_mutex_unlock(&w->mutex);
187  }
188 
189  if (ctx->main_func && execute_main)
190  ctx->main_func(ctx->priv);
191  else
192  is_last = run_jobs(ctx);
193 
194  if (!is_last) {
195  pthread_mutex_lock(&ctx->done_mutex);
196  while (!ctx->done)
197  pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
198  ctx->done = 0;
199  pthread_mutex_unlock(&ctx->done_mutex);
200  }
201 }
202 
204 {
206  int nb_workers, i;
207 
208  if (!pctx || !*pctx)
209  return;
210 
211  ctx = *pctx;
212  nb_workers = ctx->nb_threads;
213  if (!ctx->main_func)
214  nb_workers--;
215 
216  ctx->finished = 1;
217  for (i = 0; i < nb_workers; i++) {
218  WorkerContext *w = &ctx->workers[i];
219  pthread_mutex_lock(&w->mutex);
220  w->done = 0;
221  pthread_cond_signal(&w->cond);
222  pthread_mutex_unlock(&w->mutex);
223  }
224 
225  for (i = 0; i < nb_workers; i++) {
226  WorkerContext *w = &ctx->workers[i];
227  pthread_join(w->thread, NULL);
228  pthread_cond_destroy(&w->cond);
229  pthread_mutex_destroy(&w->mutex);
230  }
231 
232  pthread_cond_destroy(&ctx->done_cond);
233  pthread_mutex_destroy(&ctx->done_mutex);
234  av_freep(&ctx->workers);
235  av_freep(pctx);
236 }
237 
238 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
239 
241  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
242  void (*main_func)(void *priv),
243  int nb_threads)
244 {
245  *pctx = NULL;
246  return AVERROR(ENOSYS);
247 }
248 
249 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
250 {
251  av_assert0(0);
252 }
253 
255 {
256  av_assert0(!pctx || !*pctx);
257 }
258 
259 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
thread.h
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
avpriv_slicethread_execute
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
Execute slice threading.
Definition: slicethread.c:249
w
uint8_t w
Definition: llviddspenc.c:38
AVSliceThread
struct AVSliceThread AVSliceThread
Definition: slicethread.h:22
avpriv_slicethread_create
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, void(*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), void(*main_func)(void *priv), int nb_threads)
Create slice threading context.
Definition: slicethread.c:240
avassert.h
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
ctx
AVFormatContext * ctx
Definition: movenc.c:48
MAX_AUTO_THREADS
#define MAX_AUTO_THREADS
Definition: slicethread.c:27
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
NULL
#define NULL
Definition: coverity.c:32
worker_func
static void worker_func(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
Definition: pthread_slice.c:70
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:81
av_cpu_count
int av_cpu_count(void)
Definition: cpu.c:209
attribute_align_arg
#define attribute_align_arg
Definition: internal.h:50
cpu.h
atomic_fetch_add_explicit
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
slicethread.h
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:255
internal.h
atomic_store_explicit
#define atomic_store_explicit(object, desired, order)
Definition: stdatomic.h:90
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
main_func
int() main_func(AVCodecContext *c)
Definition: pthread_slice.c:42
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:254
pthread_cond_t
Definition: os2threads.h:58
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:262
ret
ret
Definition: filter_design.txt:187
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
avpriv_slicethread_free
void avpriv_slicethread_free(AVSliceThread **pctx)
Destroy slice threading context.
Definition: slicethread.c:254
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
cond
int(* cond)(enum AVPixelFormat pix_fmt)
Definition: pixdesc_query.c:28
mutex
static AVMutex mutex
Definition: log.c:46
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:77