FFmpeg
api-threadmessage-test.c
Go to the documentation of this file.
1 /*
2  * Permission is hereby granted, free of charge, to any person obtaining a copy
3  * of this software and associated documentation files (the "Software"), to deal
4  * in the Software without restriction, including without limitation the rights
5  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6  * copies of the Software, and to permit persons to whom the Software is
7  * furnished to do so, subject to the following conditions:
8  *
9  * The above copyright notice and this permission notice shall be included in
10  * all copies or substantial portions of the Software.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18  * THE SOFTWARE.
19  */
20 
21 /**
22  * Thread message API test
23  */
24 
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/frame.h"
29 #include "libavutil/thread.h" // not public
30 
31 struct sender_data {
32  int id;
34  int workload;
36 };
37 
38 /* same as sender_data but shuffled for testing purpose */
39 struct receiver_data {
41  int workload;
42  int id;
44 };
45 
46 struct message {
48  // we add some junk in the message to make sure the message size is >
49  // sizeof(void*)
50  int magic;
51 };
52 
53 #define MAGIC 0xdeadc0de
54 
55 static void free_frame(void *arg)
56 {
57  struct message *msg = arg;
58  av_assert0(msg->magic == MAGIC);
59  av_frame_free(&msg->frame);
60 }
61 
62 static void *sender_thread(void *arg)
63 {
64  int i, ret = 0;
65  struct sender_data *wd = arg;
66 
67  av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68  for (i = 0; i < wd->workload; i++) {
69  if (rand() % wd->workload < wd->workload / 10) {
70  av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
72  } else {
73  char *val;
74  AVDictionary *meta = NULL;
75  struct message msg = {
76  .magic = MAGIC,
77  .frame = av_frame_alloc(),
78  };
79 
80  if (!msg.frame) {
81  ret = AVERROR(ENOMEM);
82  break;
83  }
84 
85  /* we add some metadata to identify the frames */
86  val = av_asprintf("frame %d/%d from sender %d",
87  i + 1, wd->workload, wd->id);
88  if (!val) {
89  av_frame_free(&msg.frame);
90  ret = AVERROR(ENOMEM);
91  break;
92  }
93  ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
94  if (ret < 0) {
95  av_frame_free(&msg.frame);
96  break;
97  }
98  msg.frame->metadata = meta;
99 
100  /* allocate a real frame in order to simulate "real" work */
102  msg.frame->width = 320;
103  msg.frame->height = 240;
104  ret = av_frame_get_buffer(msg.frame, 0);
105  if (ret < 0) {
106  av_frame_free(&msg.frame);
107  break;
108  }
109 
110  /* push the frame in the common queue */
111  av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112  wd->id, i + 1, wd->workload, msg.frame);
113  ret = av_thread_message_queue_send(wd->queue, &msg, 0);
114  if (ret < 0) {
115  av_frame_free(&msg.frame);
116  break;
117  }
118  }
119  }
120  av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121  wd->id, av_err2str(ret));
123  return NULL;
124 }
125 
126 static void *receiver_thread(void *arg)
127 {
128  int i, ret = 0;
129  struct receiver_data *rd = arg;
130 
131  for (i = 0; i < rd->workload; i++) {
132  if (rand() % rd->workload < rd->workload / 10) {
133  av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
134  "discarding %d message(s)\n", rd->id,
137  } else {
138  struct message msg;
139  AVDictionary *meta;
141 
142  ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
143  if (ret < 0)
144  break;
145  av_assert0(msg.magic == MAGIC);
146  meta = msg.frame->metadata;
147  e = av_dict_get(meta, "sig", NULL, 0);
148  av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
149  av_frame_free(&msg.frame);
150  }
151  }
152 
153  av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
155 
156  return NULL;
157 }
158 
159 static int get_workload(int minv, int maxv)
160 {
161  return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
162 }
163 
164 int main(int ac, char **av)
165 {
166  int i, ret = 0;
167  int max_queue_size;
168  int nb_senders, sender_min_load, sender_max_load;
169  int nb_receivers, receiver_min_load, receiver_max_load;
170  struct sender_data *senders;
171  struct receiver_data *receivers;
173 
174  if (ac != 8) {
175  av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
176  "<nb_senders> <sender_min_send> <sender_max_send> "
177  "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
178  return 1;
179  }
180 
181  max_queue_size = atoi(av[1]);
182  nb_senders = atoi(av[2]);
183  sender_min_load = atoi(av[3]);
184  sender_max_load = atoi(av[4]);
185  nb_receivers = atoi(av[5]);
186  receiver_min_load = atoi(av[6]);
187  receiver_max_load = atoi(av[7]);
188 
189  if (max_queue_size <= 0 ||
190  nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
191  nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
192  av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
193  return 1;
194  }
195 
196  av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
197  "%d receivers receiving [%d-%d]\n", max_queue_size,
198  nb_senders, sender_min_load, sender_max_load,
199  nb_receivers, receiver_min_load, receiver_max_load);
200 
201  senders = av_calloc(nb_senders, sizeof(*senders));
202  receivers = av_calloc(nb_receivers, sizeof(*receivers));
203  if (!senders || !receivers) {
204  ret = AVERROR(ENOMEM);
205  goto end;
206  }
207 
208  ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
209  if (ret < 0)
210  goto end;
211 
213 
214 #define SPAWN_THREADS(type) do { \
215  for (i = 0; i < nb_##type##s; i++) { \
216  struct type##_data *td = &type##s[i]; \
217  \
218  td->id = i; \
219  td->queue = queue; \
220  td->workload = get_workload(type##_min_load, type##_max_load); \
221  \
222  ret = pthread_create(&td->tid, NULL, type##_thread, td); \
223  if (ret) { \
224  const int err = AVERROR(ret); \
225  av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
226  " thread: %s\n", av_err2str(err)); \
227  goto end; \
228  } \
229  } \
230 } while (0)
231 
232 #define WAIT_THREADS(type) do { \
233  for (i = 0; i < nb_##type##s; i++) { \
234  struct type##_data *td = &type##s[i]; \
235  \
236  ret = pthread_join(td->tid, NULL); \
237  if (ret) { \
238  const int err = AVERROR(ret); \
239  av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
240  " thread: %s\n", av_err2str(err)); \
241  goto end; \
242  } \
243  } \
244 } while (0)
245 
246  SPAWN_THREADS(receiver);
247  SPAWN_THREADS(sender);
248 
249  WAIT_THREADS(sender);
250  WAIT_THREADS(receiver);
251 
252 end:
254  av_freep(&senders);
255  av_freep(&receivers);
256 
257  if (ret < 0 && ret != AVERROR_EOF) {
258  av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
259  return 1;
260  }
261  return 0;
262 }
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
av_thread_message_queue_nb_elems
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
Definition: threadmessage.c:107
MAGIC
#define MAGIC
Definition: api-threadmessage-test.c:53
av_frame_get_buffer
int av_frame_get_buffer(AVFrame *frame, int align)
Allocate new buffer(s) for audio or video data.
Definition: frame.c:242
message
Definition: api-threadmessage-test.c:46
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
av_asprintf
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:116
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:99
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:330
AVFrame::width
int width
Definition: frame.h:402
WAIT_THREADS
#define WAIT_THREADS(type)
AVDictionary
Definition: dict.c:32
message::magic
int magic
Definition: api-threadmessage-test.c:50
sender_data::tid
pthread_t tid
Definition: api-threadmessage-test.c:33
val
static double val(void *priv, double ch)
Definition: aeval.c:77
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:174
AV_DICT_DONT_STRDUP_VAL
#define AV_DICT_DONT_STRDUP_VAL
Take ownership of a value that's been allocated with av_malloc() or another memory allocation functio...
Definition: dict.h:79
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:87
avassert.h
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:158
av_thread_message_flush
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
Definition: threadmessage.c:223
av_dict_get
AVDictionaryEntry * av_dict_get(const AVDictionary *m, const char *key, const AVDictionaryEntry *prev, int flags)
Get a dictionary entry with matching key.
Definition: dict.c:60
free_frame
static void free_frame(void *arg)
Definition: api-threadmessage-test.c:55
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
AVThreadMessageQueue
Definition: threadmessage.c:27
AV_PIX_FMT_RGBA
@ AV_PIX_FMT_RGBA
packed RGBA 8:8:8:8, 32bpp, RGBARGBA...
Definition: pixfmt.h:93
arg
const char * arg
Definition: jacosubdec.c:67
NULL
#define NULL
Definition: coverity.c:32
receiver_thread
static void * receiver_thread(void *arg)
Definition: api-threadmessage-test.c:126
SPAWN_THREADS
#define SPAWN_THREADS(type)
receiver_data::tid
pthread_t tid
Definition: api-threadmessage-test.c:40
threadmessage.h
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:121
AVFrame::format
int format
format of the frame, -1 if unknown or unset Values correspond to enum AVPixelFormat for video frames,...
Definition: frame.h:417
frame.h
sender_data
Thread message API test.
Definition: api-threadmessage-test.c:31
pthread_t
Definition: os2threads.h:44
AV_LOG_INFO
#define AV_LOG_INFO
Standard information.
Definition: log.h:191
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:42
get_workload
static int get_workload(int minv, int maxv)
Definition: api-threadmessage-test.c:159
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:269
sender_data::id
int id
Definition: api-threadmessage-test.c:32
receiver_data::workload
int workload
Definition: api-threadmessage-test.c:41
av_thread_message_queue_set_err_send
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
Definition: threadmessage.c:190
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:262
receiver_data::id
int id
Definition: api-threadmessage-test.c:42
ret
ret
Definition: filter_design.txt:187
sender_data::queue
AVThreadMessageQueue * queue
Definition: api-threadmessage-test.c:35
receiver_data
Definition: api-threadmessage-test.c:39
AVFrame::height
int height
Definition: frame.h:402
main
int main(int ac, char **av)
Definition: api-threadmessage-test.c:164
AVFrame::metadata
AVDictionary * metadata
metadata.
Definition: frame.h:639
message::frame
AVFrame * frame
Definition: api-threadmessage-test.c:47
receiver_data::queue
AVThreadMessageQueue * queue
Definition: api-threadmessage-test.c:43
sender_thread
static void * sender_thread(void *arg)
Definition: api-threadmessage-test.c:62
AVDictionaryEntry
Definition: dict.h:89
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:93
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
av_dict_set
int av_dict_set(AVDictionary **pm, const char *key, const char *value, int flags)
Set the given entry in *pm, overwriting an existing entry.
Definition: dict.c:86
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:201
av_thread_message_queue_set_free_func
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:85
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sender_data::workload
int workload
Definition: api-threadmessage-test.c:34
AVDictionaryEntry::value
char * value
Definition: dict.h:91
avstring.h