FFmpeg
libamqp.c
Go to the documentation of this file.
1 /*
2  * Advanced Message Queuing Protocol (AMQP) 0-9-1
3  * Copyright (c) 2020 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <amqp.h>
23 #include <amqp_tcp_socket.h>
24 #include <sys/time.h>
25 #include "avformat.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/opt.h"
28 #include "libavutil/time.h"
29 #include "network.h"
30 #include "url.h"
31 #include "urldecode.h"
32 
33 typedef struct AMQPContext {
34  const AVClass *class;
35  amqp_connection_state_t conn;
36  amqp_socket_t *socket;
37  const char *exchange;
38  const char *routing_key;
39  int pkt_size;
43 } AMQPContext;
44 
45 #define STR_LEN 1024
46 #define DEFAULT_CHANNEL 1
47 
48 #define OFFSET(x) offsetof(AMQPContext, x)
49 #define D AV_OPT_FLAG_DECODING_PARAM
50 #define E AV_OPT_FLAG_ENCODING_PARAM
51 static const AVOption options[] = {
52  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
53  { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
54  { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
55  { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E},
56  { "delivery_mode", "Delivery mode", OFFSET(delivery_mode), AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags = E, "delivery_mode"},
57  { "persistent", "Persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0, E, "delivery_mode" },
58  { "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0, E, "delivery_mode" },
59  { NULL }
60 };
61 
62 static int amqp_proto_open(URLContext *h, const char *uri, int flags)
63 {
64  int ret, server_msg;
65  char hostname[STR_LEN], credentials[STR_LEN], path[STR_LEN];
66  int port;
67  const char *user, *password = NULL, *vhost;
68  const char *user_decoded, *password_decoded, *vhost_decoded;
69  char *p;
70  amqp_rpc_reply_t broker_reply;
71  struct timeval tval = { 0 };
72 
73  AMQPContext *s = h->priv_data;
74 
75  h->is_streamed = 1;
76  h->max_packet_size = s->pkt_size;
77 
78  av_url_split(NULL, 0, credentials, sizeof(credentials),
79  hostname, sizeof(hostname), &port, path, sizeof(path), uri);
80 
81  if (port < 0)
82  port = 5672;
83 
84  if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
85  av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
86  return AVERROR(EINVAL);
87  }
88 
89  p = strchr(credentials, ':');
90  if (p) {
91  *p = '\0';
92  password = p + 1;
93  }
94 
95  if (!password || *password == '\0')
96  password = "guest";
97 
98  password_decoded = ff_urldecode(password, 0);
99  if (!password_decoded)
100  return AVERROR(ENOMEM);
101 
102  user = credentials;
103  if (*user == '\0')
104  user = "guest";
105 
106  user_decoded = ff_urldecode(user, 0);
107  if (!user_decoded) {
108  av_freep(&password_decoded);
109  return AVERROR(ENOMEM);
110  }
111 
112  /* skip query for now */
113  p = strchr(path, '?');
114  if (p)
115  *p = '\0';
116 
117  vhost = path;
118  if (*vhost == '\0')
119  vhost = "/";
120  else
121  vhost++; /* skip leading '/' */
122 
123  vhost_decoded = ff_urldecode(vhost, 0);
124  if (!vhost_decoded) {
125  av_freep(&user_decoded);
126  av_freep(&password_decoded);
127  return AVERROR(ENOMEM);
128  }
129 
130  s->conn = amqp_new_connection();
131  if (!s->conn) {
132  av_freep(&vhost_decoded);
133  av_freep(&user_decoded);
134  av_freep(&password_decoded);
135  av_log(h, AV_LOG_ERROR, "Error creating connection\n");
136  return AVERROR_EXTERNAL;
137  }
138 
139  s->socket = amqp_tcp_socket_new(s->conn);
140  if (!s->socket) {
141  av_log(h, AV_LOG_ERROR, "Error creating socket\n");
142  goto destroy_connection;
143  }
144 
145  if (s->connection_timeout < 0)
146  s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
147 
148  tval.tv_sec = s->connection_timeout / 1000000;
149  tval.tv_usec = s->connection_timeout % 1000000;
150  ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
151 
152  if (ret) {
153  av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n",
154  amqp_error_string2(ret));
155  goto destroy_connection;
156  }
157 
158  broker_reply = amqp_login(s->conn, vhost_decoded, 0, s->pkt_size, 0,
159  AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
160 
161  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
162  av_log(h, AV_LOG_ERROR, "Error login\n");
163  server_msg = AMQP_ACCESS_REFUSED;
164  goto close_connection;
165  }
166 
167  amqp_channel_open(s->conn, DEFAULT_CHANNEL);
168  broker_reply = amqp_get_rpc_reply(s->conn);
169 
170  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
171  av_log(h, AV_LOG_ERROR, "Error set channel\n");
172  server_msg = AMQP_CHANNEL_ERROR;
173  goto close_connection;
174  }
175 
176  if (h->flags & AVIO_FLAG_READ) {
177  amqp_bytes_t queuename;
178  char queuename_buff[STR_LEN];
179  amqp_queue_declare_ok_t *r;
180 
181  r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
182  0, 0, 0, 1, amqp_empty_table);
183  broker_reply = amqp_get_rpc_reply(s->conn);
184  if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
185  av_log(h, AV_LOG_ERROR, "Error declare queue\n");
186  server_msg = AMQP_RESOURCE_ERROR;
187  goto close_channel;
188  }
189 
190  /* store queuename */
191  queuename.bytes = queuename_buff;
192  queuename.len = FFMIN(r->queue.len, STR_LEN);
193  memcpy(queuename.bytes, r->queue.bytes, queuename.len);
194 
195  amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
196  amqp_cstring_bytes(s->exchange),
197  amqp_cstring_bytes(s->routing_key), amqp_empty_table);
198 
199  broker_reply = amqp_get_rpc_reply(s->conn);
200  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
201  av_log(h, AV_LOG_ERROR, "Queue bind error\n");
202  server_msg = AMQP_INTERNAL_ERROR;
203  goto close_channel;
204  }
205 
206  amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
207  0, 1, 0, amqp_empty_table);
208 
209  broker_reply = amqp_get_rpc_reply(s->conn);
210  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
211  av_log(h, AV_LOG_ERROR, "Set consume error\n");
212  server_msg = AMQP_INTERNAL_ERROR;
213  goto close_channel;
214  }
215  }
216 
217  av_freep(&vhost_decoded);
218  av_freep(&user_decoded);
219  av_freep(&password_decoded);
220  return 0;
221 
222 close_channel:
223  amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
224 close_connection:
225  amqp_connection_close(s->conn, server_msg);
226 destroy_connection:
227  amqp_destroy_connection(s->conn);
228 
229  av_freep(&vhost_decoded);
230  av_freep(&user_decoded);
231  av_freep(&password_decoded);
232  return AVERROR_EXTERNAL;
233 }
234 
235 static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
236 {
237  int ret;
238  AMQPContext *s = h->priv_data;
239  int fd = amqp_socket_get_sockfd(s->socket);
240 
241  amqp_bytes_t message = { size, (void *)buf };
242  amqp_basic_properties_t props;
243 
245  if (ret)
246  return ret;
247 
248  props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
249  props.content_type = amqp_cstring_bytes("octet/stream");
250  props.delivery_mode = s->delivery_mode;
251 
252  ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
253  amqp_cstring_bytes(s->routing_key), 0, 0,
254  &props, message);
255 
256  if (ret) {
257  av_log(h, AV_LOG_ERROR, "Error publish: %s\n", amqp_error_string2(ret));
258  return AVERROR_EXTERNAL;
259  }
260 
261  return size;
262 }
263 
264 static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
265 {
266  AMQPContext *s = h->priv_data;
267  int fd = amqp_socket_get_sockfd(s->socket);
268  int ret;
269 
270  amqp_rpc_reply_t broker_reply;
271  amqp_envelope_t envelope;
272 
274  if (ret)
275  return ret;
276 
277  amqp_maybe_release_buffers(s->conn);
278  broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
279 
280  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
281  return AVERROR_EXTERNAL;
282 
283  if (envelope.message.body.len > size) {
284  s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
285  av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
286  "Message will be truncated. Setting -pkt_size %d "
287  "may resolve this issue.\n", s->pkt_size_overflow);
288  }
289  size = FFMIN(size, envelope.message.body.len);
290 
291  memcpy(buf, envelope.message.body.bytes, size);
292  amqp_destroy_envelope(&envelope);
293 
294  return size;
295 }
296 
298 {
299  AMQPContext *s = h->priv_data;
300  amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
301  amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
302  amqp_destroy_connection(s->conn);
303 
304  return 0;
305 }
306 
307 static const AVClass amqp_context_class = {
308  .class_name = "amqp",
309  .item_name = av_default_item_name,
310  .option = options,
311  .version = LIBAVUTIL_VERSION_INT,
312 };
313 
315  .name = "amqp",
316  .url_close = amqp_proto_close,
317  .url_open = amqp_proto_open,
318  .url_read = amqp_proto_read,
319  .url_write = amqp_proto_write,
320  .priv_data_size = sizeof(AMQPContext),
321  .priv_data_class = &amqp_context_class,
323 };
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
Definition: utils.c:4747
#define NULL
Definition: coverity.c:32
static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libamqp.c:235
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:34
AVOption.
Definition: opt.h:248
#define STR_LEN
Definition: libamqp.c:45
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:200
#define E
Definition: libamqp.c:50
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
int is_streamed
true if streamed (no seek possible), default = false
Definition: url.h:45
AVIOInterruptCB interrupt_callback
Definition: url.h:47
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:235
#define AVIO_FLAG_READ
read-only
Definition: avio.h:674
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in mcs
Definition: url.h:48
int flags
Definition: url.h:43
amqp_connection_state_t conn
Definition: libamqp.c:35
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
const URLProtocol ff_libamqp_protocol
Definition: libamqp.c:314
const char * exchange
Definition: libamqp.c:37
AVOptions.
static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libamqp.c:264
ptrdiff_t size
Definition: opengl_enc.c:100
#define av_log(a,...)
char * ff_urldecode(const char *url, int decode_plus_sign)
Decodes an URL from its percent-encoded form back into normal representation.
Definition: urldecode.c:35
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:194
const char * r
Definition: vf_curves.c:114
#define FFMAX(a, b)
Definition: common.h:94
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
This works similarly to ff_network_wait_fd, but waits up to &#39;timeout&#39; microseconds Uses ff_network_wa...
Definition: network.c:78
static void envelope(VectorscopeContext *s, AVFrame *out)
#define FFMIN(a, b)
Definition: common.h:96
const char * routing_key
Definition: libamqp.c:38
#define s(width, name)
Definition: cbs_vp9.c:257
#define DEFAULT_CHANNEL
Definition: libamqp.c:46
int delivery_mode
Definition: libamqp.c:42
int pkt_size
Definition: libamqp.c:39
static const AVClass amqp_context_class
Definition: libamqp.c:307
Definition: url.h:38
static int amqp_proto_open(URLContext *h, const char *uri, int flags)
Definition: libamqp.c:62
#define OFFSET(x)
Definition: libamqp.c:48
Describe the class of an AVClass context structure.
Definition: log.h:67
void * priv_data
Definition: url.h:41
amqp_socket_t * socket
Definition: libamqp.c:36
const char * name
Definition: url.h:55
static const AVOption options[]
Definition: libamqp.c:51
#define flags(name, subs,...)
Definition: cbs_av1.c:561
#define D
Definition: libamqp.c:49
Main libavformat public API header.
static int amqp_proto_close(URLContext *h)
Definition: libamqp.c:297
int pkt_size_overflow
Definition: libamqp.c:41
int max_packet_size
if non zero, the stream is packetized with this max packet size
Definition: url.h:44
#define av_freep(p)
unbuffered private I/O API
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Frame references ownership and permissions
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
int64_t connection_timeout
Definition: libamqp.c:40