FFmpeg
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
api-threadmessage-test.c
Go to the documentation of this file.
1 /*
2  * Permission is hereby granted, free of charge, to any person obtaining a copy
3  * of this software and associated documentation files (the "Software"), to deal
4  * in the Software without restriction, including without limitation the rights
5  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6  * copies of the Software, and to permit persons to whom the Software is
7  * furnished to do so, subject to the following conditions:
8  *
9  * The above copyright notice and this permission notice shall be included in
10  * all copies or substantial portions of the Software.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18  * THE SOFTWARE.
19  */
20 
21 /**
22  * Thread message API test
23  */
24 
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/frame.h"
29 #include "libavutil/thread.h" // not public
30 
31 struct sender_data {
32  int id;
34  int workload;
36 };
37 
38 /* same as sender_data but shuffled for testing purpose */
39 struct receiver_data {
41  int workload;
42  int id;
44 };
45 
46 struct message {
48  // we add some junk in the message to make sure the message size is >
49  // sizeof(void*)
50  int magic;
51 };
52 
53 #define MAGIC 0xdeadc0de
54 
55 static void free_frame(void *arg)
56 {
57  struct message *msg = arg;
58  av_assert0(msg->magic == MAGIC);
59  av_frame_free(&msg->frame);
60 }
61 
62 static void *sender_thread(void *arg)
63 {
64  int i, ret = 0;
65  struct sender_data *wd = arg;
66 
67  av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68  for (i = 0; i < wd->workload; i++) {
69  if (rand() % wd->workload < wd->workload / 10) {
70  av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
72  } else {
73  char *val;
74  AVDictionary *meta = NULL;
75  struct message msg = {
76  .magic = MAGIC,
77  .frame = av_frame_alloc(),
78  };
79 
80  if (!msg.frame) {
81  ret = AVERROR(ENOMEM);
82  break;
83  }
84 
85  /* we add some metadata to identify the frames */
86  val = av_asprintf("frame %d/%d from sender %d",
87  i + 1, wd->workload, wd->id);
88  if (!val) {
89  av_frame_free(&msg.frame);
90  ret = AVERROR(ENOMEM);
91  break;
92  }
93  ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
94  if (ret < 0) {
95  av_frame_free(&msg.frame);
96  break;
97  }
98  msg.frame->metadata = meta;
99 
100  /* allocate a real frame in order to simulate "real" work */
102  msg.frame->width = 320;
103  msg.frame->height = 240;
104  ret = av_frame_get_buffer(msg.frame, 32);
105  if (ret < 0) {
106  av_frame_free(&msg.frame);
107  break;
108  }
109 
110  /* push the frame in the common queue */
111  av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112  wd->id, i + 1, wd->workload, msg.frame);
113  ret = av_thread_message_queue_send(wd->queue, &msg, 0);
114  if (ret < 0) {
115  av_frame_free(&msg.frame);
116  break;
117  }
118  }
119  }
120  av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121  wd->id, av_err2str(ret));
123  return NULL;
124 }
125 
126 static void *receiver_thread(void *arg)
127 {
128  int i, ret = 0;
129  struct receiver_data *rd = arg;
130 
131  for (i = 0; i < rd->workload; i++) {
132  if (rand() % rd->workload < rd->workload / 10) {
133  av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
134  "discarding %d message(s)\n", rd->id,
137  } else {
138  struct message msg;
139  AVDictionary *meta;
141 
142  ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
143  if (ret < 0)
144  break;
145  av_assert0(msg.magic == MAGIC);
146  meta = msg.frame->metadata;
147  e = av_dict_get(meta, "sig", NULL, 0);
148  av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
149  av_frame_free(&msg.frame);
150  }
151  }
152 
153  av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
155 
156  return NULL;
157 }
158 
159 static int get_workload(int minv, int maxv)
160 {
161  return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
162 }
163 
164 int main(int ac, char **av)
165 {
166  int i, ret = 0;
167  int max_queue_size;
168  int nb_senders, sender_min_load, sender_max_load;
169  int nb_receivers, receiver_min_load, receiver_max_load;
170  struct sender_data *senders;
171  struct receiver_data *receivers;
173 
174  if (ac != 8) {
175  av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
176  "<nb_senders> <sender_min_send> <sender_max_send> "
177  "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
178  return 1;
179  }
180 
181  max_queue_size = atoi(av[1]);
182  nb_senders = atoi(av[2]);
183  sender_min_load = atoi(av[3]);
184  sender_max_load = atoi(av[4]);
185  nb_receivers = atoi(av[5]);
186  receiver_min_load = atoi(av[6]);
187  receiver_max_load = atoi(av[7]);
188 
189  if (max_queue_size <= 0 ||
190  nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
191  nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
192  av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
193  return 1;
194  }
195 
196  av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
197  "%d receivers receiving [%d-%d]\n", max_queue_size,
198  nb_senders, sender_min_load, sender_max_load,
199  nb_receivers, receiver_min_load, receiver_max_load);
200 
201  senders = av_mallocz_array(nb_senders, sizeof(*senders));
202  receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
203  if (!senders || !receivers) {
204  ret = AVERROR(ENOMEM);
205  goto end;
206  }
207 
208  ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
209  if (ret < 0)
210  goto end;
211 
213 
214 #define SPAWN_THREADS(type) do { \
215  for (i = 0; i < nb_##type##s; i++) { \
216  struct type##_data *td = &type##s[i]; \
217  \
218  td->id = i; \
219  td->queue = queue; \
220  td->workload = get_workload(type##_min_load, type##_max_load); \
221  \
222  ret = pthread_create(&td->tid, NULL, type##_thread, td); \
223  if (ret) { \
224  const int err = AVERROR(ret); \
225  av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
226  " thread: %s\n", av_err2str(err)); \
227  goto end; \
228  } \
229  } \
230 } while (0)
231 
232 #define WAIT_THREADS(type) do { \
233  for (i = 0; i < nb_##type##s; i++) { \
234  struct type##_data *td = &type##s[i]; \
235  \
236  ret = pthread_join(td->tid, NULL); \
237  if (ret) { \
238  const int err = AVERROR(ret); \
239  av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
240  " thread: %s\n", av_err2str(err)); \
241  goto end; \
242  } \
243  } \
244 } while (0)
245 
246  SPAWN_THREADS(receiver);
247  SPAWN_THREADS(sender);
248 
249  WAIT_THREADS(sender);
250  WAIT_THREADS(receiver);
251 
252 end:
254  av_freep(&senders);
255  av_freep(&receivers);
256 
257  if (ret < 0 && ret != AVERROR_EOF) {
258  av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
259  return 1;
260  }
261  return 0;
262 }
#define NULL
Definition: coverity.c:32
const char const char void * val
Definition: avisynth_c.h:771
This structure describes decoded (raw) audio or video data.
Definition: frame.h:226
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
static int get_workload(int minv, int maxv)
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:83
AVThreadMessageQueue * queue
#define SPAWN_THREADS(type)
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
static void * sender_thread(void *arg)
Thread message API test.
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:189
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
static av_cold int end(AVCodecContext *avctx)
Definition: avrndec.c:90
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
static void * receiver_thread(void *arg)
AVDictionaryEntry * av_dict_get(const AVDictionary *m, const char *key, const AVDictionaryEntry *prev, int flags)
Get a dictionary entry with matching key.
Definition: dict.c:40
#define AVERROR_EOF
End of file.
Definition: error.h:55
AVDictionary * metadata
metadata.
Definition: frame.h:513
#define av_log(a,...)
AVThreadMessageQueue * queue
int width
Definition: frame.h:284
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
#define MAGIC
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
#define AVERROR(e)
Definition: error.h:43
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:202
const char * arg
Definition: jacosubdec.c:66
simple assert() macros that are a bit more flexible than ISO C assert().
packed RGBA 8:8:8:8, 32bpp, RGBARGBA...
Definition: pixfmt.h:93
reference-counted frame API
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:113
#define AV_DICT_DONT_STRDUP_VAL
Take ownership of a value that's been allocated with av_malloc() or another memory allocation functio...
Definition: dict.h:76
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:119
int format
format of the frame, -1 if unknown or unset Values correspond to enum AVPixelFormat for video frames...
Definition: frame.h:299
#define AV_LOG_INFO
Standard information.
Definition: log.h:187
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
int main(int ac, char **av)
int av_dict_set(AVDictionary **pm, const char *key, const char *value, int flags)
Set the given entry in *pm, overwriting an existing entry.
Definition: dict.c:70
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
#define WAIT_THREADS(type)
int av_frame_get_buffer(AVFrame *frame, int align)
Allocate new buffer(s) for audio or video data.
Definition: frame.c:324
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
char * value
Definition: dict.h:87
int height
Definition: frame.h:284
#define av_freep(p)
static void free_frame(void *arg)
void * av_mallocz_array(size_t nmemb, size_t size)
Definition: mem.c:191