FFmpeg
libzmq.c
Go to the documentation of this file.
1 /*
2  * ZeroMQ Protocol
3  * Copyright (c) 2019 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <zmq.h>
23 #include "url.h"
24 #include "network.h"
25 #include "libavutil/avstring.h"
26 #include "libavutil/opt.h"
27 #include "libavutil/time.h"
28 
/* Text description of the most recent libzmq error, as reported by zmq_errno(). */
#define ZMQ_STRERROR (zmq_strerror(zmq_errno()))
30 
31 typedef struct ZMQContext {
32  const AVClass *class;
33  void *context;
34  void *socket;
35  int pkt_size;
36  int pkt_size_overflow; /*keep track of the largest packet during overflow*/
37 } ZMQContext;
38 
39 #define OFFSET(x) offsetof(ZMQContext, x)
40 #define D AV_OPT_FLAG_DECODING_PARAM
41 #define E AV_OPT_FLAG_ENCODING_PARAM
42 static const AVOption options[] = {
43  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, -1, INT_MAX, .flags = D | E },
44  { NULL }
45 };
46 
47 static int zmq_proto_wait(URLContext *h, void *socket, int write)
48 {
49  int ret;
50  int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
51  zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
52  ret = zmq_poll(&items, 1, POLLING_TIME);
53  if (ret == -1) {
54  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_poll(): %s\n", ZMQ_STRERROR);
55  return AVERROR_EXTERNAL;
56  }
57  return items.revents & ev ? 0 : AVERROR(EAGAIN);
58 }
59 
60 static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
61 {
62  int ret;
63  int64_t wait_start = 0;
64 
65  while (1) {
67  return AVERROR_EXIT;
68  ret = zmq_proto_wait(h, socket, write);
69  if (ret != AVERROR(EAGAIN))
70  return ret;
71  if (timeout > 0) {
72  if (!wait_start)
73  wait_start = av_gettime_relative();
74  else if (av_gettime_relative() - wait_start > timeout)
75  return AVERROR(ETIMEDOUT);
76  }
77  }
78 }
79 
80 static int zmq_proto_open(URLContext *h, const char *uri, int flags)
81 {
82  int ret;
83  ZMQContext *s = h->priv_data;
84  s->pkt_size_overflow = 0;
85  h->is_streamed = 1;
86 
87  if (s->pkt_size > 0)
88  h->max_packet_size = s->pkt_size;
89 
90  s->context = zmq_ctx_new();
91  if (!s->context) {
92  /*errno not set on failure during zmq_ctx_new()*/
93  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_ctx_new()\n");
94  return AVERROR_EXTERNAL;
95  }
96 
97  if (!av_strstart(uri, "zmq:", &uri)) {
98  av_log(h, AV_LOG_ERROR, "URL %s lacks prefix\n", uri);
99  return AVERROR(EINVAL);
100  }
101 
102  /*publish during write*/
103  if (h->flags & AVIO_FLAG_WRITE) {
104  s->socket = zmq_socket(s->context, ZMQ_PUB);
105  if (!s->socket) {
106  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_socket(): %s\n", ZMQ_STRERROR);
107  goto fail_term;
108  }
109 
110  ret = zmq_bind(s->socket, uri);
111  if (ret == -1) {
112  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_bind(): %s\n", ZMQ_STRERROR);
113  goto fail_close;
114  }
115  }
116 
117  /*subscribe for read*/
118  if (h->flags & AVIO_FLAG_READ) {
119  s->socket = zmq_socket(s->context, ZMQ_SUB);
120  if (!s->socket) {
121  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_socket(): %s\n", ZMQ_STRERROR);
122  goto fail_term;
123  }
124 
125  ret = zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
126  if (ret == -1) {
127  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_setsockopt(): %s\n", ZMQ_STRERROR);
128  goto fail_close;
129  }
130 
131  ret = zmq_connect(s->socket, uri);
132  if (ret == -1) {
133  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_connect(): %s\n", ZMQ_STRERROR);
134  goto fail_close;
135  }
136  }
137  return 0;
138 
139 fail_close:
140  zmq_close(s->socket);
141 fail_term:
142  zmq_ctx_term(s->context);
143  return AVERROR_EXTERNAL;
144 }
145 
146 static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
147 {
148  int ret;
149  ZMQContext *s = h->priv_data;
150 
151  ret = zmq_proto_wait_timeout(h, s->socket, 1, h->rw_timeout, &h->interrupt_callback);
152  if (ret)
153  return ret;
154  ret = zmq_send(s->socket, buf, size, 0);
155  if (ret == -1) {
156  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_send(): %s\n", ZMQ_STRERROR);
157  return AVERROR_EXTERNAL;
158  }
159  return ret; /*number of bytes sent*/
160 }
161 
162 static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
163 {
164  int ret;
165  ZMQContext *s = h->priv_data;
166 
167  ret = zmq_proto_wait_timeout(h, s->socket, 0, h->rw_timeout, &h->interrupt_callback);
168  if (ret)
169  return ret;
170  ret = zmq_recv(s->socket, buf, size, 0);
171  if (ret == -1) {
172  av_log(h, AV_LOG_ERROR, "Error occurred during zmq_recv(): %s\n", ZMQ_STRERROR);
173  return AVERROR_EXTERNAL;
174  }
175  if (ret > size) {
176  s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, ret);
177  av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
178  ret = size;
179  }
180  return ret; /*number of bytes read*/
181 }
182 
184 {
185  ZMQContext *s = h->priv_data;
186  zmq_close(s->socket);
187  zmq_ctx_term(s->context);
188  return 0;
189 }
190 
191 static const AVClass zmq_context_class = {
192  .class_name = "zmq",
193  .item_name = av_default_item_name,
194  .option = options,
195  .version = LIBAVUTIL_VERSION_INT,
196 };
197 
199  .name = "zmq",
200  .url_close = zmq_proto_close,
201  .url_open = zmq_proto_open,
202  .url_read = zmq_proto_read,
203  .url_write = zmq_proto_write,
204  .priv_data_size = sizeof(ZMQContext),
205  .priv_data_class = &zmq_context_class,
207 };
av_gettime_relative
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:215
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
URL_PROTOCOL_FLAG_NETWORK
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:33
int64_t
long long int64_t
Definition: coverity.c:34
AVOption
AVOption.
Definition: opt.h:429
E
#define E
Definition: libzmq.c:41
ZMQContext::pkt_size
int pkt_size
Definition: libzmq.c:35
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
ZMQContext
Definition: f_zmq.c:37
URLProtocol
Definition: url.h:51
AVIOInterruptCB
Callback for checking whether to abort blocking functions.
Definition: avio.h:59
ZMQContext::context
void * context
Definition: libzmq.c:33
ff_check_interrupt
int ff_check_interrupt(AVIOInterruptCB *cb)
Check if the user has requested to interrupt a blocking function associated with cb.
Definition: avio.c:854
ZMQ_STRERROR
#define ZMQ_STRERROR
Definition: libzmq.c:29
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:209
zmq_proto_close
static int zmq_proto_close(URLContext *h)
Definition: libzmq.c:183
s
#define s(width, name)
Definition: cbs_vp9.c:198
AVIO_FLAG_WRITE
#define AVIO_FLAG_WRITE
write-only
Definition: avio.h:618
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
ff_libzmq_protocol
const URLProtocol ff_libzmq_protocol
Definition: libzmq.c:198
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:75
NULL
#define NULL
Definition: coverity.c:32
zmq_proto_write
static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libzmq.c:146
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:237
options
Definition: swscale.c:42
ZMQContext::pkt_size_overflow
int pkt_size_overflow
Definition: libzmq.c:36
time.h
zmq_proto_wait_timeout
static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
Definition: libzmq.c:60
size
int size
Definition: twinvq_data.h:10344
URLProtocol::name
const char * name
Definition: url.h:52
AVERROR_EXTERNAL
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:59
av_strstart
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
Definition: avstring.c:36
URLContext
Definition: url.h:35
url.h
int_cb
const AVIOInterruptCB int_cb
Definition: ffmpeg.c:307
zmq_proto_open
static int zmq_proto_open(URLContext *h, const char *uri, int flags)
Definition: libzmq.c:80
ret
ret
Definition: filter_design.txt:187
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:80
network.h
options
static const AVOption options[]
Definition: libzmq.c:42
zmq_context_class
static const AVClass zmq_context_class
Definition: libzmq.c:191
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Underlying C type is int.
Definition: opt.h:259
D
#define D
Definition: libzmq.c:40
ZMQContext::socket
void * socket
Definition: libzmq.c:34
AVIO_FLAG_READ
#define AVIO_FLAG_READ
read-only
Definition: avio.h:617
POLLING_TIME
#define POLLING_TIME
Definition: network.h:249
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:482
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
zmq_proto_wait
static int zmq_proto_wait(URLContext *h, void *socket, int write)
Definition: libzmq.c:47
h
h
Definition: vp9dsp_template.c:2070
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
avstring.h
zmq_proto_read
static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libzmq.c:162
OFFSET
#define OFFSET(x)
Definition: libzmq.c:39