[FFmpeg-devel] [PATCH] avformat/libsrt: Adding support for multiple clients as a server

Zhao Zhili quinkblack at foxmail.com
Mon Sep 23 15:34:44 EEST 2024



> On Sep 19, 2024, at 21:46, nicolas.dato at gmail.com wrote:
> 
> From: Nicolas Jorge Dato <nicolas.dato at gmail.com>
> 
> When in listener mode and writing, now libsrt supports multiple clients
> configured with the max_clients parameter.
> 
> When max_clients=1 (default), it behaves as before.
> When max_clients > 1, after accepting the first client it launches
> a thread to listen for more clients, and launches a new thread for
> each client.
> ---
> libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 265 insertions(+), 32 deletions(-)
> 
> diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
> index 9e860abccd..c9392e6996 100644
> --- a/libavformat/libsrt.c
> +++ b/libavformat/libsrt.c
> @@ -21,8 +21,11 @@
>  * Haivision Open SRT (Secure Reliable Transport) protocol
>  */
> 
> +#include <pthread.h>
> +#include <semaphore.h>
> #include <srt/srt.h>

The implementation isn't portable: it includes <pthread.h> and <semaphore.h> directly.

Although FFmpeg once had ffserver, the I/O layer in libavformat isn't designed
to be used as a server. I'm afraid a real server (e.g., SRS, the Simple Realtime
Server) would be more appropriate than stretching libavformat.

> 
> +#include "libavutil/fifo.h"
> #include "libavutil/mem.h"
> #include "libavutil/opt.h"
> #include "libavutil/parseutils.h"
> @@ -51,10 +54,22 @@ enum SRTMode {
>     SRT_MODE_RENDEZVOUS = 2
> };
> 
> -typedef struct SRTContext {
> -    const AVClass *class;
> +typedef struct SRTClientContext {
> +    int set;
> +    int ended;
> +    URLContext *h;
> +    pthread_t thread;
> +    int payload_size;
>     int fd;
>     int eid;
> +    sem_t msg;
> +    AVFifo *fifo;
> +} SRTClientContext;
> +
> +typedef struct SRTContext {
> +    const AVClass *class;
> +    int *fd;
> +    int *eid;
>     int64_t rw_timeout;
>     int64_t listen_timeout;
>     int recv_buffer_size;
> @@ -93,6 +108,13 @@ typedef struct SRTContext {
>     SRT_TRANSTYPE transtype;
>     int linger;
>     int tsbpd;
> +    pthread_mutex_t accept_mutex;
> +    pthread_t accept_thread;
> +    int listen_fd;
> +    int listen_eid;
> +    SRTClientContext *client_context;
> +    int max_clients;
> +    int close_threads;
> } SRTContext;
> 
> #define D AV_OPT_FLAG_DECODING_PARAM
> @@ -146,6 +168,7 @@ static const AVOption libsrt_options[] = {
>     { "file",           NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" },
>     { "linger",         "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger),           AV_OPT_TYPE_INT,      { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
>     { "tsbpd",          "Timestamp-based packet delivery",                                      OFFSET(tsbpd),            AV_OPT_TYPE_BOOL,     { .i64 = -1 }, -1, 1,         .flags = D|E },
> +    { "max_clients",    "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients),      AV_OPT_TYPE_INT,      { .i64 = 1 },   1, INT_MAX,   .flags = E },
>     { NULL }
> };
> 
> @@ -235,13 +258,159 @@ static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int
>     }
> }
> 
> -static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
> +static int libsrt_accept(int fd, URLContext *h)
> {
>     int ret;
> -    int reuse = 1;
>     /* Max streamid length plus an extra space for the terminating null character */
>     char streamid[513];
>     int streamid_len = sizeof(streamid);
> +
> +    ret = srt_accept(fd, NULL, NULL);
> +    if (ret < 0)
> +        return libsrt_neterrno(h);
> +    if (libsrt_socket_nonblock(ret, 1) < 0)
> +        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
> +    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
> +        /* Note: returned streamid_len doesn't count the terminating null character */
> +        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
> +
> +    return ret;
> +}
> +
> +static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size)
> +{
> +    int ret;
> +
> +    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> +        ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback);
> +        if (ret)
> +            return ret;
> +    }
> +
> +    ret = srt_sendmsg(fd, buf, size, -1, 1);
> +    if (ret < 0) {
> +        ret = libsrt_neterrno(h);
> +    }
> +
> +    return ret;
> +}
> +
> +static void *libsrt_client_thread(void *_SRTClientContext)
> +{
> +    SRTClientContext *c = _SRTClientContext;
> +    URLContext *h = c->h;
> +    SRTContext *s = h->priv_data;
> +    uint8_t *buf;
> +    int ret;
> +
> +    buf = av_malloc(c->payload_size);
> +    if (buf == NULL) {
> +        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM));
> +        return NULL;
> +    }
> +    while (!s->close_threads) {
> +        sem_wait(&c->msg);
> +        while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) {
> +            do {
> +                ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size);
> +            } while(ret == AVERROR(EAGAIN) && !s->close_threads);
> +            if (ret < 0 && ret != AVERROR(EAGAIN)) {
> +                av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret));
> +                goto end;
> +            }
> +        }
> +    }
> +end:
> +    av_freep(&buf);
> +    c->ended = 1;
> +
> +    return NULL;
> +}
> +
> +static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid)
> +{
> +    SRTContext *s = h->priv_data;
> +
> +    c->set = 1;
> +    c->ended = 0;
> +    c->h = h;
> +    c->fd = fd;
> +    c->eid = eid;
> +    c->payload_size = SRT_LIVE_DEFAULT_PAYLOAD_SIZE;
> +    if (s->payload_size > 0) {
> +        c->payload_size = s->payload_size;
> +    }
> +    c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0);
> +    sem_init(&c->msg, 0, 0);
> +    pthread_create(&c->thread, NULL, libsrt_client_thread, c);
> +
> +    return 0;
> +}
> +
> +static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c)
> +{
> +    pthread_join(c->thread, NULL);
> +    sem_destroy(&c->msg);
> +    av_fifo_freep2(&c->fifo);
> +    srt_epoll_release(c->eid);
> +    srt_close(c->fd);
> +    c->ended = 0;
> +    c->set = 0;
> +    return 0;
> +}
> +
> +static void *libsrt_accept_thread(void *_URLContext)
> +{
> +    URLContext *h = _URLContext;
> +    SRTContext *s = h->priv_data;
> +    int i;
> +    int ret;
> +    int client_fd;
> +
> +    while (!s->close_threads) {
> +        pthread_mutex_lock(&s->accept_mutex);
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (s->client_context[i].set && s->client_context[i].ended) {
> +                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
> +                libsrt_close_client_thread(h, &s->client_context[i]);
> +            }
> +        }
> +        pthread_mutex_unlock(&s->accept_mutex);
> +
> +        ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback);
> +        if (ret < 0)
> +            continue;
> +
> +        client_fd = ret = libsrt_accept(s->listen_fd, h);
> +        if (ret < 0)
> +            continue;
> +        av_log(h, AV_LOG_DEBUG, "new client connection\n");
> +        pthread_mutex_lock(&s->accept_mutex);
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (!s->client_context[i].set) {
> +                s->fd[i] = client_fd;
> +                s->eid[i] = libsrt_epoll_create(h, client_fd, 1);
> +                av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i);
> +                libsrt_launch_client_thread(h, &s->client_context[i], s->fd[i], s->eid[i]);
> +                break;
> +            }
> +        }
> +        if (i == s->max_clients) {
> +            av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients);
> +            srt_close(client_fd);
> +        }
> +        pthread_mutex_unlock(&s->accept_mutex);
> +    }
> +    av_log(h, AV_LOG_DEBUG, "exiting accept thread\n");
> +
> +    return NULL;
> +}
> +
> +static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
> +{
> +    int ret;
> +    int reuse = 1;
> +
>     if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) {
>         av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n");
>     }
> @@ -255,14 +424,7 @@ static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t
>     if (ret < 0)
>         return ret;
> 
> -    ret = srt_accept(fd, NULL, NULL);
> -    if (ret < 0)
> -        return libsrt_neterrno(h);
> -    if (libsrt_socket_nonblock(ret, 1) < 0)
> -        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
> -    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
> -        /* Note: returned streamid_len doesn't count the terminating null character */
> -        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
> +    ret = libsrt_accept(fd, h);
> 
>     return ret;
> }
> @@ -462,10 +624,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
>             goto fail1;
>         // multi-client
>         ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout);
> -        srt_epoll_release(read_eid);
> -        if (ret < 0)
> -            goto fail1;
> -        srt_close(fd);
> +        if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) {
> +            srt_epoll_release(read_eid);
> +            if (ret < 0)
> +                goto fail1;
> +            srt_close(fd);
> +        } else {
> +            s->listen_eid = read_eid;
> +            s->listen_fd = fd;
> +        }
>         fd = ret;
>     } else {
>         int write_eid = ret = libsrt_epoll_create(h, fd, 1);
> @@ -508,8 +675,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
>         goto fail1;
> 
>     h->is_streamed = 1;
> -    s->fd = fd;
> -    s->eid = eid;
> +    s->fd[0] = fd;
> +    s->eid[0] = eid;
> +
> +    if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) {
> +        av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n");
> +        libsrt_launch_client_thread(h, &s->client_context[0], fd, eid);
> +        av_log(h, AV_LOG_DEBUG, "launching accept_thread\n");
> +        pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h);
> +    }
> 
>     freeaddrinfo(ai);
>     return 0;
> @@ -671,7 +845,26 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
>         if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
>             s->linger = strtol(buf, NULL, 10);
>         }
> +        if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) {
> +            s->max_clients = strtoll(buf, NULL, 10);
> +            if (s->max_clients < 1) {
> +                ret = AVERROR(EINVAL);
> +                goto err;
> +            }
> +        }
> +    }
> +    if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) {
> +        s->max_clients = 1;
>     }
> +    s->fd = av_calloc(s->max_clients, sizeof(*s->fd));
> +    s->eid = av_calloc(s->max_clients, sizeof(*s->eid));
> +    s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context));
> +    if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) {
> +        ret = AVERROR(ENOMEM);
> +        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret));
> +        goto err;
> +    }
> +
>     ret = libsrt_setup(h, uri, flags);
>     if (ret < 0)
>         goto err;
> @@ -688,12 +881,12 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
>     int ret;
> 
>     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> -        ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback);
> +        ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback);
>         if (ret)
>             return ret;
>     }
> 
> -    ret = srt_recvmsg(s->fd, buf, size);
> +    ret = srt_recvmsg(s->fd[0], buf, size);
>     if (ret < 0) {
>         ret = libsrt_neterrno(h);
>     }
> @@ -704,17 +897,35 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
> static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
> {
>     SRTContext *s = h->priv_data;
> +    SRTClientContext *c;
> +    int i;
>     int ret;
> -
> -    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> -        ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
> -        if (ret)
> -            return ret;
> -    }
> -
> -    ret = srt_sendmsg(s->fd, buf, size, -1, 1);
> -    if (ret < 0) {
> -        ret = libsrt_neterrno(h);
> +    int any_ok = 0;
> +    int any = 0;
> +
> +    if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) {
> +        pthread_mutex_lock(&s->accept_mutex);
> +        for (i = 0; i < s->max_clients; i++) {
> +            c = &s->client_context[i];
> +            if (c->set && !c->ended) {
> +                any = 1;
> +                ret = av_fifo_write(c->fifo, buf, size);
> +                if (ret >= 0) {
> +                    sem_post(&c->msg);
> +                    any_ok = 1;
> +                }
> +            }
> +        }
> +        pthread_mutex_unlock(&s->accept_mutex);
> +        if (!any) {
> +            ret = AVERROR(EIO);
> +        } else if (!any_ok) {
> +            ret = AVERROR(EAGAIN);
> +        } else {
> +            ret = size;
> +        }
> +    } else {
> +        ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size);
>     }
> 
>     return ret;
> @@ -723,9 +934,31 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
> static int libsrt_close(URLContext *h)
> {
>     SRTContext *s = h->priv_data;
> +    int i;
> 
> -    srt_epoll_release(s->eid);
> -    srt_close(s->fd);
> +    if (s->max_clients > 1) {
> +        s->close_threads = 1;
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (s->client_context[i].set) {
> +                sem_post(&s->client_context[i].msg);
> +            }
> +        }
> +        pthread_join(s->accept_thread, NULL);
> +        srt_epoll_release(s->listen_eid);
> +        srt_close(s->listen_fd);
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (s->client_context[i].set) {
> +                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
> +                libsrt_close_client_thread(h, &s->client_context[i]);
> +            }
> +        }
> +    } else {
> +        srt_epoll_release(s->eid[0]);
> +        srt_close(s->fd[0]);
> +    }
> +    av_freep(&s->fd);
> +    av_freep(&s->eid);
> +    av_freep(&s->client_context);
> 
>     srt_cleanup();
> 
> -- 
> 2.39.4
> 
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> 
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".



More information about the ffmpeg-devel mailing list