FFmpeg
api-threadmessage-test.c
Go to the documentation of this file.
1 /*
2  * Permission is hereby granted, free of charge, to any person obtaining a copy
3  * of this software and associated documentation files (the "Software"), to deal
4  * in the Software without restriction, including without limitation the rights
5  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6  * copies of the Software, and to permit persons to whom the Software is
7  * furnished to do so, subject to the following conditions:
8  *
9  * The above copyright notice and this permission notice shall be included in
10  * all copies or substantial portions of the Software.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18  * THE SOFTWARE.
19  */
20 
21 /**
22  * Thread message API test
23  */
24 
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/frame.h"
28 #include "libavutil/mem.h"
30 #include "libavutil/thread.h" // not public
31 
32 struct sender_data {
33  int id;
35  int workload;
37 };
38 
39 /* same as sender_data but shuffled for testing purpose */
40 struct receiver_data {
42  int workload;
43  int id;
45 };
46 
47 struct message {
49  // we add some junk in the message to make sure the message size is >
50  // sizeof(void*)
51  int magic;
52 };
53 
54 #define MAGIC 0xdeadc0de
55 
56 static void free_frame(void *arg)
57 {
58  struct message *msg = arg;
59  av_assert0(msg->magic == MAGIC);
60  av_frame_free(&msg->frame);
61 }
62 
63 static void *sender_thread(void *arg)
64 {
65  int i, ret = 0;
66  struct sender_data *wd = arg;
67 
68  av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
69  for (i = 0; i < wd->workload; i++) {
70  if (rand() % wd->workload < wd->workload / 10) {
71  av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
73  } else {
74  char *val;
75  AVDictionary *meta = NULL;
76  struct message msg = {
77  .magic = MAGIC,
78  .frame = av_frame_alloc(),
79  };
80 
81  if (!msg.frame) {
82  ret = AVERROR(ENOMEM);
83  break;
84  }
85 
86  /* we add some metadata to identify the frames */
87  val = av_asprintf("frame %d/%d from sender %d",
88  i + 1, wd->workload, wd->id);
89  if (!val) {
90  av_frame_free(&msg.frame);
91  ret = AVERROR(ENOMEM);
92  break;
93  }
94  ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
95  if (ret < 0) {
96  av_frame_free(&msg.frame);
97  break;
98  }
99  msg.frame->metadata = meta;
100 
101  /* allocate a real frame in order to simulate "real" work */
103  msg.frame->width = 320;
104  msg.frame->height = 240;
105  ret = av_frame_get_buffer(msg.frame, 0);
106  if (ret < 0) {
107  av_frame_free(&msg.frame);
108  break;
109  }
110 
111  /* push the frame in the common queue */
112  av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
113  wd->id, i + 1, wd->workload, msg.frame);
114  ret = av_thread_message_queue_send(wd->queue, &msg, 0);
115  if (ret < 0) {
116  av_frame_free(&msg.frame);
117  break;
118  }
119  }
120  }
121  av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
122  wd->id, av_err2str(ret));
124  return NULL;
125 }
126 
127 static void *receiver_thread(void *arg)
128 {
129  int i, ret = 0;
130  struct receiver_data *rd = arg;
131 
132  for (i = 0; i < rd->workload; i++) {
133  if (rand() % rd->workload < rd->workload / 10) {
134  av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
135  "discarding %d message(s)\n", rd->id,
138  } else {
139  struct message msg;
140  AVDictionary *meta;
142 
143  ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
144  if (ret < 0)
145  break;
146  av_assert0(msg.magic == MAGIC);
147  meta = msg.frame->metadata;
148  e = av_dict_get(meta, "sig", NULL, 0);
149  av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
150  av_frame_free(&msg.frame);
151  }
152  }
153 
154  av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
156 
157  return NULL;
158 }
159 
160 static int get_workload(int minv, int maxv)
161 {
162  return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
163 }
164 
165 int main(int ac, char **av)
166 {
167  int i, ret = 0;
168  int max_queue_size;
169  int nb_senders, sender_min_load, sender_max_load;
170  int nb_receivers, receiver_min_load, receiver_max_load;
171  struct sender_data *senders;
172  struct receiver_data *receivers;
174 
175  if (ac != 8) {
176  av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
177  "<nb_senders> <sender_min_send> <sender_max_send> "
178  "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
179  return 1;
180  }
181 
182  max_queue_size = atoi(av[1]);
183  nb_senders = atoi(av[2]);
184  sender_min_load = atoi(av[3]);
185  sender_max_load = atoi(av[4]);
186  nb_receivers = atoi(av[5]);
187  receiver_min_load = atoi(av[6]);
188  receiver_max_load = atoi(av[7]);
189 
190  if (max_queue_size <= 0 ||
191  nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
192  nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
193  av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
194  return 1;
195  }
196 
197  av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
198  "%d receivers receiving [%d-%d]\n", max_queue_size,
199  nb_senders, sender_min_load, sender_max_load,
200  nb_receivers, receiver_min_load, receiver_max_load);
201 
202  senders = av_calloc(nb_senders, sizeof(*senders));
203  receivers = av_calloc(nb_receivers, sizeof(*receivers));
204  if (!senders || !receivers) {
205  ret = AVERROR(ENOMEM);
206  goto end;
207  }
208 
209  ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
210  if (ret < 0)
211  goto end;
212 
214 
215 #define SPAWN_THREADS(type) do { \
216  for (i = 0; i < nb_##type##s; i++) { \
217  struct type##_data *td = &type##s[i]; \
218  \
219  td->id = i; \
220  td->queue = queue; \
221  td->workload = get_workload(type##_min_load, type##_max_load); \
222  \
223  ret = pthread_create(&td->tid, NULL, type##_thread, td); \
224  if (ret) { \
225  const int err = AVERROR(ret); \
226  av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
227  " thread: %s\n", av_err2str(err)); \
228  goto end; \
229  } \
230  } \
231 } while (0)
232 
233 #define WAIT_THREADS(type) do { \
234  for (i = 0; i < nb_##type##s; i++) { \
235  struct type##_data *td = &type##s[i]; \
236  \
237  ret = pthread_join(td->tid, NULL); \
238  if (ret) { \
239  const int err = AVERROR(ret); \
240  av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
241  " thread: %s\n", av_err2str(err)); \
242  goto end; \
243  } \
244  } \
245 } while (0)
246 
247  SPAWN_THREADS(receiver);
248  SPAWN_THREADS(sender);
249 
250  WAIT_THREADS(sender);
251  WAIT_THREADS(receiver);
252 
253 end:
255  av_freep(&senders);
256  av_freep(&receivers);
257 
258  if (ret < 0 && ret != AVERROR_EOF) {
259  av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
260  return 1;
261  }
262  return 0;
263 }
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
av_thread_message_queue_nb_elems
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
Definition: threadmessage.c:110
MAGIC
#define MAGIC
Definition: api-threadmessage-test.c:54
av_frame_get_buffer
int av_frame_get_buffer(AVFrame *frame, int align)
Allocate new buffer(s) for audio or video data.
Definition: frame.c:292
message
Definition: api-threadmessage-test.c:47
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
av_asprintf
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:115
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:162
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:389
AVFrame::width
int width
Definition: frame.h:461
WAIT_THREADS
#define WAIT_THREADS(type)
AVDictionary
Definition: dict.c:34
message::magic
int magic
Definition: api-threadmessage-test.c:51
sender_data::tid
pthread_t tid
Definition: api-threadmessage-test.c:34
val
static double val(void *priv, double ch)
Definition: aeval.c:77
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
AV_DICT_DONT_STRDUP_VAL
#define AV_DICT_DONT_STRDUP_VAL
Take ownership of a value that's been allocated with av_malloc() or another memory allocation functio...
Definition: dict.h:79
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:150
avassert.h
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:209
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_thread_message_flush
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
Definition: threadmessage.c:226
av_dict_get
AVDictionaryEntry * av_dict_get(const AVDictionary *m, const char *key, const AVDictionaryEntry *prev, int flags)
Get a dictionary entry with matching key.
Definition: dict.c:62
free_frame
static void free_frame(void *arg)
Definition: api-threadmessage-test.c:56
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
AVThreadMessageQueue
Definition: threadmessage.c:30
AV_PIX_FMT_RGBA
@ AV_PIX_FMT_RGBA
packed RGBA 8:8:8:8, 32bpp, RGBARGBA...
Definition: pixfmt.h:100
arg
const char * arg
Definition: jacosubdec.c:67
NULL
#define NULL
Definition: coverity.c:32
receiver_thread
static void * receiver_thread(void *arg)
Definition: api-threadmessage-test.c:127
SPAWN_THREADS
#define SPAWN_THREADS(type)
receiver_data::tid
pthread_t tid
Definition: api-threadmessage-test.c:41
threadmessage.h
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
AVFrame::format
int format
format of the frame, -1 if unknown or unset Values correspond to enum AVPixelFormat for video frames,...
Definition: frame.h:476
frame.h
sender_data
Thread message API test.
Definition: api-threadmessage-test.c:32
pthread_t
Definition: os2threads.h:44
AV_LOG_INFO
#define AV_LOG_INFO
Standard information.
Definition: log.h:220
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
get_workload
static int get_workload(int minv, int maxv)
Definition: api-threadmessage-test.c:160
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
sender_data::id
int id
Definition: api-threadmessage-test.c:33
receiver_data::workload
int workload
Definition: api-threadmessage-test.c:42
av_thread_message_queue_set_err_send
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
Definition: threadmessage.c:193
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
receiver_data::id
int id
Definition: api-threadmessage-test.c:43
ret
ret
Definition: filter_design.txt:187
sender_data::queue
AVThreadMessageQueue * queue
Definition: api-threadmessage-test.c:36
receiver_data
Definition: api-threadmessage-test.c:40
AVFrame::height
int height
Definition: frame.h:461
main
int main(int ac, char **av)
Definition: api-threadmessage-test.c:165
AVFrame::metadata
AVDictionary * metadata
metadata.
Definition: frame.h:707
message::frame
AVFrame * frame
Definition: api-threadmessage-test.c:48
mem.h
receiver_data::queue
AVThreadMessageQueue * queue
Definition: api-threadmessage-test.c:44
sender_thread
static void * sender_thread(void *arg)
Definition: api-threadmessage-test.c:63
AVDictionaryEntry
Definition: dict.h:89
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
av_dict_set
int av_dict_set(AVDictionary **pm, const char *key, const char *value, int flags)
Set the given entry in *pm, overwriting an existing entry.
Definition: dict.c:88
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
av_thread_message_queue_set_free_func
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:88
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sender_data::workload
int workload
Definition: api-threadmessage-test.c:35
AVDictionaryEntry::value
char * value
Definition: dict.h:91
avstring.h