FFmpeg
f_zmq.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Stefano Sabatini
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 /**
22  * @file
23  * receive commands through libzeromq and broker them to filters
24  */
25 
26 #include <zmq.h>
27 #include "libavutil/avstring.h"
28 #include "libavutil/bprint.h"
29 #include "libavutil/opt.h"
30 #include "avfilter.h"
31 #include "internal.h"
32 #include "audio.h"
33 #include "video.h"
34 
35 typedef struct ZMQContext {
36  const AVClass *class;
37  void *zmq;
38  void *responder;
39  char *bind_address;
41 } ZMQContext;
42 
43 #define OFFSET(x) offsetof(ZMQContext, x)
44 #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
45 static const AVOption options[] = {
46  { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
47  { "b", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
48  { NULL }
49 };
50 
52 {
53  ZMQContext *zmq = ctx->priv;
54 
55  zmq->zmq = zmq_ctx_new();
56  if (!zmq->zmq) {
57  av_log(ctx, AV_LOG_ERROR,
58  "Could not create ZMQ context: %s\n", zmq_strerror(errno));
59  return AVERROR_EXTERNAL;
60  }
61 
62  zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
63  if (!zmq->responder) {
64  av_log(ctx, AV_LOG_ERROR,
65  "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
66  return AVERROR_EXTERNAL;
67  }
68 
69  if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
70  av_log(ctx, AV_LOG_ERROR,
71  "Could not bind ZMQ socket to address '%s': %s\n",
72  zmq->bind_address, zmq_strerror(errno));
73  return AVERROR_EXTERNAL;
74  }
75 
76  zmq->command_count = -1;
77  return 0;
78 }
79 
81 {
82  ZMQContext *zmq = ctx->priv;
83 
84  zmq_close(zmq->responder);
85  zmq_ctx_destroy(zmq->zmq);
86 }
87 
88 typedef struct Command {
89  char *target, *command, *arg;
90 } Command;
91 
92 #define SPACES " \f\t\n\r"
93 
94 static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
95 {
96  const char **buf = &command_str;
97 
98  cmd->target = av_get_token(buf, SPACES);
99  if (!cmd->target || !cmd->target[0]) {
100  av_log(log_ctx, AV_LOG_ERROR,
101  "No target specified in command '%s'\n", command_str);
102  return AVERROR(EINVAL);
103  }
104 
105  cmd->command = av_get_token(buf, SPACES);
106  if (!cmd->command || !cmd->command[0]) {
107  av_log(log_ctx, AV_LOG_ERROR,
108  "No command specified in command '%s'\n", command_str);
109  return AVERROR(EINVAL);
110  }
111 
112  cmd->arg = av_get_token(buf, SPACES);
113  return 0;
114 }
115 
116 static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
117 {
118  ZMQContext *zmq = ctx->priv;
119  zmq_msg_t msg;
120  int ret = 0;
121 
122  if (zmq_msg_init(&msg) == -1) {
123  av_log(ctx, AV_LOG_WARNING,
124  "Could not initialize receive message: %s\n", zmq_strerror(errno));
125  return AVERROR_EXTERNAL;
126  }
127 
128  if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
129  if (errno != EAGAIN)
130  av_log(ctx, AV_LOG_WARNING,
131  "Could not receive message: %s\n", zmq_strerror(errno));
132  ret = AVERROR_EXTERNAL;
133  goto end;
134  }
135 
136  *buf_size = zmq_msg_size(&msg) + 1;
137  *buf = av_malloc(*buf_size);
138  if (!*buf) {
139  ret = AVERROR(ENOMEM);
140  goto end;
141  }
142  memcpy(*buf, zmq_msg_data(&msg), *buf_size - 1);
143  (*buf)[*buf_size-1] = 0;
144 
145 end:
146  zmq_msg_close(&msg);
147  return ret;
148 }
149 
151 {
152  AVFilterContext *ctx = inlink->dst;
153  ZMQContext *zmq = ctx->priv;
154 
155  while (1) {
156  char cmd_buf[1024];
157  char *recv_buf, *send_buf;
158  int recv_buf_size;
159  Command cmd = {0};
160  int ret;
161 
162  /* receive command */
163  if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
164  break;
165  zmq->command_count++;
166 
167  /* parse command */
168  if (parse_command(&cmd, recv_buf, ctx) < 0) {
169  av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
170  goto end;
171  }
172 
173  /* process command */
174  av_log(ctx, AV_LOG_VERBOSE,
175  "Processing command #%d target:%s command:%s arg:%s\n",
176  zmq->command_count, cmd.target, cmd.command, cmd.arg);
177  ret = avfilter_graph_send_command(inlink->graph,
178  cmd.target, cmd.command, cmd.arg,
179  cmd_buf, sizeof(cmd_buf),
181  send_buf = av_asprintf("%d %s%s%s",
182  -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
183  if (!send_buf) {
184  ret = AVERROR(ENOMEM);
185  goto end;
186  }
187  av_log(ctx, AV_LOG_VERBOSE,
188  "Sending command reply for command #%d:\n%s\n",
189  zmq->command_count, send_buf);
190  if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
191  av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
192  zmq->command_count, zmq_strerror(ret));
193 
194  end:
195  av_freep(&send_buf);
196  av_freep(&recv_buf);
197  recv_buf_size = 0;
198  av_freep(&cmd.target);
199  av_freep(&cmd.command);
200  av_freep(&cmd.arg);
201  }
202 
203  return ff_filter_frame(ctx->outputs[0], ref);
204 }
205 
206 #if CONFIG_ZMQ_FILTER
207 
208 #define zmq_options options
210 
211 static const AVFilterPad zmq_inputs[] = {
212  {
213  .name = "default",
214  .type = AVMEDIA_TYPE_VIDEO,
215  .filter_frame = filter_frame,
216  },
217  { NULL }
218 };
219 
220 static const AVFilterPad zmq_outputs[] = {
221  {
222  .name = "default",
223  .type = AVMEDIA_TYPE_VIDEO,
224  },
225  { NULL }
226 };
227 
229  .name = "zmq",
230  .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
231  .init = init,
232  .uninit = uninit,
233  .priv_size = sizeof(ZMQContext),
234  .inputs = zmq_inputs,
235  .outputs = zmq_outputs,
236  .priv_class = &zmq_class,
237 };
238 
239 #endif
240 
241 #if CONFIG_AZMQ_FILTER
242 
243 #define azmq_options options
245 
246 static const AVFilterPad azmq_inputs[] = {
247  {
248  .name = "default",
249  .type = AVMEDIA_TYPE_AUDIO,
250  .filter_frame = filter_frame,
251  },
252  { NULL }
253 };
254 
255 static const AVFilterPad azmq_outputs[] = {
256  {
257  .name = "default",
258  .type = AVMEDIA_TYPE_AUDIO,
259  },
260  { NULL }
261 };
262 
264  .name = "azmq",
265  .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
266  .init = init,
267  .uninit = uninit,
268  .priv_size = sizeof(ZMQContext),
269  .inputs = azmq_inputs,
270  .outputs = azmq_outputs,
271  .priv_class = &azmq_class,
272 };
273 
274 #endif
#define NULL
Definition: coverity.c:32
This structure describes decoded (raw) audio or video data.
Definition: frame.h:295
AVFilter ff_af_azmq
void * zmq
Definition: f_zmq.c:37
AVOption.
Definition: opt.h:246
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:182
Main libavfilter public API header.
static av_cold int init(AVFilterContext *ctx)
Definition: f_zmq.c:51
static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
Definition: f_zmq.c:150
char * command
Definition: f_sendcmd.c:59
const char * name
Pad name.
Definition: internal.h:60
int ff_filter_frame(AVFilterLink *link, AVFrame *frame)
Send a frame of data to the next filter.
Definition: avfilter.c:1080
AVFilter ff_vf_zmq
#define av_cold
Definition: attributes.h:82
#define av_malloc(s)
AVOptions.
char * bind_address
Definition: f_zmq.c:39
static av_cold int end(AVCodecContext *avctx)
Definition: avrndec.c:90
#define SPACES
Definition: f_zmq.c:92
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:192
#define OFFSET(x)
Definition: f_zmq.c:43
#define av_log(a,...)
static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
Definition: f_zmq.c:116
char * target
Definition: f_sendcmd.c:59
A filter pad used for either input or output.
Definition: internal.h:54
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
static const AVOption options[]
Definition: f_zmq.c:45
#define NULL_IF_CONFIG_SMALL(x)
Return NULL if CONFIG_SMALL is true, otherwise the argument without modification. ...
Definition: internal.h:186
void * priv
private data for use by the filter
Definition: avfilter.h:353
const char * arg
Definition: jacosubdec.c:66
int command_count
Definition: f_zmq.c:40
char * av_get_token(const char **buf, const char *term)
Unescape the given string until a non escaped terminating char, and return the token corresponding to...
Definition: avstring.c:149
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:113
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:119
AVFormatContext * ctx
Definition: movenc.c:48
#define AVFILTER_CMD_FLAG_ONE
Stop once a filter understood the command (for target=all for example), fast filters are favored auto...
Definition: avfilter.h:691
static const AVFilterPad outputs[]
Definition: af_acontrast.c:203
char * arg
Definition: f_sendcmd.c:59
static int command(AVFilterContext *ctx, const char *cmd, const char *arg, char *res, int res_len, int flags)
Definition: vf_drawtext.c:869
void * buf
Definition: avisynth_c.h:766
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several inputs
Describe the class of an AVClass context structure.
Definition: log.h:67
Filter definition.
Definition: avfilter.h:144
const char * name
Filter name.
Definition: avfilter.h:148
AVFilterLink ** outputs
array of pointers to output links
Definition: avfilter.h:350
static void av_cold uninit(AVFilterContext *ctx)
Definition: f_zmq.c:80
The exact code depends on how similar the blocks are and how related they are to the and needs to apply these operations to the correct inlink or outlink if there are several Macros are available to factor that when no extra processing is inlink
static int ref[MAX_W *MAX_W]
Definition: jpeg2000dwt.c:107
static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
Definition: f_zmq.c:94
int avfilter_graph_send_command(AVFilterGraph *graph, const char *target, const char *cmd, const char *arg, char *res, int res_len, int flags)
Send a command to one or more filter instances.
#define AVFILTER_DEFINE_CLASS(fname)
Definition: internal.h:334
An instance of a filter.
Definition: avfilter.h:338
#define av_freep(p)
internal API functions
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Frame references ownership and permissions
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
#define FLAGS
Definition: f_zmq.c:44
void * responder
Definition: f_zmq.c:38