FFmpeg
libzmq.c
Go to the documentation of this file.
1 /*
2  * ZeroMQ Protocol
3  * Copyright (c) 2019 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <zmq.h>
23 #include "url.h"
24 #include "network.h"
25 #include "libavutil/avstring.h"
26 #include "libavutil/opt.h"
27 #include "libavutil/time.h"
28 
29 #define ZMQ_STRERROR zmq_strerror(zmq_errno())
30 
31 typedef struct ZMQContext {
32  const AVClass *class;
33  void *context;
34  void *socket;
35  int pkt_size;
36  int pkt_size_overflow; /*keep track of the largest packet during overflow*/
37 } ZMQContext;
38 
39 #define OFFSET(x) offsetof(ZMQContext, x)
40 #define D AV_OPT_FLAG_DECODING_PARAM
41 #define E AV_OPT_FLAG_ENCODING_PARAM
42 static const AVOption options[] = {
43  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 32768 }, -1, INT_MAX, .flags = D | E },
44  { NULL }
45 };
46 
47 static int zmq_proto_wait(URLContext *h, void *socket, int write)
48 {
49  int ret;
50  int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
51  zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
52  ret = zmq_poll(&items, 1, POLLING_TIME);
53  if (ret == -1) {
54  av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
55  return AVERROR_EXTERNAL;
56  }
57  return items.revents & ev ? 0 : AVERROR(EAGAIN);
58 }
59 
60 static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
61 {
62  int ret;
63  int64_t wait_start = 0;
64 
65  while (1) {
66  if (ff_check_interrupt(int_cb))
67  return AVERROR_EXIT;
68  ret = zmq_proto_wait(h, socket, write);
69  if (ret != AVERROR(EAGAIN))
70  return ret;
71  if (timeout > 0) {
72  if (!wait_start)
73  wait_start = av_gettime_relative();
74  else if (av_gettime_relative() - wait_start > timeout)
75  return AVERROR(ETIMEDOUT);
76  }
77  }
78 }
79 
80 static int zmq_proto_open(URLContext *h, const char *uri, int flags)
81 {
82  int ret;
83  ZMQContext *s = h->priv_data;
84  s->pkt_size_overflow = 0;
85  h->is_streamed = 1;
86 
87  if (s->pkt_size > 0)
88  h->max_packet_size = s->pkt_size;
89 
90  s->context = zmq_ctx_new();
91  if (!s->context) {
92  /*errno not set on failure during zmq_ctx_new()*/
93  av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n");
94  return AVERROR_EXTERNAL;
95  }
96 
97  av_strstart(uri, "zmq:", &uri);
98 
99  /*publish during write*/
100  if (h->flags & AVIO_FLAG_WRITE) {
101  s->socket = zmq_socket(s->context, ZMQ_PUB);
102  if (!s->socket) {
103  av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
104  zmq_ctx_term(s->context);
105  return AVERROR_EXTERNAL;
106  }
107 
108  ret = zmq_bind(s->socket, uri);
109  if (ret == -1) {
110  av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR);
111  zmq_close(s->socket);
112  zmq_ctx_term(s->context);
113  return AVERROR_EXTERNAL;
114  }
115  }
116 
117  /*subscribe for read*/
118  if (h->flags & AVIO_FLAG_READ) {
119  s->socket = zmq_socket(s->context, ZMQ_SUB);
120  if (!s->socket) {
121  av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
122  zmq_ctx_term(s->context);
123  return AVERROR_EXTERNAL;
124  }
125 
126  zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
127  ret = zmq_connect(s->socket, uri);
128  if (ret == -1) {
129  av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR);
130  zmq_close(s->socket);
131  zmq_ctx_term(s->context);
132  return AVERROR_EXTERNAL;
133  }
134  }
135  return 0;
136 }
137 
138 static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
139 {
140  int ret;
141  ZMQContext *s = h->priv_data;
142 
144  if (ret)
145  return ret;
146  ret = zmq_send(s->socket, buf, size, 0);
147  if (ret == -1) {
148  av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR);
149  return AVERROR_EXTERNAL;
150  }
151  return ret; /*number of bytes sent*/
152 }
153 
154 static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
155 {
156  int ret;
157  ZMQContext *s = h->priv_data;
158 
160  if (ret)
161  return ret;
162  ret = zmq_recv(s->socket, buf, size, 0);
163  if (ret == -1) {
164  av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR);
165  return AVERROR_EXTERNAL;
166  }
167  if (ret > size) {
169  av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
170  ret = size;
171  }
172  return ret; /*number of bytes read*/
173 }
174 
176 {
177  ZMQContext *s = h->priv_data;
178  zmq_close(s->socket);
179  zmq_ctx_term(s->context);
180  return 0;
181 }
182 
183 static const AVClass zmq_context_class = {
184  .class_name = "zmq",
185  .item_name = av_default_item_name,
186  .option = options,
187  .version = LIBAVUTIL_VERSION_INT,
188 };
189 
191  .name = "zmq",
192  .url_close = zmq_proto_close,
193  .url_open = zmq_proto_open,
194  .url_read = zmq_proto_read,
195  .url_write = zmq_proto_write,
196  .priv_data_size = sizeof(ZMQContext),
197  .priv_data_class = &zmq_context_class,
199 };
#define NULL
Definition: coverity.c:32
static const AVClass zmq_context_class
Definition: libzmq.c:183
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:34
static const AVOption options[]
Definition: libzmq.c:42
AVOption.
Definition: opt.h:246
int pkt_size
Definition: libzmq.c:35
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:182
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
int is_streamed
true if streamed (no seek possible), default = false
Definition: url.h:45
AVIOInterruptCB interrupt_callback
Definition: url.h:47
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:191
#define AVIO_FLAG_READ
read-only
Definition: avio.h:674
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in mcs
Definition: url.h:48
#define AVIO_FLAG_WRITE
write-only
Definition: avio.h:675
int flags
Definition: url.h:43
static int zmq_proto_open(URLContext *h, const char *uri, int flags)
Definition: libzmq.c:80
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
AVOptions.
static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
Definition: libzmq.c:60
ptrdiff_t size
Definition: opengl_enc.c:100
#define av_log(a,...)
void * context
Definition: libzmq.c:33
Callback for checking whether to abort blocking functions.
Definition: avio.h:58
#define OFFSET(x)
Definition: libzmq.c:39
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
const AVIOInterruptCB int_cb
Definition: ffmpeg.c:481
static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libzmq.c:154
static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libzmq.c:138
#define FFMAX(a, b)
Definition: common.h:94
#define ZMQ_STRERROR
Definition: libzmq.c:29
#define s(width, name)
Definition: cbs_vp9.c:257
const URLProtocol ff_libzmq_protocol
Definition: libzmq.c:190
void * socket
Definition: libzmq.c:34
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:56
static int zmq_proto_close(URLContext *h)
Definition: libzmq.c:175
int ff_check_interrupt(AVIOInterruptCB *cb)
Check if the user has requested to interrupt a blocking function associated with cb.
Definition: avio.c:663
void * buf
Definition: avisynth_c.h:766
Definition: url.h:38
Describe the class of an AVClass context structure.
Definition: log.h:67
void * priv_data
Definition: url.h:41
#define POLLING_TIME
Definition: network.h:246
const char * name
Definition: url.h:55
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
#define flags(name, subs,...)
Definition: cbs_av1.c:561
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
Definition: avstring.c:34
static int zmq_proto_wait(URLContext *h, void *socket, int write)
Definition: libzmq.c:47
#define E
Definition: libzmq.c:41
int max_packet_size
if non zero, the stream is packetized with this max packet size
Definition: url.h:44
unbuffered private I/O API
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Frame references ownership and permissions
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
#define D
Definition: libzmq.c:40
int pkt_size_overflow
Definition: libzmq.c:36