FFmpeg
threadmessage.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Nicolas George
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public License
8  * as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <limits.h>
22 #include <stddef.h>
23 
24 #include "error.h"
25 #include "fifo.h"
26 #include "mem.h"
27 #include "threadmessage.h"
28 #include "thread.h"
29 
/**
 * State of a thread message queue.
 *
 * When HAVE_THREADS is set the structure holds the element FIFO together with
 * the mutex and the two condition variables used by the functions of this
 * file. Without thread support only a placeholder member is kept.
 */
struct AVThreadMessageQueue {
#if HAVE_THREADS
    AVFifo *fifo;                 /* queued messages, one FIFO element per message */
    pthread_mutex_t lock;         /* held around FIFO accesses and error-code updates */
    pthread_cond_t cond_recv;     /* signalled after a message is written or err_recv is set */
    pthread_cond_t cond_send;     /* signalled after a message is read, on flush, or when err_send is set */
    int err_send;                 /* when non-zero, returned to senders instead of queueing */
    int err_recv;                 /* when non-zero, returned to receivers once the FIFO is empty */
    unsigned elsize;              /* size of one message, as given at allocation */
    void (*free_func)(void *msg); /* optional callback invoked on messages dropped by a flush */
#else
    int dummy;                    /* placeholder member for builds without threads */
#endif
};
44 
46  unsigned nelem,
47  unsigned elsize)
48 {
49 #if HAVE_THREADS
51  int ret = 0;
52 
53  if (nelem > INT_MAX / elsize)
54  return AVERROR(EINVAL);
55  if (!(rmq = av_mallocz(sizeof(*rmq))))
56  return AVERROR(ENOMEM);
57  if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
58  av_free(rmq);
59  return AVERROR(ret);
60  }
61  if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
62  pthread_mutex_destroy(&rmq->lock);
63  av_free(rmq);
64  return AVERROR(ret);
65  }
66  if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
67  pthread_cond_destroy(&rmq->cond_recv);
68  pthread_mutex_destroy(&rmq->lock);
69  av_free(rmq);
70  return AVERROR(ret);
71  }
72  if (!(rmq->fifo = av_fifo_alloc2(nelem, elsize, 0))) {
73  pthread_cond_destroy(&rmq->cond_send);
74  pthread_cond_destroy(&rmq->cond_recv);
75  pthread_mutex_destroy(&rmq->lock);
76  av_free(rmq);
77  return AVERROR(ENOMEM);
78  }
79  rmq->elsize = elsize;
80  *mq = rmq;
81  return 0;
82 #else
83  *mq = NULL;
84  return AVERROR(ENOSYS);
85 #endif /* HAVE_THREADS */
86 }
87 
89  void (*free_func)(void *msg))
90 {
91 #if HAVE_THREADS
92  mq->free_func = free_func;
93 #endif
94 }
95 
97 {
98 #if HAVE_THREADS
99  if (*mq) {
101  av_fifo_freep2(&(*mq)->fifo);
102  pthread_cond_destroy(&(*mq)->cond_send);
103  pthread_cond_destroy(&(*mq)->cond_recv);
104  pthread_mutex_destroy(&(*mq)->lock);
105  av_freep(mq);
106  }
107 #endif
108 }
109 
111 {
112 #if HAVE_THREADS
113  int ret;
114  pthread_mutex_lock(&mq->lock);
115  ret = av_fifo_can_read(mq->fifo);
116  pthread_mutex_unlock(&mq->lock);
117  return ret;
118 #else
119  return AVERROR(ENOSYS);
120 #endif
121 }
122 
123 #if HAVE_THREADS
124 
125 static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
126  void *msg,
127  unsigned flags)
128 {
129  while (!mq->err_send && !av_fifo_can_write(mq->fifo)) {
131  return AVERROR(EAGAIN);
132  pthread_cond_wait(&mq->cond_send, &mq->lock);
133  }
134  if (mq->err_send)
135  return mq->err_send;
136  av_fifo_write(mq->fifo, msg, 1);
137  /* one message is sent, signal one receiver */
138  pthread_cond_signal(&mq->cond_recv);
139  return 0;
140 }
141 
142 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
143  void *msg,
144  unsigned flags)
145 {
146  while (!mq->err_recv && !av_fifo_can_read(mq->fifo)) {
148  return AVERROR(EAGAIN);
149  pthread_cond_wait(&mq->cond_recv, &mq->lock);
150  }
151  if (!av_fifo_can_read(mq->fifo))
152  return mq->err_recv;
153  av_fifo_read(mq->fifo, msg, 1);
154  /* one message space appeared, signal one sender */
155  pthread_cond_signal(&mq->cond_send);
156  return 0;
157 }
158 
159 #endif /* HAVE_THREADS */
160 
162  void *msg,
163  unsigned flags)
164 {
165 #if HAVE_THREADS
166  int ret;
167 
168  pthread_mutex_lock(&mq->lock);
169  ret = av_thread_message_queue_send_locked(mq, msg, flags);
170  pthread_mutex_unlock(&mq->lock);
171  return ret;
172 #else
173  return AVERROR(ENOSYS);
174 #endif /* HAVE_THREADS */
175 }
176 
178  void *msg,
179  unsigned flags)
180 {
181 #if HAVE_THREADS
182  int ret;
183 
184  pthread_mutex_lock(&mq->lock);
185  ret = av_thread_message_queue_recv_locked(mq, msg, flags);
186  pthread_mutex_unlock(&mq->lock);
187  return ret;
188 #else
189  return AVERROR(ENOSYS);
190 #endif /* HAVE_THREADS */
191 }
192 
194  int err)
195 {
196 #if HAVE_THREADS
197  pthread_mutex_lock(&mq->lock);
198  mq->err_send = err;
199  pthread_cond_broadcast(&mq->cond_send);
200  pthread_mutex_unlock(&mq->lock);
201 #endif /* HAVE_THREADS */
202 }
203 
205  int err)
206 {
207 #if HAVE_THREADS
208  pthread_mutex_lock(&mq->lock);
209  mq->err_recv = err;
210  pthread_cond_broadcast(&mq->cond_recv);
211  pthread_mutex_unlock(&mq->lock);
212 #endif /* HAVE_THREADS */
213 }
214 
215 #if HAVE_THREADS
216 static int free_func_wrap(void *arg, void *buf, size_t *nb_elems)
217 {
219  uint8_t *msg = buf;
220  for (size_t i = 0; i < *nb_elems; i++)
221  mq->free_func(msg + i * mq->elsize);
222  return 0;
223 }
224 #endif
225 
227 {
228 #if HAVE_THREADS
229  size_t used;
230 
231  pthread_mutex_lock(&mq->lock);
232  used = av_fifo_can_read(mq->fifo);
233  if (mq->free_func)
234  av_fifo_read_to_cb(mq->fifo, free_func_wrap, mq, &used);
235  /* only the senders need to be notified since the queue is empty and there
236  * is nothing to read */
237  pthread_cond_broadcast(&mq->cond_send);
238  pthread_mutex_unlock(&mq->lock);
239 #endif /* HAVE_THREADS */
240 }
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
av_thread_message_queue_nb_elems
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
Definition: threadmessage.c:110
thread.h
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
AV_THREAD_MESSAGE_NONBLOCK
@ AV_THREAD_MESSAGE_NONBLOCK
Perform non-blocking operation.
Definition: threadmessage.h:31
fifo.h
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
av_thread_message_flush
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
Definition: threadmessage.c:226
limits.h
AVThreadMessageQueue
Definition: threadmessage.c:30
pthread_cond_broadcast
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:162
arg
const char * arg
Definition: jacosubdec.c:67
NULL
#define NULL
Definition: coverity.c:32
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:82
error.h
av_fifo_read_to_cb
int av_fifo_read_to_cb(AVFifo *f, AVFifoCB write_cb, void *opaque, size_t *nb_elems)
Feed data from a FIFO into a user-provided callback.
Definition: fifo.c:247
AVFifo
Definition: fifo.c:35
threadmessage.h
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
AVThreadMessageQueue::dummy
int dummy
Definition: threadmessage.c:41
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
lock
static pthread_mutex_t lock
Definition: ffjni.c:39
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
av_thread_message_queue_set_err_send
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
Definition: threadmessage.c:193
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if available on the CPU) and zero all the bytes of the block.
Definition: mem.c:256
pthread_cond_t
Definition: os2threads.h:58
ret
ret
Definition: filter_design.txt:187
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_free
#define av_free(p)
Definition: tableprint_vlc.h:33
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:482
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
av_thread_message_queue_set_free_func
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing messages from the queue.
Definition: threadmessage.c:88
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:78