FFmpeg
threadmessage.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Nicolas George
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public License
8  * as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "fifo.h"
22 #include "threadmessage.h"
23 #include "thread.h"
24 
26 #if HAVE_THREADS
27  AVFifoBuffer *fifo;
29  pthread_cond_t cond_recv;
30  pthread_cond_t cond_send;
31  int err_send;
32  int err_recv;
33  unsigned elsize;
34  void (*free_func)(void *msg);
35 #else
36  int dummy;
37 #endif
38 };
39 
41  unsigned nelem,
42  unsigned elsize)
43 {
44 #if HAVE_THREADS
46  int ret = 0;
47 
48  if (nelem > INT_MAX / elsize)
49  return AVERROR(EINVAL);
50  if (!(rmq = av_mallocz(sizeof(*rmq))))
51  return AVERROR(ENOMEM);
52  if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
53  av_free(rmq);
54  return AVERROR(ret);
55  }
56  if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
57  pthread_mutex_destroy(&rmq->lock);
58  av_free(rmq);
59  return AVERROR(ret);
60  }
61  if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
62  pthread_cond_destroy(&rmq->cond_recv);
63  pthread_mutex_destroy(&rmq->lock);
64  av_free(rmq);
65  return AVERROR(ret);
66  }
67  if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
68  pthread_cond_destroy(&rmq->cond_send);
69  pthread_cond_destroy(&rmq->cond_recv);
70  pthread_mutex_destroy(&rmq->lock);
71  av_free(rmq);
72  return AVERROR(ENOMEM);
73  }
74  rmq->elsize = elsize;
75  *mq = rmq;
76  return 0;
77 #else
78  *mq = NULL;
79  return AVERROR(ENOSYS);
80 #endif /* HAVE_THREADS */
81 }
82 
84  void (*free_func)(void *msg))
85 {
86 #if HAVE_THREADS
87  mq->free_func = free_func;
88 #endif
89 }
90 
92 {
93 #if HAVE_THREADS
94  if (*mq) {
96  av_fifo_freep(&(*mq)->fifo);
97  pthread_cond_destroy(&(*mq)->cond_send);
98  pthread_cond_destroy(&(*mq)->cond_recv);
99  pthread_mutex_destroy(&(*mq)->lock);
100  av_freep(mq);
101  }
102 #endif
103 }
104 
106 {
107 #if HAVE_THREADS
108  int ret;
109  pthread_mutex_lock(&mq->lock);
110  ret = av_fifo_size(mq->fifo);
111  pthread_mutex_unlock(&mq->lock);
112  return ret / mq->elsize;
113 #else
114  return AVERROR(ENOSYS);
115 #endif
116 }
117 
118 #if HAVE_THREADS
119 
120 static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
121  void *msg,
122  unsigned flags)
123 {
124  while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
125  if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
126  return AVERROR(EAGAIN);
127  pthread_cond_wait(&mq->cond_send, &mq->lock);
128  }
129  if (mq->err_send)
130  return mq->err_send;
131  av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
132  /* one message is sent, signal one receiver */
133  pthread_cond_signal(&mq->cond_recv);
134  return 0;
135 }
136 
137 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
138  void *msg,
139  unsigned flags)
140 {
141  while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
142  if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
143  return AVERROR(EAGAIN);
144  pthread_cond_wait(&mq->cond_recv, &mq->lock);
145  }
146  if (av_fifo_size(mq->fifo) < mq->elsize)
147  return mq->err_recv;
148  av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
149  /* one message space appeared, signal one sender */
150  pthread_cond_signal(&mq->cond_send);
151  return 0;
152 }
153 
154 #endif /* HAVE_THREADS */
155 
157  void *msg,
158  unsigned flags)
159 {
160 #if HAVE_THREADS
161  int ret;
162 
163  pthread_mutex_lock(&mq->lock);
164  ret = av_thread_message_queue_send_locked(mq, msg, flags);
165  pthread_mutex_unlock(&mq->lock);
166  return ret;
167 #else
168  return AVERROR(ENOSYS);
169 #endif /* HAVE_THREADS */
170 }
171 
173  void *msg,
174  unsigned flags)
175 {
176 #if HAVE_THREADS
177  int ret;
178 
179  pthread_mutex_lock(&mq->lock);
180  ret = av_thread_message_queue_recv_locked(mq, msg, flags);
181  pthread_mutex_unlock(&mq->lock);
182  return ret;
183 #else
184  return AVERROR(ENOSYS);
185 #endif /* HAVE_THREADS */
186 }
187 
189  int err)
190 {
191 #if HAVE_THREADS
192  pthread_mutex_lock(&mq->lock);
193  mq->err_send = err;
194  pthread_cond_broadcast(&mq->cond_send);
195  pthread_mutex_unlock(&mq->lock);
196 #endif /* HAVE_THREADS */
197 }
198 
200  int err)
201 {
202 #if HAVE_THREADS
203  pthread_mutex_lock(&mq->lock);
204  mq->err_recv = err;
205  pthread_cond_broadcast(&mq->cond_recv);
206  pthread_mutex_unlock(&mq->lock);
207 #endif /* HAVE_THREADS */
208 }
209 
210 #if HAVE_THREADS
211 static void free_func_wrap(void *arg, void *msg, int size)
212 {
214  mq->free_func(msg);
215 }
216 #endif
217 
219 {
220 #if HAVE_THREADS
221  int used, off;
222  void *free_func = mq->free_func;
223 
224  pthread_mutex_lock(&mq->lock);
225  used = av_fifo_size(mq->fifo);
226  if (free_func)
227  for (off = 0; off < used; off += mq->elsize)
228  av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
229  av_fifo_drain(mq->fifo, used);
230  /* only the senders need to be notified since the queue is empty and there
231  * is nothing to read */
232  pthread_cond_broadcast(&mq->cond_send);
233  pthread_mutex_unlock(&mq->lock);
234 #endif /* HAVE_THREADS */
235 }
#define NULL
Definition: coverity.c:32
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:108
#define pthread_mutex_lock(a)
Definition: ffprobe.c:61
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:166
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:83
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:236
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:140
int av_fifo_generic_write(AVFifoBuffer *f, void *src, int size, int(*func)(void *, void *, int))
Feed data from a user-supplied callback to an AVFifoBuffer.
Definition: fifo.c:122
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
int av_fifo_space(const AVFifoBuffer *f)
Return the amount of space in bytes in the AVFifoBuffer, that is the amount of data you can write int...
Definition: fifo.c:82
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:148
ptrdiff_t size
Definition: opengl_enc.c:100
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
int av_fifo_generic_read(AVFifoBuffer *f, void *dest, int buf_size, void(*func)(void *, void *, int))
Feed data from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:213
const char * arg
Definition: jacosubdec.c:66
typedef void(APIENTRY *FF_PFNGLACTIVETEXTUREPROC)(GLenum texture)
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:100
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:65
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
int av_fifo_size(const AVFifoBuffer *f)
Return the amount of data in bytes in the AVFifoBuffer, that is the amount of data you can read from ...
Definition: fifo.c:77
int av_fifo_generic_peek_at(AVFifoBuffer *f, void *dest, int offset, int buf_size, void(*func)(void *, void *, int))
Feed data at specific position from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:151
a very simple circular buffer FIFO implementation
Perform non-blocking operation.
Definition: threadmessage.h:31
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
#define flags(name, subs,...)
Definition: cbs_av1.c:561
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
static pthread_mutex_t lock
Definition: ffjni.c:37
_fmutex pthread_mutex_t
Definition: os2threads.h:49
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:129
#define av_free(p)
AVFifoBuffer * av_fifo_alloc(unsigned int size)
Initialize an AVFifoBuffer.
Definition: fifo.c:43
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:158
#define av_freep(p)
void av_fifo_freep(AVFifoBuffer **f)
Free an AVFifoBuffer and reset pointer to NULL.
Definition: fifo.c:63
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Frame references ownership and permissions
void av_fifo_drain(AVFifoBuffer *f, int size)
Discard data from the FIFO.
Definition: fifo.c:233