FFmpeg
api-threadmessage-test.c
Go to the documentation of this file.
/*
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

/**
 * Thread message API test
 */

#include "libavutil/avassert.h"
#include "libavutil/avstring.h"
#include "libavutil/frame.h"
#include "libavutil/threadmessage.h"
#include "libavutil/thread.h" // not public
30 
31 struct sender_data {
32  int id;
34  int workload;
36 };
37 
38 /* same as sender_data but shuffled for testing purpose */
39 struct receiver_data {
41  int workload;
42  int id;
44 };
45 
46 struct message {
48  // we add some junk in the message to make sure the message size is >
49  // sizeof(void*)
50  int magic;
51 };
52 
/* Sentinel stored in message.magic and checked with av_assert0() on use. */
#define MAGIC 0xdeadc0de
54 
55 static void free_frame(void *arg)
56 {
57  struct message *msg = arg;
58  av_assert0(msg->magic == MAGIC);
59  av_frame_free(&msg->frame);
60 }
61 
62 static void *sender_thread(void *arg)
63 {
64  int i, ret = 0;
65  struct sender_data *wd = arg;
66 
67  av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68  for (i = 0; i < wd->workload; i++) {
69  if (rand() % wd->workload < wd->workload / 10) {
70  av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
72  } else {
73  char *val;
74  AVDictionary *meta = NULL;
75  struct message msg = {
76  .magic = MAGIC,
77  .frame = av_frame_alloc(),
78  };
79 
80  if (!msg.frame) {
81  ret = AVERROR(ENOMEM);
82  break;
83  }
84 
85  /* we add some metadata to identify the frames */
86  val = av_asprintf("frame %d/%d from sender %d",
87  i + 1, wd->workload, wd->id);
88  if (!val) {
89  av_frame_free(&msg.frame);
90  ret = AVERROR(ENOMEM);
91  break;
92  }
93  ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
94  if (ret < 0) {
95  av_frame_free(&msg.frame);
96  break;
97  }
98  msg.frame->metadata = meta;
99 
100  /* allocate a real frame in order to simulate "real" work */
102  msg.frame->width = 320;
103  msg.frame->height = 240;
104  ret = av_frame_get_buffer(msg.frame, 32);
105  if (ret < 0) {
106  av_frame_free(&msg.frame);
107  break;
108  }
109 
110  /* push the frame in the common queue */
111  av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112  wd->id, i + 1, wd->workload, msg.frame);
113  ret = av_thread_message_queue_send(wd->queue, &msg, 0);
114  if (ret < 0) {
115  av_frame_free(&msg.frame);
116  break;
117  }
118  }
119  }
120  av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121  wd->id, av_err2str(ret));
123  return NULL;
124 }
125 
126 static void *receiver_thread(void *arg)
127 {
128  int i, ret = 0;
129  struct receiver_data *rd = arg;
130 
131  for (i = 0; i < rd->workload; i++) {
132  if (rand() % rd->workload < rd->workload / 10) {
133  av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
134  "discarding %d message(s)\n", rd->id,
137  } else {
138  struct message msg;
139  AVDictionary *meta;
141 
142  ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
143  if (ret < 0)
144  break;
145  av_assert0(msg.magic == MAGIC);
146  meta = msg.frame->metadata;
147  e = av_dict_get(meta, "sig", NULL, 0);
148  av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
149  av_frame_free(&msg.frame);
150  }
151  }
152 
153  av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
155 
156  return NULL;
157 }
158 
/**
 * Pick a pseudo-random workload between two bounds.
 *
 * The result lies in the half-open range [low, high), where low and high are
 * the smaller and the larger of the two arguments. When both bounds are equal
 * that value is returned and rand() is not called.
 *
 * @param minv one bound of the range
 * @param maxv the other bound of the range
 * @return the chosen workload
 */
static int get_workload(int minv, int maxv)
{
    /* Reversed bounds would make the modulus negative and push the result
     * outside the requested range, so normalise them first. */
    if (maxv < minv) {
        const int tmp = minv;
        minv = maxv;
        maxv = tmp;
    }
    return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
}
163 
164 int main(int ac, char **av)
165 {
166  int i, ret = 0;
167  int max_queue_size;
168  int nb_senders, sender_min_load, sender_max_load;
169  int nb_receivers, receiver_min_load, receiver_max_load;
170  struct sender_data *senders;
171  struct receiver_data *receivers;
173 
174  if (ac != 8) {
175  av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
176  "<nb_senders> <sender_min_send> <sender_max_send> "
177  "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
178  return 1;
179  }
180 
181  max_queue_size = atoi(av[1]);
182  nb_senders = atoi(av[2]);
183  sender_min_load = atoi(av[3]);
184  sender_max_load = atoi(av[4]);
185  nb_receivers = atoi(av[5]);
186  receiver_min_load = atoi(av[6]);
187  receiver_max_load = atoi(av[7]);
188 
189  if (max_queue_size <= 0 ||
190  nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
191  nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
192  av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
193  return 1;
194  }
195 
196  av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
197  "%d receivers receiving [%d-%d]\n", max_queue_size,
198  nb_senders, sender_min_load, sender_max_load,
199  nb_receivers, receiver_min_load, receiver_max_load);
200 
201  senders = av_mallocz_array(nb_senders, sizeof(*senders));
202  receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
203  if (!senders || !receivers) {
204  ret = AVERROR(ENOMEM);
205  goto end;
206  }
207 
208  ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
209  if (ret < 0)
210  goto end;
211 
213 
214 #define SPAWN_THREADS(type) do { \
215  for (i = 0; i < nb_##type##s; i++) { \
216  struct type##_data *td = &type##s[i]; \
217  \
218  td->id = i; \
219  td->queue = queue; \
220  td->workload = get_workload(type##_min_load, type##_max_load); \
221  \
222  ret = pthread_create(&td->tid, NULL, type##_thread, td); \
223  if (ret) { \
224  const int err = AVERROR(ret); \
225  av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
226  " thread: %s\n", av_err2str(err)); \
227  goto end; \
228  } \
229  } \
230 } while (0)
231 
232 #define WAIT_THREADS(type) do { \
233  for (i = 0; i < nb_##type##s; i++) { \
234  struct type##_data *td = &type##s[i]; \
235  \
236  ret = pthread_join(td->tid, NULL); \
237  if (ret) { \
238  const int err = AVERROR(ret); \
239  av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
240  " thread: %s\n", av_err2str(err)); \
241  goto end; \
242  } \
243  } \
244 } while (0)
245 
246  SPAWN_THREADS(receiver);
247  SPAWN_THREADS(sender);
248 
249  WAIT_THREADS(sender);
250  WAIT_THREADS(receiver);
251 
252 end:
254  av_freep(&senders);
255  av_freep(&receivers);
256 
257  if (ret < 0 && ret != AVERROR_EOF) {
258  av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
259  return 1;
260  }
261  return 0;
262 }
#define NULL
Definition: coverity.c:32
const char const char void * val
Definition: avisynth_c.h:863
This structure describes decoded (raw) audio or video data.
Definition: frame.h:295
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
static int get_workload(int minv, int maxv)
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:83
AVThreadMessageQueue * queue
#define SPAWN_THREADS(type)
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
static void * sender_thread(void *arg)
Thread message API test.
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:189
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
static av_cold int end(AVCodecContext *avctx)
Definition: avrndec.c:90
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
static void * receiver_thread(void *arg)
AVDictionaryEntry * av_dict_get(const AVDictionary *m, const char *key, const AVDictionaryEntry *prev, int flags)
Get a dictionary entry with matching key.
Definition: dict.c:40
#define AVERROR_EOF
End of file.
Definition: error.h:55
AVDictionary * metadata
metadata.
Definition: frame.h:581
#define av_log(a,...)
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:259
AVThreadMessageQueue * queue
int width
Definition: frame.h:353
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
#define MAGIC
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:202
const char * arg
Definition: jacosubdec.c:66
simple assert() macros that are a bit more flexible than ISO C assert().
packed RGBA 8:8:8:8, 32bpp, RGBARGBA...
Definition: pixfmt.h:93
reference-counted frame API
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:113
#define AV_DICT_DONT_STRDUP_VAL
Take ownership of a value that's been allocated with av_malloc() or another memory allocation functio...
Definition: dict.h:76
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:119
int format
format of the frame, -1 if unknown or unset Values correspond to enum AVPixelFormat for video frames...
Definition: frame.h:368
#define AV_LOG_INFO
Standard information.
Definition: log.h:187
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
int main(int ac, char **av)
int av_dict_set(AVDictionary **pm, const char *key, const char *value, int flags)
Set the given entry in *pm, overwriting an existing entry.
Definition: dict.c:70
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
#define WAIT_THREADS(type)
int av_frame_get_buffer(AVFrame *frame, int align)
Allocate new buffer(s) for audio or video data.
Definition: frame.c:324
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
char * value
Definition: dict.h:87
int height
Definition: frame.h:353
#define av_freep(p)
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Frame references ownership and permissions
static void free_frame(void *arg)
void * av_mallocz_array(size_t nmemb, size_t size)
Definition: mem.c:191