FFmpeg
f_zmq.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Stefano Sabatini
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 /**
22  * @file
23  * receive commands through libzeromq and broker them to filters
24  */
25 
26 #include "config_components.h"
27 
28 #include <zmq.h>
29 #include "libavutil/avstring.h"
30 #include "libavutil/bprint.h"
31 #include "libavutil/opt.h"
32 #include "avfilter.h"
33 #include "internal.h"
34 #include "audio.h"
35 #include "video.h"
36 
37 typedef struct ZMQContext {
38  const AVClass *class;
39  void *zmq;
40  void *responder;
41  char *bind_address;
43 } ZMQContext;
44 
45 #define OFFSET(x) offsetof(ZMQContext, x)
46 #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
47 static const AVOption options[] = {
48  { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
49  { "b", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
50  { NULL }
51 };
52 
54 {
55  ZMQContext *zmq = ctx->priv;
56 
57  zmq->zmq = zmq_ctx_new();
58  if (!zmq->zmq) {
60  "Could not create ZMQ context: %s\n", zmq_strerror(errno));
61  return AVERROR_EXTERNAL;
62  }
63 
64  zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
65  if (!zmq->responder) {
67  "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
68  return AVERROR_EXTERNAL;
69  }
70 
71  if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
73  "Could not bind ZMQ socket to address '%s': %s\n",
74  zmq->bind_address, zmq_strerror(errno));
75  return AVERROR_EXTERNAL;
76  }
77 
78  zmq->command_count = -1;
79  return 0;
80 }
81 
83 {
84  ZMQContext *zmq = ctx->priv;
85 
86  zmq_close(zmq->responder);
87  zmq_ctx_destroy(zmq->zmq);
88 }
89 
90 typedef struct Command {
91  char *target, *command, *arg;
92 } Command;
93 
94 #define SPACES " \f\t\n\r"
95 
96 static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
97 {
98  const char **buf = &command_str;
99 
100  cmd->target = av_get_token(buf, SPACES);
101  if (!cmd->target || !cmd->target[0]) {
102  av_log(log_ctx, AV_LOG_ERROR,
103  "No target specified in command '%s'\n", command_str);
104  return AVERROR(EINVAL);
105  }
106 
107  cmd->command = av_get_token(buf, SPACES);
108  if (!cmd->command || !cmd->command[0]) {
109  av_log(log_ctx, AV_LOG_ERROR,
110  "No command specified in command '%s'\n", command_str);
111  return AVERROR(EINVAL);
112  }
113 
114  cmd->arg = av_get_token(buf, SPACES);
115  return 0;
116 }
117 
118 static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
119 {
120  ZMQContext *zmq = ctx->priv;
121  zmq_msg_t msg;
122  int ret = 0;
123 
124  if (zmq_msg_init(&msg) == -1) {
126  "Could not initialize receive message: %s\n", zmq_strerror(errno));
127  return AVERROR_EXTERNAL;
128  }
129 
130  if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
131  if (errno != EAGAIN)
133  "Could not receive message: %s\n", zmq_strerror(errno));
135  goto end;
136  }
137 
138  *buf_size = zmq_msg_size(&msg) + 1;
139  *buf = av_malloc(*buf_size);
140  if (!*buf) {
141  ret = AVERROR(ENOMEM);
142  goto end;
143  }
144  memcpy(*buf, zmq_msg_data(&msg), *buf_size - 1);
145  (*buf)[*buf_size-1] = 0;
146 
147 end:
148  zmq_msg_close(&msg);
149  return ret;
150 }
151 
153 {
154  AVFilterContext *ctx = inlink->dst;
155  ZMQContext *zmq = ctx->priv;
156 
157  while (1) {
158  char cmd_buf[1024];
159  char *recv_buf, *send_buf;
160  int recv_buf_size;
161  Command cmd = {0};
162  int ret;
163 
164  /* receive command */
165  if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
166  break;
167  zmq->command_count++;
168 
169  /* parse command */
170  if (parse_command(&cmd, recv_buf, ctx) < 0) {
171  av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
172  goto end;
173  }
174 
175  /* process command */
177  "Processing command #%d target:%s command:%s arg:%s\n",
178  zmq->command_count, cmd.target, cmd.command, cmd.arg);
180  cmd.target, cmd.command, cmd.arg,
181  cmd_buf, sizeof(cmd_buf),
183  send_buf = av_asprintf("%d %s%s%s",
184  -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
185  if (!send_buf) {
186  ret = AVERROR(ENOMEM);
187  goto end;
188  }
190  "Sending command reply for command #%d:\n%s\n",
191  zmq->command_count, send_buf);
192  if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
193  av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
194  zmq->command_count, zmq_strerror(ret));
195 
196  end:
197  av_freep(&send_buf);
198  av_freep(&recv_buf);
199  recv_buf_size = 0;
200  av_freep(&cmd.target);
201  av_freep(&cmd.command);
202  av_freep(&cmd.arg);
203  }
204 
205  return ff_filter_frame(ctx->outputs[0], ref);
206 }
207 
208 AVFILTER_DEFINE_CLASS_EXT(zmq, "(a)zmq", options);
209 
210 #if CONFIG_ZMQ_FILTER
211 
212 static const AVFilterPad zmq_inputs[] = {
213  {
214  .name = "default",
215  .type = AVMEDIA_TYPE_VIDEO,
216  .filter_frame = filter_frame,
217  },
218 };
219 
220 static const AVFilterPad zmq_outputs[] = {
221  {
222  .name = "default",
223  .type = AVMEDIA_TYPE_VIDEO,
224  },
225 };
226 
227 const AVFilter ff_vf_zmq = {
228  .name = "zmq",
229  .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
230  .init = init,
231  .uninit = uninit,
232  .priv_size = sizeof(ZMQContext),
233  FILTER_INPUTS(zmq_inputs),
234  FILTER_OUTPUTS(zmq_outputs),
235  .priv_class = &zmq_class,
236 };
237 
238 #endif
239 
240 #if CONFIG_AZMQ_FILTER
241 
242 static const AVFilterPad azmq_inputs[] = {
243  {
244  .name = "default",
245  .type = AVMEDIA_TYPE_AUDIO,
246  .filter_frame = filter_frame,
247  },
248 };
249 
250 static const AVFilterPad azmq_outputs[] = {
251  {
252  .name = "default",
253  .type = AVMEDIA_TYPE_AUDIO,
254  },
255 };
256 
257 const AVFilter ff_af_azmq = {
258  .name = "azmq",
259  .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
260  .priv_class = &zmq_class,
261  .init = init,
262  .uninit = uninit,
263  .priv_size = sizeof(ZMQContext),
264  FILTER_INPUTS(azmq_inputs),
265  FILTER_OUTPUTS(azmq_outputs),
266 };
267 
268 #endif
AVFILTER_CMD_FLAG_ONE
#define AVFILTER_CMD_FLAG_ONE
Stop once a filter understood the command (for target=all for example), fast filters are favored auto...
Definition: avfilter.h:749
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:186
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
av_get_token
char * av_get_token(const char **buf, const char *term)
Unescape the given string until a non escaped terminating char, and return the token corresponding to...
Definition: avstring.c:154
ff_filter_frame
int ff_filter_frame(AVFilterLink *link, AVFrame *frame)
Send a frame of data to the next filter.
Definition: avfilter.c:999
inlink
The exact code depends on how similar the blocks are and how related they are to the and needs to apply these operations to the correct inlink or outlink if there are several Macros are available to factor that when no extra processing is inlink
Definition: filter_design.txt:212
av_asprintf
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:116
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:325
AVOption
AVOption.
Definition: opt.h:251
options
static const AVOption options[]
Definition: f_zmq.c:47
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:196
ff_af_azmq
const AVFilter ff_af_azmq
AVFilter::name
const char * name
Filter name.
Definition: avfilter.h:175
ZMQContext
Definition: f_zmq.c:37
video.h
parse_command
static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
Definition: f_zmq.c:96
ZMQContext::responder
void * responder
Definition: f_zmq.c:40
av_malloc
#define av_malloc(s)
Definition: tableprint_vlc.h:30
Command::target
char * target
Definition: f_sendcmd.c:89
Command::command
char * command
Definition: f_sendcmd.c:89
AVFilterPad
A filter pad used for either input or output.
Definition: internal.h:49
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
av_cold
#define av_cold
Definition: attributes.h:90
AVMEDIA_TYPE_AUDIO
@ AVMEDIA_TYPE_AUDIO
Definition: avutil.h:202
filter_frame
static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
Definition: f_zmq.c:152
ctx
AVFormatContext * ctx
Definition: movenc.c:48
Command
Definition: f_sendcmd.c:87
FILTER_INPUTS
#define FILTER_INPUTS(array)
Definition: internal.h:190
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
NULL
#define NULL
Definition: coverity.c:32
uninit
static void av_cold uninit(AVFilterContext *ctx)
Definition: f_zmq.c:82
SPACES
#define SPACES
Definition: f_zmq.c:94
NULL_IF_CONFIG_SMALL
#define NULL_IF_CONFIG_SMALL(x)
Return NULL if CONFIG_SMALL is true, otherwise the argument without modification.
Definition: internal.h:117
ZMQContext::bind_address
char * bind_address
Definition: f_zmq.c:41
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:121
recv_msg
static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
Definition: f_zmq.c:118
AVERROR_EXTERNAL
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:59
internal.h
bprint.h
FLAGS
#define FLAGS
Definition: f_zmq.c:46
AVFilterPad::name
const char * name
Pad name.
Definition: internal.h:55
AVFilter
Filter definition.
Definition: avfilter.h:171
ret
ret
Definition: filter_design.txt:187
Command::arg
char * arg
Definition: f_sendcmd.c:89
ZMQContext::command_count
int command_count
Definition: f_zmq.c:42
avfilter.h
ref
static int ref[MAX_W *MAX_W]
Definition: jpeg2000dwt.c:112
ZMQContext::zmq
void * zmq
Definition: f_zmq.c:39
init
static av_cold int init(AVFilterContext *ctx)
Definition: f_zmq.c:53
AVFilterContext
An instance of a filter.
Definition: avfilter.h:408
AVMEDIA_TYPE_VIDEO
@ AVMEDIA_TYPE_VIDEO
Definition: avutil.h:201
audio.h
FILTER_OUTPUTS
#define FILTER_OUTPUTS(array)
Definition: internal.h:191
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
avfilter_graph_send_command
int avfilter_graph_send_command(AVFilterGraph *graph, const char *target, const char *cmd, const char *arg, char *res, int res_len, int flags)
Send a command to one or more filter instances.
Definition: avfiltergraph.c:1184
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
ff_vf_zmq
const AVFilter ff_vf_zmq
avstring.h
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Definition: opt.h:229
AVFILTER_DEFINE_CLASS_EXT
AVFILTER_DEFINE_CLASS_EXT(zmq, "(a)zmq", options)
OFFSET
#define OFFSET(x)
Definition: f_zmq.c:45