23 #include <amqp_tcp_socket.h> 35 amqp_connection_state_t
conn;
46 #define DEFAULT_CHANNEL 1 48 #define OFFSET(x) offsetof(AMQPContext, x) 49 #define D AV_OPT_FLAG_DECODING_PARAM 50 #define E AV_OPT_FLAG_ENCODING_PARAM 57 {
"persistent",
"Persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0,
E,
"delivery_mode" },
58 {
"non-persistent",
"Non-persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0,
E,
"delivery_mode" },
67 const char *user, *password =
NULL, *vhost;
68 const char *user_decoded, *password_decoded, *vhost_decoded;
70 amqp_rpc_reply_t broker_reply;
71 struct timeval tval = { 0 };
79 hostname,
sizeof(hostname), &port, path,
sizeof(path), uri);
84 if (hostname[0] ==
'\0' || port <= 0 || port > 65535 ) {
89 p = strchr(credentials,
':');
95 if (!password || *password ==
'\0')
99 if (!password_decoded)
113 p = strchr(path,
'?');
124 if (!vhost_decoded) {
130 s->
conn = amqp_new_connection();
142 goto destroy_connection;
150 ret = amqp_socket_open_noblock(s->
socket, hostname, port, &tval);
154 amqp_error_string2(ret));
155 goto destroy_connection;
158 broker_reply = amqp_login(s->
conn, vhost_decoded, 0, s->
pkt_size, 0,
159 AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
161 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
163 server_msg = AMQP_ACCESS_REFUSED;
164 goto close_connection;
168 broker_reply = amqp_get_rpc_reply(s->
conn);
170 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
172 server_msg = AMQP_CHANNEL_ERROR;
173 goto close_connection;
177 amqp_bytes_t queuename;
179 amqp_queue_declare_ok_t *
r;
182 0, 0, 0, 1, amqp_empty_table);
183 broker_reply = amqp_get_rpc_reply(s->
conn);
184 if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
186 server_msg = AMQP_RESOURCE_ERROR;
191 queuename.bytes = queuename_buff;
193 memcpy(queuename.bytes, r->queue.bytes, queuename.len);
197 amqp_cstring_bytes(s->
routing_key), amqp_empty_table);
199 broker_reply = amqp_get_rpc_reply(s->
conn);
200 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
202 server_msg = AMQP_INTERNAL_ERROR;
207 0, 1, 0, amqp_empty_table);
209 broker_reply = amqp_get_rpc_reply(s->
conn);
210 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
212 server_msg = AMQP_INTERNAL_ERROR;
225 amqp_connection_close(s->
conn, server_msg);
227 amqp_destroy_connection(s->
conn);
239 int fd = amqp_socket_get_sockfd(s->
socket);
242 amqp_basic_properties_t props;
248 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
249 props.content_type = amqp_cstring_bytes(
"octet/stream");
267 int fd = amqp_socket_get_sockfd(s->
socket);
270 amqp_rpc_reply_t broker_reply;
277 amqp_maybe_release_buffers(s->
conn);
278 broker_reply = amqp_consume_message(s->
conn, &envelope,
NULL, 0);
280 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
283 if (envelope.message.body.len > size) {
286 "Message will be truncated. Setting -pkt_size %d " 289 size =
FFMIN(size, envelope.message.body.len);
291 memcpy(buf, envelope.message.body.bytes, size);
292 amqp_destroy_envelope(&envelope);
301 amqp_connection_close(s->
conn, AMQP_REPLY_SUCCESS);
302 amqp_destroy_connection(s->
conn);
321 .priv_data_class = &amqp_context_class,
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
#define URL_PROTOCOL_FLAG_NETWORK
#define AV_LOG_WARNING
Something somehow does not look correct.
#define LIBAVUTIL_VERSION_INT
int is_streamed
true if streamed (no seek possible), default = false
AVIOInterruptCB interrupt_callback
const char * av_default_item_name(void *ptr)
Return the context name.
#define AVIO_FLAG_READ
read-only
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in microseconds
amqp_connection_state_t conn
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass is associated.
const URLProtocol ff_libamqp_protocol
static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
char * ff_urldecode(const char *url, int decode_plus_sign)
Decodes an URL from its percent-encoded form back into normal representation.
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
This works similarly to ff_network_wait_fd, but waits up to 'timeout' microseconds. Uses ff_network_wait_fd in a loop.
static void envelope(VectorscopeContext *s, AVFrame *out)
static const AVClass amqp_context_class
static int amqp_proto_open(URLContext *h, const char *uri, int flags)
Describe the class of an AVClass context structure.
static const AVOption options[]
#define flags(name, subs,...)
static int amqp_proto_close(URLContext *h)
int max_packet_size
if non zero, the stream is packetized with this max packet size
unbuffered private I/O API
Filter design: the word “frame” indicates either a video frame or a group of audio samples, as stored in an AVFrame structure. Format negotiation: the query_formats method should set, for each input and each output link, the list of supported formats. For video links, that means pixel format. For audio links, that means channel layout, sample format and sample rate. The lists are not just lists, they are references to shared objects. When the negotiation mechanism computes the intersection of the formats supported at each end of a link, all references to both lists are replaced with a reference to the intersection. And when a single format is eventually chosen for a link amongst the remaining list, all references to the list are updated. That means that if a filter requires that its input and output have the same format amongst a supported list, all it has to do is use a reference to the same list of formats. query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
#define AVERROR_EXTERNAL
Generic error in an external library.
int64_t connection_timeout