FFmpeg
libzmq.c
Go to the documentation of this file.
1 /*
2  * ZeroMQ Protocol
3  * Copyright (c) 2019 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <zmq.h>
23 #include "url.h"
24 #include "network.h"
25 #include "libavutil/avstring.h"
26 #include "libavutil/opt.h"
27 #include "libavutil/time.h"
28 
29 #define ZMQ_STRERROR zmq_strerror(zmq_errno())
30 
31 typedef struct ZMQContext {
32  const AVClass *class;
33  void *context;
34  void *socket;
35  int pkt_size;
36  int pkt_size_overflow; /*keep track of the largest packet during overflow*/
37 } ZMQContext;
38 
39 #define OFFSET(x) offsetof(ZMQContext, x)
40 #define D AV_OPT_FLAG_DECODING_PARAM
41 #define E AV_OPT_FLAG_ENCODING_PARAM
42 static const AVOption options[] = {
43  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, -1, INT_MAX, .flags = D | E },
44  { NULL }
45 };
46 
47 static int zmq_proto_wait(URLContext *h, void *socket, int write)
48 {
49  int ret;
50  int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
51  zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
52  ret = zmq_poll(&items, 1, POLLING_TIME);
53  if (ret == -1) {
54  av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
55  return AVERROR_EXTERNAL;
56  }
57  return items.revents & ev ? 0 : AVERROR(EAGAIN);
58 }
59 
60 static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
61 {
62  int ret;
63  int64_t wait_start = 0;
64 
65  while (1) {
66  if (ff_check_interrupt(int_cb))
67  return AVERROR_EXIT;
68  ret = zmq_proto_wait(h, socket, write);
69  if (ret != AVERROR(EAGAIN))
70  return ret;
71  if (timeout > 0) {
72  if (!wait_start)
73  wait_start = av_gettime_relative();
74  else if (av_gettime_relative() - wait_start > timeout)
75  return AVERROR(ETIMEDOUT);
76  }
77  }
78 }
79 
80 static int zmq_proto_open(URLContext *h, const char *uri, int flags)
81 {
82  int ret;
83  ZMQContext *s = h->priv_data;
84  s->pkt_size_overflow = 0;
85  h->is_streamed = 1;
86 
87  if (s->pkt_size > 0)
88  h->max_packet_size = s->pkt_size;
89 
90  s->context = zmq_ctx_new();
91  if (!s->context) {
92  /*errno not set on failure during zmq_ctx_new()*/
93  av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n");
94  return AVERROR_EXTERNAL;
95  }
96 
97  av_strstart(uri, "zmq:", &uri);
98 
99  /*publish during write*/
100  if (h->flags & AVIO_FLAG_WRITE) {
101  s->socket = zmq_socket(s->context, ZMQ_PUB);
102  if (!s->socket) {
103  av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
104  goto fail_term;
105  }
106 
107  ret = zmq_bind(s->socket, uri);
108  if (ret == -1) {
109  av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR);
110  goto fail_close;
111  }
112  }
113 
114  /*subscribe for read*/
115  if (h->flags & AVIO_FLAG_READ) {
116  s->socket = zmq_socket(s->context, ZMQ_SUB);
117  if (!s->socket) {
118  av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
119  goto fail_term;
120  }
121 
122  ret = zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
123  if (ret == -1) {
124  av_log(h, AV_LOG_ERROR, "Error occured during zmq_setsockopt(): %s\n", ZMQ_STRERROR);
125  goto fail_close;
126  }
127 
128  ret = zmq_connect(s->socket, uri);
129  if (ret == -1) {
130  av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR);
131  goto fail_close;
132  }
133  }
134  return 0;
135 
136 fail_close:
137  zmq_close(s->socket);
138 fail_term:
139  zmq_ctx_term(s->context);
140  return AVERROR_EXTERNAL;
141 }
142 
143 static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
144 {
145  int ret;
146  ZMQContext *s = h->priv_data;
147 
149  if (ret)
150  return ret;
151  ret = zmq_send(s->socket, buf, size, 0);
152  if (ret == -1) {
153  av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR);
154  return AVERROR_EXTERNAL;
155  }
156  return ret; /*number of bytes sent*/
157 }
158 
159 static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
160 {
161  int ret;
162  ZMQContext *s = h->priv_data;
163 
165  if (ret)
166  return ret;
167  ret = zmq_recv(s->socket, buf, size, 0);
168  if (ret == -1) {
169  av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR);
170  return AVERROR_EXTERNAL;
171  }
172  if (ret > size) {
174  av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
175  ret = size;
176  }
177  return ret; /*number of bytes read*/
178 }
179 
181 {
182  ZMQContext *s = h->priv_data;
183  zmq_close(s->socket);
184  zmq_ctx_term(s->context);
185  return 0;
186 }
187 
188 static const AVClass zmq_context_class = {
189  .class_name = "zmq",
190  .item_name = av_default_item_name,
191  .option = options,
192  .version = LIBAVUTIL_VERSION_INT,
193 };
194 
196  .name = "zmq",
197  .url_close = zmq_proto_close,
198  .url_open = zmq_proto_open,
199  .url_read = zmq_proto_read,
200  .url_write = zmq_proto_write,
201  .priv_data_size = sizeof(ZMQContext),
202  .priv_data_class = &zmq_context_class,
204 };
#define NULL
Definition: coverity.c:32
static const AVClass zmq_context_class
Definition: libzmq.c:188
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:34
static const AVOption options[]
Definition: libzmq.c:42
AVOption.
Definition: opt.h:246
int pkt_size
Definition: libzmq.c:35
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:182
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
int is_streamed
true if streamed (no seek possible), default = false
Definition: url.h:45
AVIOInterruptCB interrupt_callback
Definition: url.h:47
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:235
#define AVIO_FLAG_READ
read-only
Definition: avio.h:674
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in mcs
Definition: url.h:48
#define AVIO_FLAG_WRITE
write-only
Definition: avio.h:675
int flags
Definition: url.h:43
static int zmq_proto_open(URLContext *h, const char *uri, int flags)
Definition: libzmq.c:80
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
AVOptions.
static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
Definition: libzmq.c:60
ptrdiff_t size
Definition: opengl_enc.c:100
#define av_log(a,...)
void * context
Definition: libzmq.c:33
Callback for checking whether to abort blocking functions.
Definition: avio.h:58
#define OFFSET(x)
Definition: libzmq.c:39
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
const AVIOInterruptCB int_cb
Definition: ffmpeg.c:489
static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libzmq.c:159
static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libzmq.c:143
#define FFMAX(a, b)
Definition: common.h:94
#define ZMQ_STRERROR
Definition: libzmq.c:29
#define s(width, name)
Definition: cbs_vp9.c:257
const URLProtocol ff_libzmq_protocol
Definition: libzmq.c:195
void * socket
Definition: libzmq.c:34
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:56
static int zmq_proto_close(URLContext *h)
Definition: libzmq.c:180
int ff_check_interrupt(AVIOInterruptCB *cb)
Check if the user has requested to interrupt a blocking function associated with cb.
Definition: avio.c:663
void * buf
Definition: avisynth_c.h:766
Definition: url.h:38
Describe the class of an AVClass context structure.
Definition: log.h:67
void * priv_data
Definition: url.h:41
#define POLLING_TIME
Definition: network.h:249
const char * name
Definition: url.h:55
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
#define flags(name, subs,...)
Definition: cbs_av1.c:564
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
Definition: avstring.c:34
static int zmq_proto_wait(URLContext *h, void *socket, int write)
Definition: libzmq.c:47
#define E
Definition: libzmq.c:41
int max_packet_size
if non zero, the stream is packetized with this max packet size
Definition: url.h:44
unbuffered private I/O API
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Frame references ownership and permissions
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
#define D
Definition: libzmq.c:40
int pkt_size_overflow
Definition: libzmq.c:36