FFmpeg
libamqp.c
Go to the documentation of this file.
1 /*
2  * Advanced Message Queuing Protocol (AMQP) 0-9-1
3  * Copyright (c) 2020 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <amqp.h>
23 #include <amqp_tcp_socket.h>
24 #include <sys/time.h>
25 #include "avformat.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/opt.h"
28 #include "libavutil/time.h"
29 #include "network.h"
30 #include "url.h"
31 #include "urldecode.h"
32 
33 typedef struct AMQPContext {
34  const AVClass *class;
35  amqp_connection_state_t conn;
36  amqp_socket_t *socket;
37  const char *exchange;
38  const char *routing_key;
39  int pkt_size;
42 } AMQPContext;
43 
44 #define STR_LEN 1024
45 #define DEFAULT_CHANNEL 1
46 
47 #define OFFSET(x) offsetof(AMQPContext, x)
48 #define D AV_OPT_FLAG_DECODING_PARAM
49 #define E AV_OPT_FLAG_ENCODING_PARAM
50 static const AVOption options[] = {
51  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
52  { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
53  { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
54  { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E},
55  { NULL }
56 };
57 
58 static int amqp_proto_open(URLContext *h, const char *uri, int flags)
59 {
60  int ret, server_msg;
61  char hostname[STR_LEN], credentials[STR_LEN];
62  int port;
63  const char *user, *password = NULL;
64  const char *user_decoded, *password_decoded;
65  char *p;
66  amqp_rpc_reply_t broker_reply;
67  struct timeval tval = { 0 };
68 
69  AMQPContext *s = h->priv_data;
70 
71  h->is_streamed = 1;
72  h->max_packet_size = s->pkt_size;
73 
74  av_url_split(NULL, 0, credentials, sizeof(credentials),
75  hostname, sizeof(hostname), &port, NULL, 0, uri);
76 
77  if (port < 0)
78  port = 5672;
79 
80  if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
81  av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
82  return AVERROR(EINVAL);
83  }
84 
85  p = strchr(credentials, ':');
86  if (p) {
87  *p = '\0';
88  password = p + 1;
89  }
90 
91  if (!password || *password == '\0')
92  password = "guest";
93 
94  password_decoded = ff_urldecode(password, 0);
95  if (!password_decoded)
96  return AVERROR(ENOMEM);
97 
98  user = credentials;
99  if (*user == '\0')
100  user = "guest";
101 
102  user_decoded = ff_urldecode(user, 0);
103  if (!user_decoded) {
104  av_freep(&password_decoded);
105  return AVERROR(ENOMEM);
106  }
107 
108  s->conn = amqp_new_connection();
109  if (!s->conn) {
110  av_freep(&user_decoded);
111  av_freep(&password_decoded);
112  av_log(h, AV_LOG_ERROR, "Error creating connection\n");
113  return AVERROR_EXTERNAL;
114  }
115 
116  s->socket = amqp_tcp_socket_new(s->conn);
117  if (!s->socket) {
118  av_log(h, AV_LOG_ERROR, "Error creating socket\n");
119  goto destroy_connection;
120  }
121 
122  if (s->connection_timeout < 0)
123  s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
124 
125  tval.tv_sec = s->connection_timeout / 1000000;
126  tval.tv_usec = s->connection_timeout % 1000000;
127  ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
128 
129  if (ret) {
130  av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n",
131  amqp_error_string2(ret));
132  goto destroy_connection;
133  }
134 
135  broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
136  AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
137 
138  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
139  av_log(h, AV_LOG_ERROR, "Error login\n");
140  server_msg = AMQP_ACCESS_REFUSED;
141  goto close_connection;
142  }
143 
144  amqp_channel_open(s->conn, DEFAULT_CHANNEL);
145  broker_reply = amqp_get_rpc_reply(s->conn);
146 
147  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
148  av_log(h, AV_LOG_ERROR, "Error set channel\n");
149  server_msg = AMQP_CHANNEL_ERROR;
150  goto close_connection;
151  }
152 
153  if (h->flags & AVIO_FLAG_READ) {
154  amqp_bytes_t queuename;
155  char queuename_buff[STR_LEN];
156  amqp_queue_declare_ok_t *r;
157 
158  r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
159  0, 0, 0, 1, amqp_empty_table);
160  broker_reply = amqp_get_rpc_reply(s->conn);
161  if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
162  av_log(h, AV_LOG_ERROR, "Error declare queue\n");
163  server_msg = AMQP_RESOURCE_ERROR;
164  goto close_channel;
165  }
166 
167  /* store queuename */
168  queuename.bytes = queuename_buff;
169  queuename.len = FFMIN(r->queue.len, STR_LEN);
170  memcpy(queuename.bytes, r->queue.bytes, queuename.len);
171 
172  amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
173  amqp_cstring_bytes(s->exchange),
174  amqp_cstring_bytes(s->routing_key), amqp_empty_table);
175 
176  broker_reply = amqp_get_rpc_reply(s->conn);
177  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
178  av_log(h, AV_LOG_ERROR, "Queue bind error\n");
179  server_msg = AMQP_INTERNAL_ERROR;
180  goto close_channel;
181  }
182 
183  amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
184  0, 1, 0, amqp_empty_table);
185 
186  broker_reply = amqp_get_rpc_reply(s->conn);
187  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
188  av_log(h, AV_LOG_ERROR, "Set consume error\n");
189  server_msg = AMQP_INTERNAL_ERROR;
190  goto close_channel;
191  }
192  }
193 
194  av_freep(&user_decoded);
195  av_freep(&password_decoded);
196  return 0;
197 
198 close_channel:
199  amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
200 close_connection:
201  amqp_connection_close(s->conn, server_msg);
202 destroy_connection:
203  amqp_destroy_connection(s->conn);
204 
205  av_freep(&user_decoded);
206  av_freep(&password_decoded);
207  return AVERROR_EXTERNAL;
208 }
209 
210 static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
211 {
212  int ret;
213  AMQPContext *s = h->priv_data;
214  int fd = amqp_socket_get_sockfd(s->socket);
215 
216  amqp_bytes_t message = { size, (void *)buf };
217  amqp_basic_properties_t props;
218 
220  if (ret)
221  return ret;
222 
223  props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
224  props.content_type = amqp_cstring_bytes("octet/stream");
225  props.delivery_mode = 2; /* persistent delivery mode */
226 
227  ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
228  amqp_cstring_bytes(s->routing_key), 0, 0,
229  &props, message);
230 
231  if (ret) {
232  av_log(h, AV_LOG_ERROR, "Error publish: %s\n", amqp_error_string2(ret));
233  return AVERROR_EXTERNAL;
234  }
235 
236  return size;
237 }
238 
239 static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
240 {
241  AMQPContext *s = h->priv_data;
242  int fd = amqp_socket_get_sockfd(s->socket);
243  int ret;
244 
245  amqp_rpc_reply_t broker_reply;
246  amqp_envelope_t envelope;
247 
249  if (ret)
250  return ret;
251 
252  amqp_maybe_release_buffers(s->conn);
253  broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
254 
255  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
256  return AVERROR_EXTERNAL;
257 
258  if (envelope.message.body.len > size) {
259  s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
260  av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
261  "Message will be truncated. Setting -pkt_size %d "
262  "may resolve this issue.\n", s->pkt_size_overflow);
263  }
264  size = FFMIN(size, envelope.message.body.len);
265 
266  memcpy(buf, envelope.message.body.bytes, size);
267  amqp_destroy_envelope(&envelope);
268 
269  return size;
270 }
271 
273 {
274  AMQPContext *s = h->priv_data;
275  amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
276  amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
277  amqp_destroy_connection(s->conn);
278 
279  return 0;
280 }
281 
282 static const AVClass amqp_context_class = {
283  .class_name = "amqp",
284  .item_name = av_default_item_name,
285  .option = options,
286  .version = LIBAVUTIL_VERSION_INT,
287 };
288 
290  .name = "amqp",
291  .url_close = amqp_proto_close,
292  .url_open = amqp_proto_open,
293  .url_read = amqp_proto_read,
294  .url_write = amqp_proto_write,
295  .priv_data_size = sizeof(AMQPContext),
296  .priv_data_class = &amqp_context_class,
298 };
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
Definition: utils.c:4792
#define NULL
Definition: coverity.c:32
static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libamqp.c:210
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:34
AVOption.
Definition: opt.h:246
#define STR_LEN
Definition: libamqp.c:44
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:182
#define E
Definition: libamqp.c:49
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
int is_streamed
true if streamed (no seek possible), default = false
Definition: url.h:45
AVIOInterruptCB interrupt_callback
Definition: url.h:47
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:235
#define AVIO_FLAG_READ
read-only
Definition: avio.h:674
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in mcs
Definition: url.h:48
int flags
Definition: url.h:43
amqp_connection_state_t conn
Definition: libamqp.c:35
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
const URLProtocol ff_libamqp_protocol
Definition: libamqp.c:289
const char * exchange
Definition: libamqp.c:37
AVOptions.
static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libamqp.c:239
ptrdiff_t size
Definition: opengl_enc.c:100
#define av_log(a,...)
char * ff_urldecode(const char *url, int decode_plus_sign)
Decodes an URL from its percent-encoded form back into normal representation.
Definition: urldecode.c:35
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
const char * r
Definition: vf_curves.c:114
#define FFMAX(a, b)
Definition: common.h:94
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
This works similarly to ff_network_wait_fd, but waits up to 'timeout' microseconds. Uses ff_network_wa...
Definition: network.c:78
static void envelope(VectorscopeContext *s, AVFrame *out)
#define FFMIN(a, b)
Definition: common.h:96
const char * routing_key
Definition: libamqp.c:38
#define s(width, name)
Definition: cbs_vp9.c:257
#define DEFAULT_CHANNEL
Definition: libamqp.c:45
int pkt_size
Definition: libamqp.c:39
static const AVClass amqp_context_class
Definition: libamqp.c:282
Definition: url.h:38
static int amqp_proto_open(URLContext *h, const char *uri, int flags)
Definition: libamqp.c:58
#define OFFSET(x)
Definition: libamqp.c:47
Describe the class of an AVClass context structure.
Definition: log.h:67
void * priv_data
Definition: url.h:41
amqp_socket_t * socket
Definition: libamqp.c:36
const char * name
Definition: url.h:55
static const AVOption options[]
Definition: libamqp.c:50
#define flags(name, subs,...)
Definition: cbs_av1.c:564
#define D
Definition: libamqp.c:48
Main libavformat public API header.
static int amqp_proto_close(URLContext *h)
Definition: libamqp.c:272
int pkt_size_overflow
Definition: libamqp.c:41
int max_packet_size
if non zero, the stream is packetized with this max packet size
Definition: url.h:44
#define av_freep(p)
unbuffered private I/O API
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
int64_t connection_timeout
Definition: libamqp.c:40