FFmpeg
executor.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2023 Nuo Mi
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 #include "internal.h"
21 #include "mem.h"
22 #include "thread.h"
23 
24 #include "executor.h"
25 
/*
 * Minimal threading shim used by the executor.
 *
 * With thread support the three names map straight onto pthreads.
 * Without it, ExecutorThread degrades to a dummy char, "creating" a
 * thread always reports success (0) without starting anything, and
 * joining is a no-op statement.
 */
#if HAVE_THREADS

#define ExecutorThread pthread_t

#define executor_thread_create(t, a, s, ar) pthread_create(t, a, s, ar)
#define executor_thread_join(t, r) pthread_join(t, r)

#else

#define ExecutorThread char

#define executor_thread_create(t, a, s, ar) 0
#define executor_thread_join(t, r) do {} while(0)

#endif // HAVE_THREADS
41 
42 typedef struct ThreadInfo {
45 } ThreadInfo;
46 
47 struct AVExecutor {
50 
52  uint8_t *local_contexts;
53 
56  int die;
57 
59 };
60 
61 static AVTask* remove_task(AVTask **prev, AVTask *t)
62 {
63  *prev = t->next;
64  t->next = NULL;
65  return t;
66 }
67 
68 static void add_task(AVTask **prev, AVTask *t)
69 {
70  t->next = *prev;
71  *prev = t;
72 }
73 
74 static int run_one_task(AVExecutor *e, void *lc)
75 {
76  AVTaskCallbacks *cb = &e->cb;
77  AVTask **prev;
78 
79  for (prev = &e->tasks; *prev && !cb->ready(*prev, cb->user_data); prev = &(*prev)->next)
80  /* nothing */;
81  if (*prev) {
82  AVTask *t = remove_task(prev, *prev);
83  ff_mutex_unlock(&e->lock);
84  cb->run(t, lc, cb->user_data);
85  ff_mutex_lock(&e->lock);
86  return 1;
87  }
88  return 0;
89 }
90 
#if HAVE_THREADS
/**
 * Worker thread entry point.
 *
 * Runs ready tasks until the executor's die flag is set, sleeping on the
 * condition variable whenever no task could be run.
 *
 * @param data the ThreadInfo record of this worker
 * @return always NULL
 */
static void *executor_worker_task(void *data)
{
    ThreadInfo *info = (ThreadInfo*)data;
    AVExecutor *e = info->e;
    // Each worker uses the slice of local_contexts matching its index in e->threads.
    void *lc = e->local_contexts + (info - e->threads) * e->cb.local_context_size;

    ff_mutex_lock(&e->lock);
    while (!e->die) {
        if (run_one_task(e, lc))
            continue;
        // Nothing was runnable in this pass: wait to be woken up.
        ff_cond_wait(&e->cond, &e->lock);
    }
    ff_mutex_unlock(&e->lock);

    return NULL;
}
#endif
111 
112 static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
113 {
114  if (e->thread_count) {
115  //signal die
116  ff_mutex_lock(&e->lock);
117  e->die = 1;
118  ff_cond_broadcast(&e->cond);
119  ff_mutex_unlock(&e->lock);
120 
121  for (int i = 0; i < e->thread_count; i++)
123  }
124  if (has_cond)
125  ff_cond_destroy(&e->cond);
126  if (has_lock)
127  ff_mutex_destroy(&e->lock);
128 
129  av_free(e->threads);
131 
132  av_free(e);
133 }
134 
135 AVExecutor* av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
136 {
137  AVExecutor *e;
138  int has_lock = 0, has_cond = 0;
139  if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher)
140  return NULL;
141 
142  e = av_mallocz(sizeof(*e));
143  if (!e)
144  return NULL;
145  e->cb = *cb;
146 
147  e->local_contexts = av_calloc(thread_count, e->cb.local_context_size);
148  if (!e->local_contexts)
149  goto free_executor;
150 
151  e->threads = av_calloc(thread_count, sizeof(*e->threads));
152  if (!e->threads)
153  goto free_executor;
154 
155  has_lock = !ff_mutex_init(&e->lock, NULL);
156  has_cond = !ff_cond_init(&e->cond, NULL);
157 
158  if (!has_lock || !has_cond)
159  goto free_executor;
160 
161  for (/* nothing */; e->thread_count < thread_count; e->thread_count++) {
162  ThreadInfo *ti = e->threads + e->thread_count;
163  ti->e = e;
164  if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti))
165  goto free_executor;
166  }
167  return e;
168 
169 free_executor:
170  executor_free(e, has_lock, has_cond);
171  return NULL;
172 }
173 
174 void av_executor_free(AVExecutor **executor)
175 {
176  if (!executor || !*executor)
177  return;
178  executor_free(*executor, 1, 1);
179  *executor = NULL;
180 }
181 
183 {
184  AVTaskCallbacks *cb = &e->cb;
185  AVTask **prev;
186 
187  ff_mutex_lock(&e->lock);
188  if (t) {
189  for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
190  /* nothing */;
191  add_task(prev, t);
192  }
193  ff_cond_signal(&e->cond);
194  ff_mutex_unlock(&e->lock);
195 
196 #if !HAVE_THREADS
197  // We are running in a single-threaded environment, so we must handle all tasks ourselves
198  while (run_one_task(e, e->local_contexts))
199  /* nothing */;
200 #endif
201 }
add_task
static void add_task(AVTask **prev, AVTask *t)
Definition: executor.c:68
ThreadInfo::thread
ExecutorThread thread
Definition: executor.c:44
AVExecutor::threads
ThreadInfo * threads
Definition: executor.c:51
executor_free
static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
Definition: executor.c:112
ff_mutex_init
static int ff_mutex_init(AVMutex *mutex, const void *attr)
Definition: thread.h:187
cb
static double cb(void *priv, double x, double y)
Definition: vf_geq.c:241
thread.h
AVTask::next
AVTask * next
Definition: executor.h:28
data
const char data[16]
Definition: mxf.c:148
ff_cond_broadcast
static int ff_cond_broadcast(AVCond *cond)
Definition: thread.h:197
executor_thread_join
#define executor_thread_join(t, r)
Definition: executor.c:31
ff_mutex_unlock
static int ff_mutex_unlock(AVMutex *mutex)
Definition: thread.h:189
AVTaskCallbacks
Definition: executor.h:31
AVTaskCallbacks::local_context_size
int local_context_size
Definition: executor.h:34
AVExecutor::die
int die
Definition: executor.c:56
av_executor_alloc
AVExecutor * av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
Alloc executor.
Definition: executor.c:135
AVMutex
#define AVMutex
Definition: thread.h:184
ff_cond_wait
static int ff_cond_wait(AVCond *cond, AVMutex *mutex)
Definition: thread.h:198
AVCond
#define AVCond
Definition: thread.h:192
AVTask
Definition: executor.h:27
NULL
#define NULL
Definition: coverity.c:32
AVExecutor
Definition: executor.c:47
AVExecutor::cond
AVCond cond
Definition: executor.c:55
ff_mutex_destroy
static int ff_mutex_destroy(AVMutex *mutex)
Definition: thread.h:190
AVExecutor::thread_count
int thread_count
Definition: executor.c:49
AVExecutor::cb
AVTaskCallbacks cb
Definition: executor.c:48
ThreadInfo::e
AVExecutor * e
Definition: executor.c:43
executor.h
run_one_task
static int run_one_task(AVExecutor *e, void *lc)
Definition: executor.c:74
executor_thread_create
#define executor_thread_create(t, a, s, ar)
Definition: executor.c:30
ThreadInfo
HAVE_THREADS.
Definition: executor.c:42
ff_mutex_lock
static int ff_mutex_lock(AVMutex *mutex)
Definition: thread.h:188
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:255
internal.h
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if available on the CPU) and zero all the bytes of the block.
Definition: mem.c:254
AVExecutor::tasks
AVTask * tasks
Definition: executor.c:58
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:262
remove_task
static AVTask * remove_task(AVTask **prev, AVTask *t)
Definition: executor.c:61
av_executor_free
void av_executor_free(AVExecutor **executor)
Free executor.
Definition: executor.c:174
av_executor_execute
void av_executor_execute(AVExecutor *e, AVTask *t)
Add task to executor.
Definition: executor.c:182
ff_cond_signal
static int ff_cond_signal(AVCond *cond)
Definition: thread.h:196
mem.h
av_free
#define av_free(p)
Definition: tableprint_vlc.h:33
ff_cond_destroy
static int ff_cond_destroy(AVCond *cond)
Definition: thread.h:195
ExecutorThread
#define ExecutorThread
Definition: executor.c:28
ff_cond_init
static int ff_cond_init(AVCond *cond, const void *attr)
Definition: thread.h:194
AVExecutor::lock
AVMutex lock
Definition: executor.c:54
AVExecutor::local_contexts
uint8_t * local_contexts
Definition: executor.c:52