FFmpeg
executor.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2024 Nuo Mi
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "config.h"
22 
23 #include <stdbool.h>
24 
25 #include "libavutil/mem.h"
26 #include "libavutil/thread.h"
27 
28 #include "executor.h"
29 
/*
 * Thin thread abstraction used by the executor.
 *
 * With thread support the macros map directly onto pthreads. Without it,
 * the handle type degenerates to a char, "creating" a thread always reports
 * success (0) without starting anything, and joining is a no-op.
 */
#if HAVE_THREADS

#define ExecutorThread pthread_t

#define executor_thread_create(t, a, s, ar) pthread_create(t, a, s, ar)
#define executor_thread_join(t, r) pthread_join(t, r)

#else

#define ExecutorThread char

#define executor_thread_create(t, a, s, ar) 0
#define executor_thread_join(t, r) do {} while(0)

#endif // HAVE_THREADS
45 
46 typedef struct ThreadInfo {
49 } ThreadInfo;
50 
51 typedef struct Queue {
54 } Queue;
55 
56 struct FFExecutor {
59  bool recursive;
60 
62  uint8_t *local_contexts;
63 
66  int die;
67 
68  Queue *q;
69 };
70 
72 {
73  FFTask *t = q->head;
74  if (t) {
75  q->head = t->next;
76  t->next = NULL;
77  if (!q->head)
78  q->tail = NULL;
79  }
80  return t;
81 }
82 
83 static void add_task(Queue *q, FFTask *t)
84 {
85  t->next = NULL;
86  if (!q->head)
87  q->tail = q->head = t;
88  else
89  q->tail = q->tail->next = t;
90 }
91 
92 static int run_one_task(FFExecutor *e, void *lc)
93 {
94  FFTaskCallbacks *cb = &e->cb;
95  FFTask *t = NULL;
96 
97  for (int i = 0; i < e->cb.priorities && !t; i++)
98  t = remove_task(e->q + i);
99 
100  if (t) {
101  if (e->thread_count > 0)
102  ff_mutex_unlock(&e->lock);
103  cb->run(t, lc, cb->user_data);
104  if (e->thread_count > 0)
105  ff_mutex_lock(&e->lock);
106  return 1;
107  }
108  return 0;
109 }
110 
#if HAVE_THREADS
/**
 * Worker thread entry point.
 *
 * Runs queued tasks until the executor's die flag is set, sleeping on the
 * condition variable whenever no task is available. The worker's local
 * context is selected by its index in the executor's thread array.
 *
 * @param data pointer to this worker's ThreadInfo
 * @return always NULL
 */
static void *executor_worker_task(void *data)
{
    ThreadInfo *self = (ThreadInfo*)data;
    FFExecutor *ex   = self->e;
    void *lc         = ex->local_contexts + (self - ex->threads) * ex->cb.local_context_size;

    ff_mutex_lock(&ex->lock);
    while (!ex->die) {
        // nothing to run in this pass, wait to be woken up
        if (!run_one_task(ex, lc))
            ff_cond_wait(&ex->cond, &ex->lock);
    }
    ff_mutex_unlock(&ex->lock);

    return NULL;
}
#endif
131 
132 static void executor_free(FFExecutor *e, const int has_lock, const int has_cond)
133 {
134  if (e->thread_count) {
135  //signal die
136  ff_mutex_lock(&e->lock);
137  e->die = 1;
138  ff_cond_broadcast(&e->cond);
139  ff_mutex_unlock(&e->lock);
140 
141  for (int i = 0; i < e->thread_count; i++)
143  }
144  if (has_cond)
145  ff_cond_destroy(&e->cond);
146  if (has_lock)
147  ff_mutex_destroy(&e->lock);
148 
149  av_free(e->threads);
150  av_free(e->q);
152 
153  av_free(e);
154 }
155 
156 FFExecutor* ff_executor_alloc(const FFTaskCallbacks *cb, int thread_count)
157 {
158  FFExecutor *e;
159  int has_lock = 0, has_cond = 0;
160  if (!cb || !cb->user_data || !cb->run || !cb->priorities)
161  return NULL;
162 
163  e = av_mallocz(sizeof(*e));
164  if (!e)
165  return NULL;
166  e->cb = *cb;
167 
168  e->local_contexts = av_calloc(FFMAX(thread_count, 1), e->cb.local_context_size);
169  if (!e->local_contexts)
170  goto free_executor;
171 
172  e->q = av_calloc(e->cb.priorities, sizeof(Queue));
173  if (!e->q)
174  goto free_executor;
175 
176  e->threads = av_calloc(FFMAX(thread_count, 1), sizeof(*e->threads));
177  if (!e->threads)
178  goto free_executor;
179 
180  if (!thread_count)
181  return e;
182 
183  has_lock = !ff_mutex_init(&e->lock, NULL);
184  has_cond = !ff_cond_init(&e->cond, NULL);
185 
186  if (!has_lock || !has_cond)
187  goto free_executor;
188 
189  for (/* nothing */; e->thread_count < thread_count; e->thread_count++) {
190  ThreadInfo *ti = e->threads + e->thread_count;
191  ti->e = e;
192  if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti))
193  goto free_executor;
194  }
195  return e;
196 
197 free_executor:
198  executor_free(e, has_lock, has_cond);
199  return NULL;
200 }
201 
202 void ff_executor_free(FFExecutor **executor)
203 {
204  int thread_count;
205 
206  if (!executor || !*executor)
207  return;
208  thread_count = (*executor)->thread_count;
209  executor_free(*executor, thread_count, thread_count);
210  *executor = NULL;
211 }
212 
214 {
215  if (e->thread_count)
216  ff_mutex_lock(&e->lock);
217  if (t)
218  add_task(e->q + t->priority % e->cb.priorities, t);
219  if (e->thread_count) {
220  ff_cond_signal(&e->cond);
221  ff_mutex_unlock(&e->lock);
222  }
223 
224  if (!e->thread_count || !HAVE_THREADS) {
225  if (e->recursive)
226  return;
227  e->recursive = true;
228  // We are running in a single-threaded environment, so we must handle all tasks ourselves
229  while (run_one_task(e, e->local_contexts))
230  /* nothing */;
231  e->recursive = false;
232  }
233 }
ThreadInfo::thread
ExecutorThread thread
Definition: executor.c:48
FFExecutor::threads
ThreadInfo * threads
Definition: executor.c:61
ff_mutex_init
static int ff_mutex_init(AVMutex *mutex, const void *attr)
Definition: thread.h:187
cb
static double cb(void *priv, double x, double y)
Definition: vf_geq.c:247
thread.h
FFExecutor::thread_count
int thread_count
Definition: executor.c:58
FFExecutor::cond
AVCond cond
Definition: executor.c:65
FFTaskCallbacks::priorities
int priorities
Definition: executor.h:44
data
const char data[16]
Definition: mxf.c:149
Queue::tail
FFTask * tail
Definition: executor.c:53
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
ExecutorThread
#define ExecutorThread
Definition: executor.c:32
ff_cond_broadcast
static int ff_cond_broadcast(AVCond *cond)
Definition: thread.h:197
FFTaskCallbacks
Definition: executor.h:38
FFExecutor::die
int die
Definition: executor.c:66
ff_mutex_unlock
static int ff_mutex_unlock(AVMutex *mutex)
Definition: thread.h:189
FFTask
Definition: executor.h:33
FFExecutor::recursive
bool recursive
Definition: executor.c:59
FFExecutor::local_contexts
uint8_t * local_contexts
Definition: executor.c:62
ff_executor_free
void ff_executor_free(FFExecutor **executor)
Free executor.
Definition: executor.c:202
Queue
Linear double-ended data structure.
Definition: executor.c:51
AVMutex
#define AVMutex
Definition: thread.h:184
FFExecutor
Definition: executor.c:56
Queue::head
FFTask * head
Definition: executor.c:52
add_task
static void add_task(Queue *q, FFTask *t)
Definition: executor.c:83
ff_cond_wait
static int ff_cond_wait(AVCond *cond, AVMutex *mutex)
Definition: thread.h:198
AVCond
#define AVCond
Definition: thread.h:192
FFExecutor::cb
FFTaskCallbacks cb
Definition: executor.c:57
ff_executor_execute
void ff_executor_execute(FFExecutor *e, FFTask *t)
Add task to executor.
Definition: executor.c:213
remove_task
static FFTask * remove_task(Queue *q)
Definition: executor.c:71
NULL
#define NULL
Definition: coverity.c:32
FFExecutor::q
Queue * q
Definition: executor.c:68
FFTask::next
FFTask * next
Definition: executor.h:34
executor_thread_join
#define executor_thread_join(t, r)
Definition: executor.c:35
FFTask::priority
int priority
Definition: executor.h:35
ThreadInfo::e
FFExecutor * e
Definition: executor.c:47
ff_executor_alloc
FFExecutor * ff_executor_alloc(const FFTaskCallbacks *cb, int thread_count)
Alloc executor.
Definition: executor.c:156
ff_mutex_destroy
static int ff_mutex_destroy(AVMutex *mutex)
Definition: thread.h:190
executor_free
static void executor_free(FFExecutor *e, const int has_lock, const int has_cond)
Definition: executor.c:132
FFExecutor::lock
AVMutex lock
Definition: executor.c:64
ThreadInfo
HAVE_THREADS.
Definition: executor.c:46
ff_mutex_lock
static int ff_mutex_lock(AVMutex *mutex)
Definition: thread.h:188
run_one_task
static int run_one_task(FFExecutor *e, void *lc)
Definition: executor.c:92
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
FFTaskCallbacks::local_context_size
int local_context_size
Definition: executor.h:41
executor.h
ff_cond_signal
static int ff_cond_signal(AVCond *cond)
Definition: thread.h:196
mem.h
executor_thread_create
#define executor_thread_create(t, a, s, ar)
Definition: executor.c:34
av_free
#define av_free(p)
Definition: tableprint_vlc.h:33
ff_cond_destroy
static int ff_cond_destroy(AVCond *cond)
Definition: thread.h:195
ff_cond_init
static int ff_cond_init(AVCond *cond, const void *attr)
Definition: thread.h:194