[FFmpeg-cvslog] rtmp: Add support for receiving incoming streams
Jordi Ortiz
git at videolan.org
Fri Aug 17 17:10:45 CEST 2012
ffmpeg | branch: master | Jordi Ortiz <nenjordi at gmail.com> | Thu Aug 16 16:04:48 2012 +0200| [e5f2731c736c992948ca7e344ad7cfc93168a03f] | committer: Martin Storsjö
rtmp: Add support for receiving incoming streams
Signed-off-by: Martin Storsjö <martin at martin.st>
> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=e5f2731c736c992948ca7e344ad7cfc93168a03f
---
doc/protocols.texi | 5 +
libavformat/rtmpproto.c | 513 ++++++++++++++++++++++++++++++++++++++++++++++-
libavformat/version.h | 4 +-
3 files changed, 515 insertions(+), 7 deletions(-)
diff --git a/doc/protocols.texi b/doc/protocols.texi
index ea5706f..086a249 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -188,6 +188,11 @@ application specified in @var{app}, may be prefixed by "mp4:". You
can override the value parsed from the URI through the @code{rtmp_playpath}
option, too.
+ at item listen
+Act as a server, listening for an incoming connection.
+
+ at item timeout
+Maximum time to wait for the incoming connection. Implies listen.
@end table
Additionally, the following parameters can be set via command line options
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index db11501..ca871c9 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -29,6 +29,7 @@
#include "libavutil/intfloat.h"
#include "libavutil/lfg.h"
#include "libavutil/opt.h"
+#include "libavutil/random_seed.h"
#include "libavutil/sha.h"
#include "avformat.h"
#include "internal.h"
@@ -51,6 +52,7 @@
#define PLAYPATH_MAX_LENGTH 256
#define TCURL_MAX_LENGTH 512
#define FLASHVER_MAX_LENGTH 64
+#define RTMP_PKTDATA_DEFAULT_SIZE 4096
/** RTMP protocol handler state */
typedef enum {
@@ -59,6 +61,7 @@ typedef enum {
STATE_FCPUBLISH, ///< client FCPublishing stream (for output)
STATE_PLAYING, ///< client has started receiving multimedia data from server
STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output)
+ STATE_RECEIVING, ///< received a publish command (for input)
STATE_STOPPED, ///< the broadcast has been stopped
} ClientState;
@@ -110,6 +113,9 @@ typedef struct RTMPContext {
TrackedMethod*tracked_methods; ///< tracked methods buffer
int nb_tracked_methods; ///< number of tracked methods
int tracked_methods_size; ///< size of the tracked methods buffer
+ int listen; ///< listen mode flag
+ int listen_timeout; ///< listen timeout to wait for new connections
+ int nb_streamid; ///< The next stream id to return on createStream calls
} RTMPContext;
#define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing
@@ -377,6 +383,151 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
return rtmp_send_packet(rt, &pkt, 1);
}
+static int read_connect(URLContext *s, RTMPContext *rt)
+{
+ RTMPPacket pkt = { 0 };
+ uint8_t *p;
+ const uint8_t *cp;
+ int ret;
+ char command[64];
+ int stringlen;
+ double seqnum;
+ uint8_t tmpstr[256];
+ GetByteContext gbc;
+
+ if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size,
+ rt->prev_pkt[1])) < 0)
+ return ret;
+ cp = pkt.data;
+ bytestream2_init(&gbc, cp, pkt.data_size);
+ if (ff_amf_read_string(&gbc, command, sizeof(command), &stringlen)) {
+ av_log(s, AV_LOG_ERROR, "Unable to read command string\n");
+ ff_rtmp_packet_destroy(&pkt);
+ return AVERROR_INVALIDDATA;
+ }
+ if (strcmp(command, "connect")) {
+ av_log(s, AV_LOG_ERROR, "Expecting connect, got %s\n", command);
+ ff_rtmp_packet_destroy(&pkt);
+ return AVERROR_INVALIDDATA;
+ }
+ ret = ff_amf_read_number(&gbc, &seqnum);
+ if (ret)
+ av_log(s, AV_LOG_WARNING, "SeqNum not found\n");
+ /* Here one could parse an AMF Object with data as flashVers and others. */
+ ret = ff_amf_get_field_value(gbc.buffer,
+ gbc.buffer + bytestream2_get_bytes_left(&gbc),
+ "app", tmpstr, sizeof(tmpstr));
+ if (ret)
+ av_log(s, AV_LOG_WARNING, "App field not found in connect\n");
+ if (!ret && strcmp(tmpstr, rt->app))
+ av_log(s, AV_LOG_WARNING, "App field don't match up: %s <-> %s\n",
+ tmpstr, rt->app);
+ ff_rtmp_packet_destroy(&pkt);
+
+ // Send Window Acknowledgement Size (as defined in speficication)
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
+ RTMP_PT_SERVER_BW, 0, 4)) < 0)
+ return ret;
+ p = pkt.data;
+ bytestream_put_be32(&p, rt->server_bw);
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+ if (ret < 0)
+ return ret;
+ // Send Peer Bandwidth
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
+ RTMP_PT_CLIENT_BW, 0, 5)) < 0)
+ return ret;
+ p = pkt.data;
+ bytestream_put_be32(&p, rt->server_bw);
+ bytestream_put_byte(&p, 2); // dynamic
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+ if (ret < 0)
+ return ret;
+
+ // Ping request
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
+ RTMP_PT_PING, 0, 6)) < 0)
+ return ret;
+
+ p = pkt.data;
+ bytestream_put_be16(&p, 0); // 0 -> Stream Begin
+ bytestream_put_be32(&p, 0);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+ if (ret < 0)
+ return ret;
+
+ // Chunk size
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_CHUNK_SIZE, 0, 4)) < 0)
+ return ret;
+
+ p = pkt.data;
+ bytestream_put_be32(&p, rt->out_chunk_size);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+ if (ret < 0)
+ return ret;
+
+ // Send result_ NetConnection.Connect.Success to connect
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0,
+ RTMP_PKTDATA_DEFAULT_SIZE)) < 0)
+ return ret;
+
+ p = pkt.data;
+ ff_amf_write_string(&p, "_result");
+ ff_amf_write_number(&p, seqnum);
+
+ ff_amf_write_object_start(&p);
+ ff_amf_write_field_name(&p, "fmsVer");
+ ff_amf_write_string(&p, "FMS/3,0,1,123");
+ ff_amf_write_field_name(&p, "capabilities");
+ ff_amf_write_number(&p, 31);
+ ff_amf_write_object_end(&p);
+
+ ff_amf_write_object_start(&p);
+ ff_amf_write_field_name(&p, "level");
+ ff_amf_write_string(&p, "status");
+ ff_amf_write_field_name(&p, "code");
+ ff_amf_write_string(&p, "NetConnection.Connect.Success");
+ ff_amf_write_field_name(&p, "description");
+ ff_amf_write_string(&p, "Connection succeeded.");
+ ff_amf_write_field_name(&p, "objectEncoding");
+ ff_amf_write_number(&p, 0);
+ ff_amf_write_object_end(&p);
+
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+ if (ret < 0)
+ return ret;
+
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0, 30)) < 0)
+ return ret;
+ p = pkt.data;
+ ff_amf_write_string(&p, "onBWDone");
+ ff_amf_write_number(&p, 0);
+ ff_amf_write_null(&p);
+ ff_amf_write_number(&p, 8192);
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+
+ return ret;
+}
+
/**
* Generate 'releaseStream' call and send it to the server. It should make
* the server release some channel for media streams.
@@ -1138,6 +1289,123 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt)
return 0;
}
+static int rtmp_receive_hs_packet(RTMPContext* rt, uint32_t *first_int,
+ uint32_t *second_int, char *arraydata,
+ int size)
+{
+ ssize_t inoutsize;
+
+ inoutsize = ffurl_read_complete(rt->stream, arraydata,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (inoutsize <= 0)
+ return AVERROR(EIO);
+ if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) {
+ av_log(rt, AV_LOG_ERROR, "Erroneous Message size %d"
+ " not following standard\n", (int)inoutsize);
+ return AVERROR(EINVAL);
+ }
+
+ *first_int = AV_RB32(arraydata);
+ *second_int = AV_RB32(arraydata + 4);
+ return 0;
+}
+
+static int rtmp_send_hs_packet(RTMPContext* rt, uint32_t first_int,
+ uint32_t second_int, char *arraydata, int size)
+{
+ ssize_t inoutsize;
+
+ AV_WB32(arraydata, first_int);
+ AV_WB32(arraydata + 4, first_int);
+ inoutsize = ffurl_write(rt->stream, arraydata,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) {
+ av_log(rt, AV_LOG_ERROR, "Unable to write answer\n");
+ return AVERROR(EIO);
+ }
+
+ return 0;
+}
+
+/**
+ * rtmp handshake server side
+ */
+static int rtmp_server_handshake(URLContext *s, RTMPContext *rt)
+{
+ uint8_t buffer[RTMP_HANDSHAKE_PACKET_SIZE];
+ uint32_t hs_epoch;
+ uint32_t hs_my_epoch;
+ uint8_t hs_c1[RTMP_HANDSHAKE_PACKET_SIZE];
+ uint8_t hs_s1[RTMP_HANDSHAKE_PACKET_SIZE];
+ uint32_t zeroes;
+ uint32_t temp = 0;
+ int randomidx = 0;
+ ssize_t inoutsize = 0;
+ int ret;
+
+ inoutsize = ffurl_read_complete(rt->stream, buffer, 1); // Receive C0
+ if (inoutsize <= 0) {
+ av_log(s, AV_LOG_ERROR, "Unable to read handshake\n");
+ return AVERROR(EIO);
+ }
+ // Check Version
+ if (buffer[0] != 3) {
+ av_log(s, AV_LOG_ERROR, "RTMP protocol version mismatch\n");
+ return AVERROR(EIO);
+ }
+ if (ffurl_write(rt->stream, buffer, 1) <= 0) { // Send S0
+ av_log(s, AV_LOG_ERROR,
+ "Unable to write answer - RTMP S0\n");
+ return AVERROR(EIO);
+ }
+ /* Receive C1 */
+ ret = rtmp_receive_hs_packet(rt, &hs_epoch, &zeroes, hs_c1,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (ret) {
+ av_log(s, AV_LOG_ERROR, "RTMP Handshake C1 Error\n");
+ return ret;
+ }
+ if (zeroes)
+ av_log(s, AV_LOG_WARNING, "Erroneous C1 Message zero != 0\n");
+ /* Send S1 */
+ /* By now same epoch will be sent */
+ hs_my_epoch = hs_epoch;
+ /* Generate random */
+ for (randomidx = 0; randomidx < (RTMP_HANDSHAKE_PACKET_SIZE);
+ randomidx += 4)
+ AV_WB32(hs_s1 + 8 + randomidx, av_get_random_seed());
+
+ ret = rtmp_send_hs_packet(rt, hs_my_epoch, 0, hs_s1,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (ret) {
+ av_log(s, AV_LOG_ERROR, "RTMP Handshake S1 Error\n");
+ return ret;
+ }
+ /* Send S2 */
+ ret = rtmp_send_hs_packet(rt, hs_epoch, 0, hs_c1,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (ret) {
+ av_log(s, AV_LOG_ERROR, "RTMP Handshake S2 Error\n");
+ return ret;
+ }
+ /* Receive C2 */
+ ret = rtmp_receive_hs_packet(rt, &temp, &zeroes, buffer,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (ret) {
+ av_log(s, AV_LOG_ERROR, "RTMP Handshake C2 Error\n");
+ return ret;
+ }
+ if (temp != hs_my_epoch)
+ av_log(s, AV_LOG_WARNING,
+ "Erroneous C2 Message epoch does not match up with C1 epoch\n");
+ if (memcmp(buffer + 8, hs_s1 + 8,
+ RTMP_HANDSHAKE_PACKET_SIZE - 8))
+ av_log(s, AV_LOG_WARNING,
+ "Erroneous C2 Message random does not match up\n");
+
+ return 0;
+}
+
static int handle_chunk_size(URLContext *s, RTMPPacket *pkt)
{
RTMPContext *rt = s->priv_data;
@@ -1270,6 +1538,139 @@ static int handle_invoke_error(URLContext *s, RTMPPacket *pkt)
return ret;
}
+static int send_invoke_response(URLContext *s, RTMPPacket *pkt)
+{
+ RTMPContext *rt = s->priv_data;
+ double seqnum;
+ char filename[64];
+ char command[64];
+ char statusmsg[128];
+ int stringlen;
+ char *pchar;
+ const uint8_t *p = pkt->data;
+ uint8_t *pp = NULL;
+ RTMPPacket spkt = { 0 };
+ GetByteContext gbc;
+ int ret;
+
+ bytestream2_init(&gbc, p, pkt->data_size);
+ if (ff_amf_read_string(&gbc, command, sizeof(command),
+ &stringlen)) {
+ av_log(s, AV_LOG_ERROR, "Error in PT_INVOKE\n");
+ return AVERROR_INVALIDDATA;
+ }
+
+ ret = ff_amf_read_number(&gbc, &seqnum);
+ if (ret)
+ return ret;
+ ret = ff_amf_read_null(&gbc);
+ if (ret)
+ return ret;
+ if (!strcmp(command, "FCPublish") ||
+ !strcmp(command, "publish")) {
+ ret = ff_amf_read_string(&gbc, filename,
+ sizeof(filename), &stringlen);
+ // check with url
+ if (s->filename) {
+ pchar = strrchr(s->filename, '/');
+ if (!pchar) {
+ av_log(s, AV_LOG_WARNING,
+ "Unable to find / in url %s, bad format\n",
+ s->filename);
+ pchar = s->filename;
+ }
+ pchar++;
+ if (strcmp(pchar, filename))
+ av_log(s, AV_LOG_WARNING, "Unexpected stream %s, expecting"
+ " %s\n", filename, pchar);
+ }
+ rt->state = STATE_RECEIVING;
+ }
+
+ if (!strcmp(command, "FCPublish")) {
+ if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0,
+ RTMP_PKTDATA_DEFAULT_SIZE)) < 0) {
+ av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
+ return ret;
+ }
+ pp = spkt.data;
+ ff_amf_write_string(&pp, "onFCPublish");
+ } else if (!strcmp(command, "publish")) {
+ PutByteContext pbc;
+ // Send Stream Begin 1
+ if ((ret = ff_rtmp_packet_create(&spkt, RTMP_NETWORK_CHANNEL,
+ RTMP_PT_PING, 0, 6)) < 0) {
+ av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
+ return ret;
+ }
+ pp = spkt.data;
+ bytestream2_init_writer(&pbc, pp, spkt.data_size);
+ bytestream2_put_be16(&pbc, 0); // 0 -> Stream Begin
+ bytestream2_put_be32(&pbc, rt->nb_streamid);
+ ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&spkt);
+ if (ret < 0)
+ return ret;
+
+ // Send onStatus(NetStream.Publish.Start)
+ if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0,
+ RTMP_PKTDATA_DEFAULT_SIZE)) < 0) {
+ av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
+ return ret;
+ }
+ spkt.extra = pkt->extra;
+ pp = spkt.data;
+ ff_amf_write_string(&pp, "onStatus");
+ ff_amf_write_number(&pp, 0);
+ ff_amf_write_null(&pp);
+
+ ff_amf_write_object_start(&pp);
+ ff_amf_write_field_name(&pp, "level");
+ ff_amf_write_string(&pp, "status");
+ ff_amf_write_field_name(&pp, "code");
+ ff_amf_write_string(&pp, "NetStream.Publish.Start");
+ ff_amf_write_field_name(&pp, "description");
+ snprintf(statusmsg, sizeof(statusmsg),
+ "%s is now published", filename);
+ ff_amf_write_string(&pp, statusmsg);
+ ff_amf_write_field_name(&pp, "details");
+ ff_amf_write_string(&pp, filename);
+ ff_amf_write_field_name(&pp, "clientid");
+ snprintf(statusmsg, sizeof(statusmsg), "%s", LIBAVFORMAT_IDENT);
+ ff_amf_write_string(&pp, statusmsg);
+ ff_amf_write_object_end(&pp);
+
+ } else {
+ if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0,
+ RTMP_PKTDATA_DEFAULT_SIZE)) < 0) {
+ av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
+ return ret;
+ }
+ pp = spkt.data;
+ ff_amf_write_string(&pp, "_result");
+ ff_amf_write_number(&pp, seqnum);
+ ff_amf_write_null(&pp);
+ if (!strcmp(command, "createStream")) {
+ rt->nb_streamid++;
+ if (rt->nb_streamid == 0 || rt->nb_streamid == 2)
+ rt->nb_streamid++; /* Values 0 and 2 are reserved */
+ ff_amf_write_number(&pp, rt->nb_streamid);
+ /* By now we don't control which streams are removed in
+ * deleteStream. There is no stream creation control
+ * if a client creates more than 2^32 - 2 streams. */
+ }
+ }
+ spkt.data_size = pp - spkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&spkt);
+ return ret;
+}
+
static int handle_invoke_result(URLContext *s, RTMPPacket *pkt)
{
RTMPContext *rt = s->priv_data;
@@ -1384,11 +1785,79 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt)
} else if (!memcmp(pkt->data, "\002\000\010onBWDone", 11)) {
if ((ret = gen_check_bw(s, rt)) < 0)
return ret;
+ } else if (!memcmp(pkt->data, "\002\000\015releaseStream", 16) ||
+ !memcmp(pkt->data, "\002\000\011FCPublish", 12) ||
+ !memcmp(pkt->data, "\002\000\007publish", 10) ||
+ !memcmp(pkt->data, "\002\000\010_checkbw", 11) ||
+ !memcmp(pkt->data, "\002\000\014createStream", 15)) {
+ if (ret = send_invoke_response(s, pkt) < 0)
+ return ret;
}
return ret;
}
+static int handle_notify(URLContext *s, RTMPPacket *pkt) {
+ RTMPContext *rt = s->priv_data;
+ const uint8_t *p = NULL;
+ uint8_t *cp = NULL;
+ uint8_t commandbuffer[64];
+ char statusmsg[128];
+ int stringlen;
+ GetByteContext gbc;
+ PutByteContext pbc;
+ uint32_t ts;
+ int old_flv_size;
+ const uint8_t *datatowrite;
+ unsigned datatowritelength;
+
+ p = pkt->data;
+ bytestream2_init(&gbc, p, pkt->data_size);
+ if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer),
+ &stringlen))
+ return AVERROR_INVALIDDATA;
+ if (!strcmp(commandbuffer, "@setDataFrame")) {
+ datatowrite = gbc.buffer;
+ datatowritelength = bytestream2_get_bytes_left(&gbc);
+ if (ff_amf_read_string(&gbc, statusmsg,
+ sizeof(statusmsg), &stringlen))
+ return AVERROR_INVALIDDATA;
+ if (strcmp(statusmsg, "onMetaData")) {
+ av_log(s, AV_LOG_INFO, "Expecting onMetadata but got %s\n",
+ statusmsg);
+ return 0;
+ }
+
+ /* Provide ECMAArray to flv */
+ ts = pkt->timestamp;
+
+ // generate packet header and put data into buffer for FLV demuxer
+ if (rt->flv_off < rt->flv_size) {
+ old_flv_size = rt->flv_size;
+ rt->flv_size += datatowritelength + 15;
+ } else {
+ old_flv_size = 0;
+ rt->flv_size = datatowritelength + 15;
+ rt->flv_off = 0;
+ }
+
+ cp = av_realloc(rt->flv_data, rt->flv_size);
+ if (!cp)
+ return AVERROR(ENOMEM);
+ rt->flv_data = cp;
+ bytestream2_init_writer(&pbc, cp, rt->flv_size);
+ bytestream2_skip_p(&pbc, old_flv_size);
+ bytestream2_put_byte(&pbc, pkt->type);
+ bytestream2_put_be24(&pbc, datatowritelength);
+ bytestream2_put_be24(&pbc, ts);
+ bytestream2_put_byte(&pbc, ts >> 24);
+ bytestream2_put_be24(&pbc, 0);
+ bytestream2_put_buffer(&pbc, datatowrite, datatowritelength);
+ bytestream2_put_be32(&pbc, 0);
+ }
+ return 0;
+}
+
/**
* Parse received packet and possibly perform some action depending on
* the packet contents.
@@ -1430,6 +1899,7 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
case RTMP_PT_VIDEO:
case RTMP_PT_AUDIO:
case RTMP_PT_METADATA:
+ case RTMP_PT_NOTIFY:
/* Audio, Video and Metadata packets are parsed in get_packet() */
break;
default:
@@ -1489,7 +1959,9 @@ static int get_packet(URLContext *s, int for_header)
ff_rtmp_packet_destroy(&rpkt);
return AVERROR_EOF;
}
- if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING)) {
+ if (for_header && (rt->state == STATE_PLAYING ||
+ rt->state == STATE_PUBLISHING ||
+ rt->state == STATE_RECEIVING)) {
ff_rtmp_packet_destroy(&rpkt);
return 0;
}
@@ -1514,6 +1986,14 @@ static int get_packet(URLContext *s, int for_header)
bytestream_put_be32(&p, 0);
ff_rtmp_packet_destroy(&rpkt);
return 0;
+ } else if (rpkt.type == RTMP_PT_NOTIFY) {
+ ret = handle_notify(s, &rpkt);
+ ff_rtmp_packet_destroy(&rpkt);
+ if (ret) {
+ av_log(s, AV_LOG_ERROR, "Handle notify error\n");
+ return ret;
+ }
+ return 0;
} else if (rpkt.type == RTMP_PT_METADATA) {
// we got raw FLV data, make it available for FLV demuxer
rt->flv_off = 0;
@@ -1584,11 +2064,19 @@ static int rtmp_open(URLContext *s, const char *uri, int flags)
AVDictionary *opts = NULL;
int ret;
+ if (rt->listen_timeout > 0)
+ rt->listen = 1;
+
rt->is_input = !(flags & AVIO_FLAG_WRITE);
av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname), &port,
path, sizeof(path), s->filename);
+ if (rt->listen && strcmp(proto, "rtmp")) {
+ av_log(s, AV_LOG_ERROR, "rtmp_listen not available for %s\n",
+ proto);
+ return AVERROR(EINVAL);
+ }
if (!strcmp(proto, "rtmpt") || !strcmp(proto, "rtmpts")) {
if (!strcmp(proto, "rtmpts"))
av_dict_set(&opts, "ffrtmphttp_tls", "1", 1);
@@ -1611,7 +2099,12 @@ static int rtmp_open(URLContext *s, const char *uri, int flags)
/* open the tcp connection */
if (port < 0)
port = RTMP_DEFAULT_PORT;
- ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
+ if (rt->listen)
+ ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port,
+ "?listen&listen_timeout=%d",
+ rt->listen_timeout * 1000);
+ else
+ ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
}
if ((ret = ffurl_open(&rt->stream, buf, AVIO_FLAG_READ_WRITE,
@@ -1626,7 +2119,9 @@ static int rtmp_open(URLContext *s, const char *uri, int flags)
}
rt->state = STATE_START;
- if ((ret = rtmp_handshake(s, rt)) < 0)
+ if (!rt->listen && (ret = rtmp_handshake(s, rt)) < 0)
+ goto fail;
+ if (rt->listen && (ret = rtmp_server_handshake(s, rt)) < 0)
goto fail;
rt->out_chunk_size = 128;
@@ -1726,8 +2221,14 @@ static int rtmp_open(URLContext *s, const char *uri, int flags)
av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
proto, path, rt->app, rt->playpath);
- if ((ret = gen_connect(s, rt)) < 0)
- goto fail;
+ if (!rt->listen) {
+ if ((ret = gen_connect(s, rt)) < 0)
+ goto fail;
+ } else {
+ if (read_connect(s, s->priv_data) < 0)
+ goto fail;
+ rt->is_input = 1;
+ }
do {
ret = get_packet(s, 1);
@@ -1919,6 +2420,8 @@ static const AVOption rtmp_options[] = {
{"rtmp_swfurl", "URL of the SWF player. By default no value will be sent", OFFSET(swfurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_swfverify", "URL to player swf file, compute hash/size automatically.", OFFSET(swfverify), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC},
{"rtmp_tcurl", "URL of the target stream. Defaults to proto://host[:port]/app.", OFFSET(tcurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
+ {"rtmp_listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
+ {"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1", OFFSET(listen_timeout), AV_OPT_TYPE_INT, {-1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
{ NULL },
};
diff --git a/libavformat/version.h b/libavformat/version.h
index e54f22e..1bc9ee7 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -30,8 +30,8 @@
#include "libavutil/avutil.h"
#define LIBAVFORMAT_VERSION_MAJOR 54
-#define LIBAVFORMAT_VERSION_MINOR 13
-#define LIBAVFORMAT_VERSION_MICRO 4
+#define LIBAVFORMAT_VERSION_MINOR 14
+#define LIBAVFORMAT_VERSION_MICRO 0
#define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
LIBAVFORMAT_VERSION_MINOR, \
More information about the ffmpeg-cvslog
mailing list