FFmpeg
libamqp.c
Go to the documentation of this file.
1 /*
2  * Advanced Message Queuing Protocol (AMQP) 0-9-1
3  * Copyright (c) 2020 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <amqp.h>
23 #include <amqp_tcp_socket.h>
24 #include <sys/time.h>
25 #include "avformat.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/opt.h"
28 #include "libavutil/time.h"
29 #include "network.h"
30 #include "url.h"
31 #include "urldecode.h"
32 
33 typedef struct AMQPContext {
34  const AVClass *class;
35  amqp_connection_state_t conn;
36  amqp_socket_t *socket;
37  const char *exchange;
38  const char *routing_key;
39  int pkt_size;
43 } AMQPContext;
44 
45 #define STR_LEN 1024
46 #define DEFAULT_CHANNEL 1
47 
48 #define OFFSET(x) offsetof(AMQPContext, x)
49 #define D AV_OPT_FLAG_DECODING_PARAM
50 #define E AV_OPT_FLAG_ENCODING_PARAM
51 static const AVOption options[] = {
52  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
53  { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
54  { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
55  { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E},
56  { "delivery_mode", "Delivery mode", OFFSET(delivery_mode), AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags = E, "delivery_mode"},
57  { "persistent", "Persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0, E, "delivery_mode" },
58  { "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0, E, "delivery_mode" },
59  { NULL }
60 };
61 
62 static int amqp_proto_open(URLContext *h, const char *uri, int flags)
63 {
64  int ret, server_msg;
65  char hostname[STR_LEN], credentials[STR_LEN];
66  int port;
67  const char *user, *password = NULL;
68  const char *user_decoded, *password_decoded;
69  char *p;
70  amqp_rpc_reply_t broker_reply;
71  struct timeval tval = { 0 };
72 
73  AMQPContext *s = h->priv_data;
74 
75  h->is_streamed = 1;
76  h->max_packet_size = s->pkt_size;
77 
78  av_url_split(NULL, 0, credentials, sizeof(credentials),
79  hostname, sizeof(hostname), &port, NULL, 0, uri);
80 
81  if (port < 0)
82  port = 5672;
83 
84  if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
85  av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
86  return AVERROR(EINVAL);
87  }
88 
89  p = strchr(credentials, ':');
90  if (p) {
91  *p = '\0';
92  password = p + 1;
93  }
94 
95  if (!password || *password == '\0')
96  password = "guest";
97 
98  password_decoded = ff_urldecode(password, 0);
99  if (!password_decoded)
100  return AVERROR(ENOMEM);
101 
102  user = credentials;
103  if (*user == '\0')
104  user = "guest";
105 
106  user_decoded = ff_urldecode(user, 0);
107  if (!user_decoded) {
108  av_freep(&password_decoded);
109  return AVERROR(ENOMEM);
110  }
111 
112  s->conn = amqp_new_connection();
113  if (!s->conn) {
114  av_freep(&user_decoded);
115  av_freep(&password_decoded);
116  av_log(h, AV_LOG_ERROR, "Error creating connection\n");
117  return AVERROR_EXTERNAL;
118  }
119 
120  s->socket = amqp_tcp_socket_new(s->conn);
121  if (!s->socket) {
122  av_log(h, AV_LOG_ERROR, "Error creating socket\n");
123  goto destroy_connection;
124  }
125 
126  if (s->connection_timeout < 0)
127  s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
128 
129  tval.tv_sec = s->connection_timeout / 1000000;
130  tval.tv_usec = s->connection_timeout % 1000000;
131  ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
132 
133  if (ret) {
134  av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n",
135  amqp_error_string2(ret));
136  goto destroy_connection;
137  }
138 
139  broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
140  AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
141 
142  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
143  av_log(h, AV_LOG_ERROR, "Error login\n");
144  server_msg = AMQP_ACCESS_REFUSED;
145  goto close_connection;
146  }
147 
148  amqp_channel_open(s->conn, DEFAULT_CHANNEL);
149  broker_reply = amqp_get_rpc_reply(s->conn);
150 
151  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
152  av_log(h, AV_LOG_ERROR, "Error set channel\n");
153  server_msg = AMQP_CHANNEL_ERROR;
154  goto close_connection;
155  }
156 
157  if (h->flags & AVIO_FLAG_READ) {
158  amqp_bytes_t queuename;
159  char queuename_buff[STR_LEN];
160  amqp_queue_declare_ok_t *r;
161 
162  r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
163  0, 0, 0, 1, amqp_empty_table);
164  broker_reply = amqp_get_rpc_reply(s->conn);
165  if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
166  av_log(h, AV_LOG_ERROR, "Error declare queue\n");
167  server_msg = AMQP_RESOURCE_ERROR;
168  goto close_channel;
169  }
170 
171  /* store queuename */
172  queuename.bytes = queuename_buff;
173  queuename.len = FFMIN(r->queue.len, STR_LEN);
174  memcpy(queuename.bytes, r->queue.bytes, queuename.len);
175 
176  amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
177  amqp_cstring_bytes(s->exchange),
178  amqp_cstring_bytes(s->routing_key), amqp_empty_table);
179 
180  broker_reply = amqp_get_rpc_reply(s->conn);
181  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
182  av_log(h, AV_LOG_ERROR, "Queue bind error\n");
183  server_msg = AMQP_INTERNAL_ERROR;
184  goto close_channel;
185  }
186 
187  amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
188  0, 1, 0, amqp_empty_table);
189 
190  broker_reply = amqp_get_rpc_reply(s->conn);
191  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
192  av_log(h, AV_LOG_ERROR, "Set consume error\n");
193  server_msg = AMQP_INTERNAL_ERROR;
194  goto close_channel;
195  }
196  }
197 
198  av_freep(&user_decoded);
199  av_freep(&password_decoded);
200  return 0;
201 
202 close_channel:
203  amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
204 close_connection:
205  amqp_connection_close(s->conn, server_msg);
206 destroy_connection:
207  amqp_destroy_connection(s->conn);
208 
209  av_freep(&user_decoded);
210  av_freep(&password_decoded);
211  return AVERROR_EXTERNAL;
212 }
213 
214 static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
215 {
216  int ret;
217  AMQPContext *s = h->priv_data;
218  int fd = amqp_socket_get_sockfd(s->socket);
219 
220  amqp_bytes_t message = { size, (void *)buf };
221  amqp_basic_properties_t props;
222 
224  if (ret)
225  return ret;
226 
227  props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
228  props.content_type = amqp_cstring_bytes("octet/stream");
229  props.delivery_mode = s->delivery_mode;
230 
231  ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
232  amqp_cstring_bytes(s->routing_key), 0, 0,
233  &props, message);
234 
235  if (ret) {
236  av_log(h, AV_LOG_ERROR, "Error publish: %s\n", amqp_error_string2(ret));
237  return AVERROR_EXTERNAL;
238  }
239 
240  return size;
241 }
242 
243 static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
244 {
245  AMQPContext *s = h->priv_data;
246  int fd = amqp_socket_get_sockfd(s->socket);
247  int ret;
248 
249  amqp_rpc_reply_t broker_reply;
250  amqp_envelope_t envelope;
251 
253  if (ret)
254  return ret;
255 
256  amqp_maybe_release_buffers(s->conn);
257  broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
258 
259  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
260  return AVERROR_EXTERNAL;
261 
262  if (envelope.message.body.len > size) {
263  s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
264  av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
265  "Message will be truncated. Setting -pkt_size %d "
266  "may resolve this issue.\n", s->pkt_size_overflow);
267  }
268  size = FFMIN(size, envelope.message.body.len);
269 
270  memcpy(buf, envelope.message.body.bytes, size);
271  amqp_destroy_envelope(&envelope);
272 
273  return size;
274 }
275 
277 {
278  AMQPContext *s = h->priv_data;
279  amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
280  amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
281  amqp_destroy_connection(s->conn);
282 
283  return 0;
284 }
285 
286 static const AVClass amqp_context_class = {
287  .class_name = "amqp",
288  .item_name = av_default_item_name,
289  .option = options,
290  .version = LIBAVUTIL_VERSION_INT,
291 };
292 
294  .name = "amqp",
295  .url_close = amqp_proto_close,
296  .url_open = amqp_proto_open,
297  .url_read = amqp_proto_read,
298  .url_write = amqp_proto_write,
299  .priv_data_size = sizeof(AMQPContext),
300  .priv_data_class = &amqp_context_class,
302 };
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
Definition: utils.c:4725
#define NULL
Definition: coverity.c:32
static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libamqp.c:214
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:34
AVOption.
Definition: opt.h:248
#define STR_LEN
Definition: libamqp.c:45
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:200
#define E
Definition: libamqp.c:50
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
int is_streamed
true if streamed (no seek possible), default = false
Definition: url.h:45
AVIOInterruptCB interrupt_callback
Definition: url.h:47
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:235
#define AVIO_FLAG_READ
read-only
Definition: avio.h:674
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in mcs
Definition: url.h:48
int flags
Definition: url.h:43
amqp_connection_state_t conn
Definition: libamqp.c:35
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
const URLProtocol ff_libamqp_protocol
Definition: libamqp.c:293
const char * exchange
Definition: libamqp.c:37
AVOptions.
static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libamqp.c:243
ptrdiff_t size
Definition: opengl_enc.c:100
#define av_log(a,...)
char * ff_urldecode(const char *url, int decode_plus_sign)
Decodes an URL from its percent-encoded form back into normal representation.
Definition: urldecode.c:35
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:194
const char * r
Definition: vf_curves.c:114
#define FFMAX(a, b)
Definition: common.h:94
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
This works similarly to ff_network_wait_fd, but waits up to &#39;timeout&#39; microseconds Uses ff_network_wa...
Definition: network.c:78
static void envelope(VectorscopeContext *s, AVFrame *out)
#define FFMIN(a, b)
Definition: common.h:96
const char * routing_key
Definition: libamqp.c:38
#define s(width, name)
Definition: cbs_vp9.c:257
#define DEFAULT_CHANNEL
Definition: libamqp.c:46
int delivery_mode
Definition: libamqp.c:42
int pkt_size
Definition: libamqp.c:39
static const AVClass amqp_context_class
Definition: libamqp.c:286
Definition: url.h:38
static int amqp_proto_open(URLContext *h, const char *uri, int flags)
Definition: libamqp.c:62
#define OFFSET(x)
Definition: libamqp.c:48
Describe the class of an AVClass context structure.
Definition: log.h:67
void * priv_data
Definition: url.h:41
amqp_socket_t * socket
Definition: libamqp.c:36
const char * name
Definition: url.h:55
static const AVOption options[]
Definition: libamqp.c:51
#define flags(name, subs,...)
Definition: cbs_av1.c:560
#define D
Definition: libamqp.c:49
Main libavformat public API header.
static int amqp_proto_close(URLContext *h)
Definition: libamqp.c:276
int pkt_size_overflow
Definition: libamqp.c:41
int max_packet_size
if non zero, the stream is packetized with this max packet size
Definition: url.h:44
#define av_freep(p)
unbuffered private I/O API
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Frame references ownership and permissions
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
int64_t connection_timeout
Definition: libamqp.c:40