[FFmpeg-devel] [PATCH] avformat/libsrt: Adding support for multiple clients as a server
Zhao Zhili
quinkblack at foxmail.com
Mon Sep 23 15:34:44 EEST 2024
> On Sep 19, 2024, at 21:46, nicolas.dato at gmail.com wrote:
>
> From: Nicolas Jorge Dato <nicolas.dato at gmail.com>
>
> When in listener mode and writing, now libsrt supports multiple clients
> configured with the max_clients parameter.
>
> When max_clients=1 (default), it behaves as before.
> When max_clients > 1, after accepting the first client it launches
> a thread to listen for more clients, and launches a new thread for
> each client.
> ---
> libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 265 insertions(+), 32 deletions(-)
>
> diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
> index 9e860abccd..c9392e6996 100644
> --- a/libavformat/libsrt.c
> +++ b/libavformat/libsrt.c
> @@ -21,8 +21,11 @@
> * Haivision Open SRT (Secure Reliable Transport) protocol
> */
>
> +#include <pthread.h>
> +#include <semaphore.h>
> #include <srt/srt.h>
The implementation isn't portable: it depends directly on POSIX <pthread.h> and <semaphore.h>.
Although FFmpeg once had ffserver, the I/O layer in libavformat isn't designed
to be used as a server. I'm afraid a real server (e.g., SRS, the Simple Realtime Server)
is more appropriate than stretching libavformat.
>
> +#include "libavutil/fifo.h"
> #include "libavutil/mem.h"
> #include "libavutil/opt.h"
> #include "libavutil/parseutils.h"
> @@ -51,10 +54,22 @@ enum SRTMode {
> SRT_MODE_RENDEZVOUS = 2
> };
>
> -typedef struct SRTContext {
> - const AVClass *class;
> +typedef struct SRTClientContext {
> + int set;
> + int ended;
> + URLContext *h;
> + pthread_t thread;
> + int payload_size;
> int fd;
> int eid;
> + sem_t msg;
> + AVFifo *fifo;
> +} SRTClientContext;
> +
> +typedef struct SRTContext {
> + const AVClass *class;
> + int *fd;
> + int *eid;
> int64_t rw_timeout;
> int64_t listen_timeout;
> int recv_buffer_size;
> @@ -93,6 +108,13 @@ typedef struct SRTContext {
> SRT_TRANSTYPE transtype;
> int linger;
> int tsbpd;
> + pthread_mutex_t accept_mutex;
> + pthread_t accept_thread;
> + int listen_fd;
> + int listen_eid;
> + SRTClientContext *client_context;
> + int max_clients;
> + int close_threads;
> } SRTContext;
>
> #define D AV_OPT_FLAG_DECODING_PARAM
> @@ -146,6 +168,7 @@ static const AVOption libsrt_options[] = {
> { "file", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" },
> { "linger", "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
> { "tsbpd", "Timestamp-based packet delivery", OFFSET(tsbpd), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E },
> + { "max_clients", "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients), AV_OPT_TYPE_INT, { .i64 = 1 }, 1, INT_MAX, .flags = E },
> { NULL }
> };
>
> @@ -235,13 +258,159 @@ static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int
> }
> }
>
> -static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
> +static int libsrt_accept(int fd, URLContext *h)
> {
> int ret;
> - int reuse = 1;
> /* Max streamid length plus an extra space for the terminating null character */
> char streamid[513];
> int streamid_len = sizeof(streamid);
> +
> + ret = srt_accept(fd, NULL, NULL);
> + if (ret < 0)
> + return libsrt_neterrno(h);
> + if (libsrt_socket_nonblock(ret, 1) < 0)
> + av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
> + if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
> + /* Note: returned streamid_len doesn't count the terminating null character */
> + av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
> +
> + return ret;
> +}
> +
> +static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size)
> +{
> + int ret;
> +
> + if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> + ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback);
> + if (ret)
> + return ret;
> + }
> +
> + ret = srt_sendmsg(fd, buf, size, -1, 1);
> + if (ret < 0) {
> + ret = libsrt_neterrno(h);
> + }
> +
> + return ret;
> +}
> +
> +static void *libsrt_client_thread(void *_SRTClientContext)
> +{
> + SRTClientContext *c = _SRTClientContext;
> + URLContext *h = c->h;
> + SRTContext *s = h->priv_data;
> + uint8_t *buf;
> + int ret;
> +
> + buf = av_malloc(c->payload_size);
> + if (buf == NULL) {
> + av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM));
> + return NULL;
> + }
> + while (!s->close_threads) {
> + sem_wait(&c->msg);
> + while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) {
> + do {
> + ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size);
> + } while(ret == AVERROR(EAGAIN) && !s->close_threads);
> + if (ret < 0 && ret != AVERROR(EAGAIN)) {
> + av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret));
> + goto end;
> + }
> + }
> + }
> +end:
> + av_freep(&buf);
> + c->ended = 1;
> +
> + return NULL;
> +}
> +
> +static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid)
> +{
> + SRTContext *s = h->priv_data;
> +
> + c->set = 1;
> + c->ended = 0;
> + c->h = h;
> + c->fd = fd;
> + c->eid = eid;
> + c->payload_size = SRT_LIVE_DEFAULT_PAYLOAD_SIZE;
> + if (s->payload_size > 0) {
> + c->payload_size = s->payload_size;
> + }
> + c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0);
> + sem_init(&c->msg, 0, 0);
> + pthread_create(&c->thread, NULL, libsrt_client_thread, c);
> +
> + return 0;
> +}
> +
> +static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c)
> +{
> + pthread_join(c->thread, NULL);
> + sem_destroy(&c->msg);
> + av_fifo_freep2(&c->fifo);
> + srt_epoll_release(c->eid);
> + srt_close(c->fd);
> + c->ended = 0;
> + c->set = 0;
> + return 0;
> +}
> +
> +static void *libsrt_accept_thread(void *_URLContext)
> +{
> + URLContext *h = _URLContext;
> + SRTContext *s = h->priv_data;
> + int i;
> + int ret;
> + int client_fd;
> +
> + while (!s->close_threads) {
> + pthread_mutex_lock(&s->accept_mutex);
> + for (i = 0; i < s->max_clients; i++) {
> + if (s->client_context[i].set && s->client_context[i].ended) {
> + av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
> + libsrt_close_client_thread(h, &s->client_context[i]);
> + }
> + }
> + pthread_mutex_unlock(&s->accept_mutex);
> +
> + ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback);
> + if (ret < 0)
> + continue;
> +
> + client_fd = ret = libsrt_accept(s->listen_fd, h);
> + if (ret < 0)
> + continue;
> + av_log(h, AV_LOG_DEBUG, "new client connection\n");
> + pthread_mutex_lock(&s->accept_mutex);
> + for (i = 0; i < s->max_clients; i++) {
> + if (!s->client_context[i].set) {
> + s->fd[i] = client_fd;
> + s->eid[i] = libsrt_epoll_create(h, client_fd, 1);
> + av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i);
> + libsrt_launch_client_thread(h, &s->client_context[i], s->fd[i], s->eid[i]);
> + break;
> + }
> + }
> + if (i == s->max_clients) {
> + av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients);
> + srt_close(client_fd);
> + }
> + pthread_mutex_unlock(&s->accept_mutex);
> + }
> + av_log(h, AV_LOG_DEBUG, "exiting accept thread\n");
> +
> + return NULL;
> +}
> +
> +static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
> +{
> + int ret;
> + int reuse = 1;
> +
> if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) {
> av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n");
> }
> @@ -255,14 +424,7 @@ static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t
> if (ret < 0)
> return ret;
>
> - ret = srt_accept(fd, NULL, NULL);
> - if (ret < 0)
> - return libsrt_neterrno(h);
> - if (libsrt_socket_nonblock(ret, 1) < 0)
> - av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
> - if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
> - /* Note: returned streamid_len doesn't count the terminating null character */
> - av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
> + ret = libsrt_accept(fd, h);
>
> return ret;
> }
> @@ -462,10 +624,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
> goto fail1;
> // multi-client
> ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout);
> - srt_epoll_release(read_eid);
> - if (ret < 0)
> - goto fail1;
> - srt_close(fd);
> + if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) {
> + srt_epoll_release(read_eid);
> + if (ret < 0)
> + goto fail1;
> + srt_close(fd);
> + } else {
> + s->listen_eid = read_eid;
> + s->listen_fd = fd;
> + }
> fd = ret;
> } else {
> int write_eid = ret = libsrt_epoll_create(h, fd, 1);
> @@ -508,8 +675,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
> goto fail1;
>
> h->is_streamed = 1;
> - s->fd = fd;
> - s->eid = eid;
> + s->fd[0] = fd;
> + s->eid[0] = eid;
> +
> + if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) {
> + av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n");
> + libsrt_launch_client_thread(h, &s->client_context[0], fd, eid);
> + av_log(h, AV_LOG_DEBUG, "launching accept_thread\n");
> + pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h);
> + }
>
> freeaddrinfo(ai);
> return 0;
> @@ -671,7 +845,26 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
> if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
> s->linger = strtol(buf, NULL, 10);
> }
> + if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) {
> + s->max_clients = strtoll(buf, NULL, 10);
> + if (s->max_clients < 1) {
> + ret = AVERROR(EINVAL);
> + goto err;
> + }
> + }
> + }
> + if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) {
> + s->max_clients = 1;
> }
> + s->fd = av_calloc(s->max_clients, sizeof(*s->fd));
> + s->eid = av_calloc(s->max_clients, sizeof(*s->eid));
> + s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context));
> + if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) {
> + ret = AVERROR(ENOMEM);
> + av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret));
> + goto err;
> + }
> +
> ret = libsrt_setup(h, uri, flags);
> if (ret < 0)
> goto err;
> @@ -688,12 +881,12 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
> int ret;
>
> if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> - ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback);
> + ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback);
> if (ret)
> return ret;
> }
>
> - ret = srt_recvmsg(s->fd, buf, size);
> + ret = srt_recvmsg(s->fd[0], buf, size);
> if (ret < 0) {
> ret = libsrt_neterrno(h);
> }
> @@ -704,17 +897,35 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
> static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
> {
> SRTContext *s = h->priv_data;
> + SRTClientContext *c;
> + int i;
> int ret;
> -
> - if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> - ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
> - if (ret)
> - return ret;
> - }
> -
> - ret = srt_sendmsg(s->fd, buf, size, -1, 1);
> - if (ret < 0) {
> - ret = libsrt_neterrno(h);
> + int any_ok = 0;
> + int any = 0;
> +
> + if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) {
> + pthread_mutex_lock(&s->accept_mutex);
> + for (i = 0; i < s->max_clients; i++) {
> + c = &s->client_context[i];
> + if (c->set && !c->ended) {
> + any = 1;
> + ret = av_fifo_write(c->fifo, buf, size);
> + if (ret >= 0) {
> + sem_post(&c->msg);
> + any_ok = 1;
> + }
> + }
> + }
> + pthread_mutex_unlock(&s->accept_mutex);
> + if (!any) {
> + ret = AVERROR(EIO);
> + } else if (!any_ok) {
> + ret = AVERROR(EAGAIN);
> + } else {
> + ret = size;
> + }
> + } else {
> + ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size);
> }
>
> return ret;
> @@ -723,9 +934,31 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
> static int libsrt_close(URLContext *h)
> {
> SRTContext *s = h->priv_data;
> + int i;
>
> - srt_epoll_release(s->eid);
> - srt_close(s->fd);
> + if (s->max_clients > 1) {
> + s->close_threads = 1;
> + for (i = 0; i < s->max_clients; i++) {
> + if (s->client_context[i].set) {
> + sem_post(&s->client_context[i].msg);
> + }
> + }
> + pthread_join(s->accept_thread, NULL);
> + srt_epoll_release(s->listen_eid);
> + srt_close(s->listen_fd);
> + for (i = 0; i < s->max_clients; i++) {
> + if (s->client_context[i].set) {
> + av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
> + libsrt_close_client_thread(h, &s->client_context[i]);
> + }
> + }
> + } else {
> + srt_epoll_release(s->eid[0]);
> + srt_close(s->fd[0]);
> + }
> + av_freep(&s->fd);
> + av_freep(&s->eid);
> + av_freep(&s->client_context);
>
> srt_cleanup();
>
> --
> 2.39.4
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
More information about the ffmpeg-devel
mailing list