FFmpeg
threadmessage.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Nicolas George
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public License
8  * as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <limits.h>
22 #include "fifo.h"
23 #include "mem.h"
24 #include "threadmessage.h"
25 #include "thread.h"
26 
/*
 * State of one thread message queue.
 *
 * With thread support, every field is accessed while holding `lock`.
 * Without thread support, the struct only carries a placeholder member so
 * that it is not empty.
 */
struct AVThreadMessageQueue {
#if HAVE_THREADS
    AVFifo *fifo;                  /* storage for the queued messages */
    pthread_mutex_t lock;          /* protects every other field */
    pthread_cond_t cond_recv;      /* signalled when a message is written or err_recv is set */
    pthread_cond_t cond_send;      /* signalled when a message is read, on flush, or when err_send is set */
    int err_send;                  /* if non-zero, returned to senders instead of queueing */
    int err_recv;                  /* returned to receivers once the fifo has nothing to read */
    unsigned elsize;               /* size of one message in bytes */
    void (*free_func)(void *msg);  /* optional callback used to release a discarded message */
#else
    int dummy;
#endif
};
41 
43  unsigned nelem,
44  unsigned elsize)
45 {
46 #if HAVE_THREADS
48  int ret = 0;
49 
50  if (nelem > INT_MAX / elsize)
51  return AVERROR(EINVAL);
52  if (!(rmq = av_mallocz(sizeof(*rmq))))
53  return AVERROR(ENOMEM);
54  if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
55  av_free(rmq);
56  return AVERROR(ret);
57  }
58  if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
59  pthread_mutex_destroy(&rmq->lock);
60  av_free(rmq);
61  return AVERROR(ret);
62  }
63  if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
64  pthread_cond_destroy(&rmq->cond_recv);
65  pthread_mutex_destroy(&rmq->lock);
66  av_free(rmq);
67  return AVERROR(ret);
68  }
69  if (!(rmq->fifo = av_fifo_alloc2(nelem, elsize, 0))) {
70  pthread_cond_destroy(&rmq->cond_send);
71  pthread_cond_destroy(&rmq->cond_recv);
72  pthread_mutex_destroy(&rmq->lock);
73  av_free(rmq);
74  return AVERROR(ENOMEM);
75  }
76  rmq->elsize = elsize;
77  *mq = rmq;
78  return 0;
79 #else
80  *mq = NULL;
81  return AVERROR(ENOSYS);
82 #endif /* HAVE_THREADS */
83 }
84 
86  void (*free_func)(void *msg))
87 {
88 #if HAVE_THREADS
89  mq->free_func = free_func;
90 #endif
91 }
92 
94 {
95 #if HAVE_THREADS
96  if (*mq) {
98  av_fifo_freep2(&(*mq)->fifo);
99  pthread_cond_destroy(&(*mq)->cond_send);
100  pthread_cond_destroy(&(*mq)->cond_recv);
101  pthread_mutex_destroy(&(*mq)->lock);
102  av_freep(mq);
103  }
104 #endif
105 }
106 
108 {
109 #if HAVE_THREADS
110  int ret;
111  pthread_mutex_lock(&mq->lock);
112  ret = av_fifo_can_read(mq->fifo);
113  pthread_mutex_unlock(&mq->lock);
114  return ret;
115 #else
116  return AVERROR(ENOSYS);
117 #endif
118 }
119 
120 #if HAVE_THREADS
121 
122 static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
123  void *msg,
124  unsigned flags)
125 {
126  while (!mq->err_send && !av_fifo_can_write(mq->fifo)) {
128  return AVERROR(EAGAIN);
129  pthread_cond_wait(&mq->cond_send, &mq->lock);
130  }
131  if (mq->err_send)
132  return mq->err_send;
133  av_fifo_write(mq->fifo, msg, 1);
134  /* one message is sent, signal one receiver */
135  pthread_cond_signal(&mq->cond_recv);
136  return 0;
137 }
138 
139 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
140  void *msg,
141  unsigned flags)
142 {
143  while (!mq->err_recv && !av_fifo_can_read(mq->fifo)) {
145  return AVERROR(EAGAIN);
146  pthread_cond_wait(&mq->cond_recv, &mq->lock);
147  }
148  if (!av_fifo_can_read(mq->fifo))
149  return mq->err_recv;
150  av_fifo_read(mq->fifo, msg, 1);
151  /* one message space appeared, signal one sender */
152  pthread_cond_signal(&mq->cond_send);
153  return 0;
154 }
155 
156 #endif /* HAVE_THREADS */
157 
159  void *msg,
160  unsigned flags)
161 {
162 #if HAVE_THREADS
163  int ret;
164 
165  pthread_mutex_lock(&mq->lock);
166  ret = av_thread_message_queue_send_locked(mq, msg, flags);
167  pthread_mutex_unlock(&mq->lock);
168  return ret;
169 #else
170  return AVERROR(ENOSYS);
171 #endif /* HAVE_THREADS */
172 }
173 
175  void *msg,
176  unsigned flags)
177 {
178 #if HAVE_THREADS
179  int ret;
180 
181  pthread_mutex_lock(&mq->lock);
182  ret = av_thread_message_queue_recv_locked(mq, msg, flags);
183  pthread_mutex_unlock(&mq->lock);
184  return ret;
185 #else
186  return AVERROR(ENOSYS);
187 #endif /* HAVE_THREADS */
188 }
189 
191  int err)
192 {
193 #if HAVE_THREADS
194  pthread_mutex_lock(&mq->lock);
195  mq->err_send = err;
196  pthread_cond_broadcast(&mq->cond_send);
197  pthread_mutex_unlock(&mq->lock);
198 #endif /* HAVE_THREADS */
199 }
200 
202  int err)
203 {
204 #if HAVE_THREADS
205  pthread_mutex_lock(&mq->lock);
206  mq->err_recv = err;
207  pthread_cond_broadcast(&mq->cond_recv);
208  pthread_mutex_unlock(&mq->lock);
209 #endif /* HAVE_THREADS */
210 }
211 
212 #if HAVE_THREADS
213 static int free_func_wrap(void *arg, void *buf, size_t *nb_elems)
214 {
216  uint8_t *msg = buf;
217  for (size_t i = 0; i < *nb_elems; i++)
218  mq->free_func(msg + i * mq->elsize);
219  return 0;
220 }
221 #endif
222 
224 {
225 #if HAVE_THREADS
226  size_t used;
227 
228  pthread_mutex_lock(&mq->lock);
229  used = av_fifo_can_read(mq->fifo);
230  if (mq->free_func)
231  av_fifo_read_to_cb(mq->fifo, free_func_wrap, mq, &used);
232  /* only the senders need to be notified since the queue is empty and there
233  * is nothing to read */
234  pthread_cond_broadcast(&mq->cond_send);
235  pthread_mutex_unlock(&mq->lock);
236 #endif /* HAVE_THREADS */
237 }
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
av_thread_message_queue_nb_elems
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
Definition: threadmessage.c:107
thread.h
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
AV_THREAD_MESSAGE_NONBLOCK
@ AV_THREAD_MESSAGE_NONBLOCK
Perform non-blocking operation.
Definition: threadmessage.h:31
fifo.h
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:174
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:158
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
av_thread_message_flush
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
Definition: threadmessage.c:223
limits.h
AVThreadMessageQueue
Definition: threadmessage.c:27
pthread_cond_broadcast
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:162
arg
const char * arg
Definition: jacosubdec.c:67
NULL
#define NULL
Definition: coverity.c:32
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:79
av_fifo_read_to_cb
int av_fifo_read_to_cb(AVFifo *f, AVFifoCB write_cb, void *opaque, size_t *nb_elems)
Feed data from a FIFO into a user-provided callback.
Definition: fifo.c:247
AVFifo
Definition: fifo.c:35
threadmessage.h
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
AVThreadMessageQueue::dummy
int dummy
Definition: threadmessage.c:38
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:42
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:269
av_thread_message_queue_set_err_send
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
Definition: threadmessage.c:190
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if available on the CPU) and zero all the bytes of the block.
Definition: mem.c:254
pthread_cond_t
Definition: os2threads.h:58
ret
ret
Definition: filter_design.txt:187
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
lock
static pthread_mutex_t lock
Definition: ffjni.c:38
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_free
#define av_free(p)
Definition: tableprint_vlc.h:33
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:93
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:561
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:201
av_thread_message_queue_set_free_func
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing messages from the queue.
Definition: threadmessage.c:85
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:75