FFmpeg
slicethread.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdatomic.h>
20 #include "cpu.h"
21 #include "internal.h"
22 #include "slicethread.h"
23 #include "mem.h"
24 #include "thread.h"
25 #include "avassert.h"
26 
27 #define MAX_AUTO_THREADS 16
28 
29 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
30 
/**
 * Per-worker state: the thread handle plus the mutex/condition pair on
 * which the worker sleeps between jobs.
 */
typedef struct WorkerContext {
    struct AVSliceThread *ctx; ///< owning context, read by thread_worker()
    pthread_mutex_t mutex;     ///< held while 'done' is read or written
    pthread_cond_t  cond;      ///< signalled whenever 'done' is changed
    pthread_t       thread;    ///< handle used by pthread_create()/pthread_join()
    int             done;      ///< worker sets it to 1 when idle; cleared to 0 to wake the worker
} WorkerContext;
38 
39 struct AVSliceThread {
40  WorkerContext *workers;
41  int nb_threads;
42  int nb_active_threads;
43  int nb_jobs;
44 
45  atomic_uint first_job;
46  atomic_uint current_job;
47  pthread_mutex_t done_mutex;
48  pthread_cond_t done_cond;
49  int done;
50  int finished;
51 
52  void *priv;
53  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
54  void (*main_func)(void *priv);
55 };
56 
57 static int run_jobs(AVSliceThread *ctx)
58 {
59  unsigned nb_jobs = ctx->nb_jobs;
60  unsigned nb_active_threads = ctx->nb_active_threads;
61  unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
62  unsigned current_job = first_job;
63 
64  do {
65  ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
66  } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
67 
68  return current_job == nb_jobs + nb_active_threads - 1;
69 }
70 
71 static void *attribute_align_arg thread_worker(void *v)
72 {
73  WorkerContext *w = v;
74  AVSliceThread *ctx = w->ctx;
75 
76  pthread_mutex_lock(&w->mutex);
77  pthread_cond_signal(&w->cond);
78 
79  while (1) {
80  w->done = 1;
81  while (w->done)
82  pthread_cond_wait(&w->cond, &w->mutex);
83 
84  if (ctx->finished) {
85  pthread_mutex_unlock(&w->mutex);
86  return NULL;
87  }
88 
89  if (run_jobs(ctx)) {
90  pthread_mutex_lock(&ctx->done_mutex);
91  ctx->done = 1;
92  pthread_cond_signal(&ctx->done_cond);
93  pthread_mutex_unlock(&ctx->done_mutex);
94  }
95  }
96 }
97 
98 av_cold
99 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
100  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
101  void (*main_func)(void *priv),
102  int nb_threads)
103 {
105  int nb_workers, i;
106  int ret;
107 
108  av_assert0(nb_threads >= 0);
109  if (!nb_threads) {
110  int nb_cpus = av_cpu_count();
111  if (nb_cpus > 1)
112  nb_threads = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
113  else
114  nb_threads = 1;
115  }
116 
117  nb_workers = nb_threads;
118  if (!main_func)
119  nb_workers--;
120 
121  *pctx = ctx = av_mallocz(sizeof(*ctx));
122  if (!ctx)
123  return AVERROR(ENOMEM);
124 
125  if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
126  av_freep(pctx);
127  return AVERROR(ENOMEM);
128  }
129 
130  ctx->priv = priv;
131  ctx->worker_func = worker_func;
132  ctx->main_func = main_func;
133  ctx->nb_threads = nb_threads;
134  ctx->nb_active_threads = 0;
135  ctx->nb_jobs = 0;
136  ctx->finished = 0;
137 
138  atomic_init(&ctx->first_job, 0);
139  atomic_init(&ctx->current_job, 0);
140  ret = pthread_mutex_init(&ctx->done_mutex, NULL);
141  if (ret) {
142  av_freep(&ctx->workers);
143  av_freep(pctx);
144  return AVERROR(ret);
145  }
146  ret = pthread_cond_init(&ctx->done_cond, NULL);
147  if (ret) {
148  ctx->nb_threads = main_func ? 0 : 1;
150  return AVERROR(ret);
151  }
152  ctx->done = 0;
153 
154  for (i = 0; i < nb_workers; i++) {
155  WorkerContext *w = &ctx->workers[i];
156  int ret;
157  w->ctx = ctx;
158  ret = pthread_mutex_init(&w->mutex, NULL);
159  if (ret) {
160  ctx->nb_threads = main_func ? i : i + 1;
162  return AVERROR(ret);
163  }
164  ret = pthread_cond_init(&w->cond, NULL);
165  if (ret) {
166  pthread_mutex_destroy(&w->mutex);
167  ctx->nb_threads = main_func ? i : i + 1;
169  return AVERROR(ret);
170  }
171  pthread_mutex_lock(&w->mutex);
172  w->done = 0;
173 
174  if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
175  ctx->nb_threads = main_func ? i : i + 1;
176  pthread_mutex_unlock(&w->mutex);
177  pthread_cond_destroy(&w->cond);
178  pthread_mutex_destroy(&w->mutex);
180  return AVERROR(ret);
181  }
182 
183  while (!w->done)
184  pthread_cond_wait(&w->cond, &w->mutex);
185  pthread_mutex_unlock(&w->mutex);
186  }
187 
188  return nb_threads;
189 }
190 
191 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
192 {
193  int nb_workers, i, is_last = 0;
194 
195  av_assert0(nb_jobs > 0);
196  ctx->nb_jobs = nb_jobs;
197  ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
198  atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
199  atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
200  nb_workers = ctx->nb_active_threads;
201  if (!ctx->main_func || !execute_main)
202  nb_workers--;
203 
204  for (i = 0; i < nb_workers; i++) {
205  WorkerContext *w = &ctx->workers[i];
206  pthread_mutex_lock(&w->mutex);
207  w->done = 0;
208  pthread_cond_signal(&w->cond);
209  pthread_mutex_unlock(&w->mutex);
210  }
211 
212  if (ctx->main_func && execute_main)
213  ctx->main_func(ctx->priv);
214  else
215  is_last = run_jobs(ctx);
216 
217  if (!is_last) {
218  pthread_mutex_lock(&ctx->done_mutex);
219  while (!ctx->done)
220  pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
221  ctx->done = 0;
222  pthread_mutex_unlock(&ctx->done_mutex);
223  }
224 }
225 
227 {
228  AVSliceThread *ctx = *pctx;
229  int nb_workers, i;
230 
231  if (!ctx)
232  return;
233 
234  nb_workers = ctx->nb_threads;
235  if (!ctx->main_func)
236  nb_workers--;
237 
238  ctx->finished = 1;
239  for (i = 0; i < nb_workers; i++) {
240  WorkerContext *w = &ctx->workers[i];
241  pthread_mutex_lock(&w->mutex);
242  w->done = 0;
243  pthread_cond_signal(&w->cond);
244  pthread_mutex_unlock(&w->mutex);
245  }
246 
247  for (i = 0; i < nb_workers; i++) {
248  WorkerContext *w = &ctx->workers[i];
249  pthread_join(w->thread, NULL);
250  pthread_cond_destroy(&w->cond);
251  pthread_mutex_destroy(&w->mutex);
252  }
253 
254  pthread_cond_destroy(&ctx->done_cond);
255  pthread_mutex_destroy(&ctx->done_mutex);
256  av_freep(&ctx->workers);
257  av_freep(pctx);
258 }
259 
260 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
261 
263  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
264  void (*main_func)(void *priv),
265  int nb_threads)
266 {
267  *pctx = NULL;
268  return AVERROR(ENOSYS);
269 }
270 
271 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
272 {
273  av_assert0(0);
274 }
275 
277 {
278  av_assert0(!pctx || !*pctx);
279 }
280 
281 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
thread.h
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
avpriv_slicethread_execute
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
Execute slice threading.
Definition: slicethread.c:271
w
uint8_t w
Definition: llviddspenc.c:38
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
AVSliceThread
struct AVSliceThread AVSliceThread
Definition: slicethread.h:22
avpriv_slicethread_create
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, void(*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), void(*main_func)(void *priv), int nb_threads)
Create slice threading context.
Definition: slicethread.c:262
mutex
static AVMutex mutex
Definition: resman.c:61
avassert.h
av_cold
#define av_cold
Definition: attributes.h:106
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:41
ctx
AVFormatContext * ctx
Definition: movenc.c:49
MAX_AUTO_THREADS
#define MAX_AUTO_THREADS
Definition: slicethread.c:27
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
NULL
#define NULL
Definition: coverity.c:32
worker_func
static void worker_func(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
Definition: pthread_slice.c:57
av_cpu_count
int av_cpu_count(void)
Definition: cpu.c:221
attribute_align_arg
#define attribute_align_arg
Definition: internal.h:50
cpu.h
atomic_fetch_add_explicit
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
slicethread.h
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
internal.h
atomic_store_explicit
#define atomic_store_explicit(object, desired, order)
Definition: stdatomic.h:90
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
main_func
int() main_func(AVCodecContext *c)
Definition: pthread_slice.c:39
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
pthread_cond_t
Definition: os2threads.h:58
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
ret
ret
Definition: filter_design.txt:187
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
avpriv_slicethread_free
void avpriv_slicethread_free(AVSliceThread **pctx)
Destroy slice threading context.
Definition: slicethread.c:276
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
cond
int(* cond)(enum AVPixelFormat pix_fmt)
Definition: pixdesc_query.c:28