FFmpeg
f_zmq.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Stefano Sabatini
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 /**
22  * @file
23  * receive commands through libzeromq and broker them to filters
24  */
25 
26 #include <zmq.h>
27 #include "libavutil/avstring.h"
28 #include "libavutil/bprint.h"
29 #include "libavutil/opt.h"
30 #include "avfilter.h"
31 #include "internal.h"
32 #include "audio.h"
33 #include "video.h"
34 
35 typedef struct ZMQContext {
36  const AVClass *class;
37  void *zmq;
38  void *responder;
39  char *bind_address;
41 } ZMQContext;
42 
43 #define OFFSET(x) offsetof(ZMQContext, x)
44 #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
45 static const AVOption options[] = {
46  { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
47  { "b", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
48  { NULL }
49 };
50 
52 {
53  ZMQContext *zmq = ctx->priv;
54 
55  zmq->zmq = zmq_ctx_new();
56  if (!zmq->zmq) {
58  "Could not create ZMQ context: %s\n", zmq_strerror(errno));
59  return AVERROR_EXTERNAL;
60  }
61 
62  zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
63  if (!zmq->responder) {
65  "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
66  return AVERROR_EXTERNAL;
67  }
68 
69  if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
71  "Could not bind ZMQ socket to address '%s': %s\n",
72  zmq->bind_address, zmq_strerror(errno));
73  return AVERROR_EXTERNAL;
74  }
75 
76  zmq->command_count = -1;
77  return 0;
78 }
79 
81 {
82  ZMQContext *zmq = ctx->priv;
83 
84  zmq_close(zmq->responder);
85  zmq_ctx_destroy(zmq->zmq);
86 }
87 
88 typedef struct Command {
89  char *target, *command, *arg;
90 } Command;
91 
92 #define SPACES " \f\t\n\r"
93 
94 static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
95 {
96  const char **buf = &command_str;
97 
98  cmd->target = av_get_token(buf, SPACES);
99  if (!cmd->target || !cmd->target[0]) {
100  av_log(log_ctx, AV_LOG_ERROR,
101  "No target specified in command '%s'\n", command_str);
102  return AVERROR(EINVAL);
103  }
104 
105  cmd->command = av_get_token(buf, SPACES);
106  if (!cmd->command || !cmd->command[0]) {
107  av_log(log_ctx, AV_LOG_ERROR,
108  "No command specified in command '%s'\n", command_str);
109  return AVERROR(EINVAL);
110  }
111 
112  cmd->arg = av_get_token(buf, SPACES);
113  return 0;
114 }
115 
116 static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
117 {
118  ZMQContext *zmq = ctx->priv;
119  zmq_msg_t msg;
120  int ret = 0;
121 
122  if (zmq_msg_init(&msg) == -1) {
124  "Could not initialize receive message: %s\n", zmq_strerror(errno));
125  return AVERROR_EXTERNAL;
126  }
127 
128  if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
129  if (errno != EAGAIN)
131  "Could not receive message: %s\n", zmq_strerror(errno));
133  goto end;
134  }
135 
136  *buf_size = zmq_msg_size(&msg) + 1;
137  *buf = av_malloc(*buf_size);
138  if (!*buf) {
139  ret = AVERROR(ENOMEM);
140  goto end;
141  }
142  memcpy(*buf, zmq_msg_data(&msg), *buf_size);
143  (*buf)[*buf_size-1] = 0;
144 
145 end:
146  zmq_msg_close(&msg);
147  return ret;
148 }
149 
151 {
152  AVFilterContext *ctx = inlink->dst;
153  ZMQContext *zmq = ctx->priv;
154 
155  while (1) {
156  char cmd_buf[1024];
157  char *recv_buf, *send_buf;
158  int recv_buf_size;
159  Command cmd = {0};
160  int ret;
161 
162  /* receive command */
163  if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
164  break;
165  zmq->command_count++;
166 
167  /* parse command */
168  if (parse_command(&cmd, recv_buf, ctx) < 0) {
169  av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
170  goto end;
171  }
172 
173  /* process command */
175  "Processing command #%d target:%s command:%s arg:%s\n",
176  zmq->command_count, cmd.target, cmd.command, cmd.arg);
178  cmd.target, cmd.command, cmd.arg,
179  cmd_buf, sizeof(cmd_buf),
181  send_buf = av_asprintf("%d %s%s%s",
182  -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
183  if (!send_buf) {
184  ret = AVERROR(ENOMEM);
185  goto end;
186  }
188  "Sending command reply for command #%d:\n%s\n",
189  zmq->command_count, send_buf);
190  if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
191  av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
192  zmq->command_count, zmq_strerror(ret));
193 
194  end:
195  av_freep(&send_buf);
196  av_freep(&recv_buf);
197  recv_buf_size = 0;
198  av_freep(&cmd.target);
199  av_freep(&cmd.command);
200  av_freep(&cmd.arg);
201  }
202 
203  return ff_filter_frame(ctx->outputs[0], ref);
204 }
205 
206 #if CONFIG_ZMQ_FILTER
207 
208 #define zmq_options options
210 
211 static const AVFilterPad zmq_inputs[] = {
212  {
213  .name = "default",
214  .type = AVMEDIA_TYPE_VIDEO,
215  .filter_frame = filter_frame,
216  },
217  { NULL }
218 };
219 
220 static const AVFilterPad zmq_outputs[] = {
221  {
222  .name = "default",
223  .type = AVMEDIA_TYPE_VIDEO,
224  },
225  { NULL }
226 };
227 
229  .name = "zmq",
230  .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
231  .init = init,
232  .uninit = uninit,
233  .priv_size = sizeof(ZMQContext),
234  .inputs = zmq_inputs,
235  .outputs = zmq_outputs,
236  .priv_class = &zmq_class,
237 };
238 
239 #endif
240 
241 #if CONFIG_AZMQ_FILTER
242 
243 #define azmq_options options
245 
246 static const AVFilterPad azmq_inputs[] = {
247  {
248  .name = "default",
249  .type = AVMEDIA_TYPE_AUDIO,
250  .filter_frame = filter_frame,
251  },
252  { NULL }
253 };
254 
255 static const AVFilterPad azmq_outputs[] = {
256  {
257  .name = "default",
258  .type = AVMEDIA_TYPE_AUDIO,
259  },
260  { NULL }
261 };
262 
264  .name = "azmq",
265  .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
266  .init = init,
267  .uninit = uninit,
268  .priv_size = sizeof(ZMQContext),
269  .inputs = azmq_inputs,
270  .outputs = azmq_outputs,
271  .priv_class = &azmq_class,
272 };
273 
274 #endif
AVFILTER_CMD_FLAG_ONE
#define AVFILTER_CMD_FLAG_ONE
Stop once a filter understood the command (for target=all for example), fast filters are favored auto...
Definition: avfilter.h:691
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:182
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
av_get_token
char * av_get_token(const char **buf, const char *term)
Unescape the given string until a non escaped terminating char, and return the token corresponding to...
Definition: avstring.c:149
ff_filter_frame
int ff_filter_frame(AVFilterLink *link, AVFrame *frame)
Send a frame of data to the next filter.
Definition: avfilter.c:1080
inlink
The exact code depends on how similar the blocks are and how related they are to the and needs to apply these operations to the correct inlink or outlink if there are several Macros are available to factor that when no extra processing is inlink
Definition: filter_design.txt:212
av_asprintf
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:113
end
static av_cold int end(AVCodecContext *avctx)
Definition: avrndec.c:90
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:295
AVOption
AVOption.
Definition: opt.h:246
options
static const AVOption options[]
Definition: f_zmq.c:45
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:192
AVFilter::name
const char * name
Filter name.
Definition: avfilter.h:148
ZMQContext
Definition: f_zmq.c:35
video.h
parse_command
static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
Definition: f_zmq.c:94
ZMQContext::responder
void * responder
Definition: f_zmq.c:38
av_malloc
#define av_malloc(s)
Definition: tableprint_vlc.h:31
Command::target
char * target
Definition: f_sendcmd.c:59
Command::command
char * command
Definition: f_sendcmd.c:59
ff_vf_zmq
AVFilter ff_vf_zmq
AVFilterPad
A filter pad used for either input or output.
Definition: internal.h:54
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
buf
void * buf
Definition: avisynth_c.h:766
av_cold
#define av_cold
Definition: attributes.h:84
AVMEDIA_TYPE_AUDIO
@ AVMEDIA_TYPE_AUDIO
Definition: avutil.h:202
outputs
static const AVFilterPad outputs[]
Definition: af_acontrast.c:203
filter_frame
static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
Definition: f_zmq.c:150
ctx
AVFormatContext * ctx
Definition: movenc.c:48
Command
Definition: f_sendcmd.c:57
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:67
NULL
#define NULL
Definition: coverity.c:32
uninit
static void av_cold uninit(AVFilterContext *ctx)
Definition: f_zmq.c:80
inputs
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several inputs
Definition: filter_design.txt:243
SPACES
#define SPACES
Definition: f_zmq.c:92
NULL_IF_CONFIG_SMALL
#define NULL_IF_CONFIG_SMALL(x)
Return NULL if CONFIG_SMALL is true, otherwise the argument without modification.
Definition: internal.h:188
ZMQContext::bind_address
char * bind_address
Definition: f_zmq.c:39
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:119
recv_msg
static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
Definition: f_zmq.c:116
AVERROR_EXTERNAL
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
internal.h
AVFILTER_DEFINE_CLASS
#define AVFILTER_DEFINE_CLASS(fname)
Definition: internal.h:334
bprint.h
FLAGS
#define FLAGS
Definition: f_zmq.c:44
AVFilterPad::name
const char * name
Pad name.
Definition: internal.h:60
AVFilter
Filter definition.
Definition: avfilter.h:144
ret
ret
Definition: filter_design.txt:187
ff_af_azmq
AVFilter ff_af_azmq
Command::arg
char * arg
Definition: f_sendcmd.c:59
ZMQContext::command_count
int command_count
Definition: f_zmq.c:40
avfilter.h
ref
static int ref[MAX_W *MAX_W]
Definition: jpeg2000dwt.c:107
ZMQContext::zmq
void * zmq
Definition: f_zmq.c:37
init
static av_cold int init(AVFilterContext *ctx)
Definition: f_zmq.c:51
AVFilterContext
An instance of a filter.
Definition: avfilter.h:338
AVMEDIA_TYPE_VIDEO
@ AVMEDIA_TYPE_VIDEO
Definition: avutil.h:201
audio.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
avfilter_graph_send_command
int avfilter_graph_send_command(AVFilterGraph *graph, const char *target, const char *cmd, const char *arg, char *res, int res_len, int flags)
Send a command to one or more filter instances.
Definition: avfiltergraph.c:1287
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:28
avstring.h
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Definition: opt.h:227
OFFSET
#define OFFSET(x)
Definition: f_zmq.c:43