[FFmpeg-devel] [PATCH] avformat/libsrt: Adding support for multiple clients as a server

nicolas.dato at gmail.com nicolas.dato at gmail.com
Thu Sep 19 16:46:45 EEST 2024


From: Nicolas Jorge Dato <nicolas.dato at gmail.com>

When in listener mode and writing, now libsrt supports multiple clients
configured with the max_clients parameter.

When max_clients=1 (default), it behaves as before.
When max_clientes > 1, after accepting the first client it launches
a thread to listen for more clients, and a launches a new thread for
each client.
---
 libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 265 insertions(+), 32 deletions(-)

diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
index 9e860abccd..c9392e6996 100644
--- a/libavformat/libsrt.c
+++ b/libavformat/libsrt.c
@@ -21,8 +21,11 @@
  * Haivision Open SRT (Secure Reliable Transport) protocol
  */
 
+#include <pthread.h>
+#include <semaphore.h>
 #include <srt/srt.h>
 
+#include "libavutil/fifo.h"
 #include "libavutil/mem.h"
 #include "libavutil/opt.h"
 #include "libavutil/parseutils.h"
@@ -51,10 +54,22 @@ enum SRTMode {
     SRT_MODE_RENDEZVOUS = 2
 };
 
-typedef struct SRTContext {
-    const AVClass *class;
+typedef struct SRTClientContext {
+    int set;
+    int ended;
+    URLContext *h;
+    pthread_t thread;
+    int payload_size;
     int fd;
     int eid;
+    sem_t msg;
+    AVFifo *fifo;
+} SRTClientContext;
+
+typedef struct SRTContext {
+    const AVClass *class;
+    int *fd;
+    int *eid;
     int64_t rw_timeout;
     int64_t listen_timeout;
     int recv_buffer_size;
@@ -93,6 +108,13 @@ typedef struct SRTContext {
     SRT_TRANSTYPE transtype;
     int linger;
     int tsbpd;
+    pthread_mutex_t accept_mutex;
+    pthread_t accept_thread;
+    int listen_fd;
+    int listen_eid;
+    SRTClientContext *client_context;
+    int max_clients;
+    int close_threads;
 } SRTContext;
 
 #define D AV_OPT_FLAG_DECODING_PARAM
@@ -146,6 +168,7 @@ static const AVOption libsrt_options[] = {
     { "file",           NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" },
     { "linger",         "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger),           AV_OPT_TYPE_INT,      { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
     { "tsbpd",          "Timestamp-based packet delivery",                                      OFFSET(tsbpd),            AV_OPT_TYPE_BOOL,     { .i64 = -1 }, -1, 1,         .flags = D|E },
+    { "max_clients",    "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients),      AV_OPT_TYPE_INT,      { .i64 = 1 },   1, INT_MAX,   .flags = E },
     { NULL }
 };
 
@@ -235,13 +258,159 @@ static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int
     }
 }
 
-static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
+static int libsrt_accept(int fd, URLContext *h)
 {
     int ret;
-    int reuse = 1;
     /* Max streamid length plus an extra space for the terminating null character */
     char streamid[513];
     int streamid_len = sizeof(streamid);
+
+    ret = srt_accept(fd, NULL, NULL);
+    if (ret < 0)
+        return libsrt_neterrno(h);
+    if (libsrt_socket_nonblock(ret, 1) < 0)
+        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
+    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
+        /* Note: returned streamid_len doesn't count the terminating null character */
+        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
+
+    return ret;
+}
+
+static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size)
+{
+    int ret;
+
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback);
+        if (ret)
+            return ret;
+    }
+
+    ret = srt_sendmsg(fd, buf, size, -1, 1);
+    if (ret < 0) {
+        ret = libsrt_neterrno(h);
+    }
+
+    return ret;
+}
+
+static void *libsrt_client_thread(void *_SRTClientContext)
+{
+    SRTClientContext *c = _SRTClientContext;
+    URLContext *h = c->h;
+    SRTContext *s = h->priv_data;
+    uint8_t *buf;
+    int ret;
+
+    buf = av_malloc(c->payload_size);
+    if (buf == NULL) {
+        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM));
+        return NULL;
+    }
+    while (!s->close_threads) {
+        sem_wait(&c->msg);
+        while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) {
+            do {
+                ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size);
+            } while(ret == AVERROR(EAGAIN) && !s->close_threads);
+            if (ret < 0 && ret != AVERROR(EAGAIN)) {
+                av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret));
+                goto end;
+            }
+        }
+    }
+end:
+    av_freep(&buf);
+    c->ended = 1;
+
+    return NULL;
+}
+
+static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid)
+{
+    SRTContext *s = h->priv_data;
+
+    c->set = 1;
+    c->ended = 0;
+    c->h = h;
+    c->fd = fd;
+    c->eid = eid;
+    c->payload_size = SRT_LIVE_DEFAULT_PAYLOAD_SIZE;
+    if (s->payload_size > 0) {
+        c->payload_size = s->payload_size;
+    }
+    c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0);
+    sem_init(&c->msg, 0, 0);
+    pthread_create(&c->thread, NULL, libsrt_client_thread, c);
+
+    return 0;
+}
+
+static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c)
+{
+    pthread_join(c->thread, NULL);
+    sem_destroy(&c->msg);
+    av_fifo_freep2(&c->fifo);
+    srt_epoll_release(c->eid);
+    srt_close(c->fd);
+    c->ended = 0;
+    c->set = 0;
+    return 0;
+}
+
+static void *libsrt_accept_thread(void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    SRTContext *s = h->priv_data;
+    int i;
+    int ret;
+    int client_fd;
+
+    while (!s->close_threads) {
+        pthread_mutex_lock(&s->accept_mutex);
+        for (i = 0; i < s->max_clients; i++) {
+            if (s->client_context[i].set && s->client_context[i].ended) {
+                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
+                libsrt_close_client_thread(h, &s->client_context[i]);
+            }
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+
+        ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback);
+        if (ret < 0)
+            continue;
+
+        client_fd = ret = libsrt_accept(s->listen_fd, h);
+        if (ret < 0)
+            continue;
+        av_log(h, AV_LOG_DEBUG, "new client connection\n");
+        pthread_mutex_lock(&s->accept_mutex);
+        for (i = 0; i < s->max_clients; i++) {
+            if (!s->client_context[i].set) {
+                s->fd[i] = client_fd;
+                s->eid[i] = libsrt_epoll_create(h, client_fd, 1);
+                av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i);
+                libsrt_launch_client_thread(h, &s->client_context[i], s->fd[i], s->eid[i]);
+                break;
+            }
+        }
+        if (i == s->max_clients) {
+            av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients);
+            srt_close(client_fd);
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+    }
+    av_log(h, AV_LOG_DEBUG, "exiting accept thread\n");
+
+    return NULL;
+}
+
+static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
+{
+    int ret;
+    int reuse = 1;
+
     if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) {
         av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n");
     }
@@ -255,14 +424,7 @@ static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t
     if (ret < 0)
         return ret;
 
-    ret = srt_accept(fd, NULL, NULL);
-    if (ret < 0)
-        return libsrt_neterrno(h);
-    if (libsrt_socket_nonblock(ret, 1) < 0)
-        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
-    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
-        /* Note: returned streamid_len doesn't count the terminating null character */
-        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
+    ret = libsrt_accept(fd, h);
 
     return ret;
 }
@@ -462,10 +624,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
             goto fail1;
         // multi-client
         ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout);
-        srt_epoll_release(read_eid);
-        if (ret < 0)
-            goto fail1;
-        srt_close(fd);
+        if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) {
+            srt_epoll_release(read_eid);
+            if (ret < 0)
+                goto fail1;
+            srt_close(fd);
+        } else {
+            s->listen_eid = read_eid;
+            s->listen_fd = fd;
+        }
         fd = ret;
     } else {
         int write_eid = ret = libsrt_epoll_create(h, fd, 1);
@@ -508,8 +675,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
         goto fail1;
 
     h->is_streamed = 1;
-    s->fd = fd;
-    s->eid = eid;
+    s->fd[0] = fd;
+    s->eid[0] = eid;
+
+    if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) {
+        av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n");
+        libsrt_launch_client_thread(h, &s->client_context[0], fd, eid);
+        av_log(h, AV_LOG_DEBUG, "launching accept_thread\n");
+        pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h);
+    }
 
     freeaddrinfo(ai);
     return 0;
@@ -671,7 +845,26 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
         if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
             s->linger = strtol(buf, NULL, 10);
         }
+        if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) {
+            s->max_clients = strtoll(buf, NULL, 10);
+            if (s->max_clients < 1) {
+                ret = AVERROR(EINVAL);
+                goto err;
+            }
+        }
+    }
+    if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) {
+        s->max_clients = 1;
     }
+    s->fd = av_calloc(s->max_clients, sizeof(*s->fd));
+    s->eid = av_calloc(s->max_clients, sizeof(*s->eid));
+    s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context));
+    if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) {
+        ret = AVERROR(ENOMEM);
+        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret));
+        goto err;
+    }
+
     ret = libsrt_setup(h, uri, flags);
     if (ret < 0)
         goto err;
@@ -688,12 +881,12 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
     int ret;
 
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
-        ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback);
+        ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback);
         if (ret)
             return ret;
     }
 
-    ret = srt_recvmsg(s->fd, buf, size);
+    ret = srt_recvmsg(s->fd[0], buf, size);
     if (ret < 0) {
         ret = libsrt_neterrno(h);
     }
@@ -704,17 +897,35 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
 static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
 {
     SRTContext *s = h->priv_data;
+    SRTClientContext *c;
+    int i;
     int ret;
-
-    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
-        ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
-        if (ret)
-            return ret;
-    }
-
-    ret = srt_sendmsg(s->fd, buf, size, -1, 1);
-    if (ret < 0) {
-        ret = libsrt_neterrno(h);
+    int any_ok = 0;
+    int any = 0;
+
+    if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) {
+        pthread_mutex_lock(&s->accept_mutex);
+        for (i = 0; i < s->max_clients; i++) {
+            c = &s->client_context[i];
+            if (c->set && !c->ended) {
+                any = 1;
+                ret = av_fifo_write(c->fifo, buf, size);
+                if (ret >= 0) {
+                    sem_post(&c->msg);
+                    any_ok = 1;
+                }
+            }
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+        if (!any) {
+            ret = AVERROR(EIO);
+        } else if (!any_ok) {
+            ret = AVERROR(EAGAIN);
+        } else {
+            ret = size;
+        }
+    } else {
+        ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size);
     }
 
     return ret;
@@ -723,9 +934,31 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
 static int libsrt_close(URLContext *h)
 {
     SRTContext *s = h->priv_data;
+    int i;
 
-    srt_epoll_release(s->eid);
-    srt_close(s->fd);
+    if (s->max_clients > 1) {
+        s->close_threads = 1;
+        for (i = 0; i < s->max_clients; i++) {
+            if (s->client_context[i].set) {
+                sem_post(&s->client_context[i].msg);
+            }
+        }
+        pthread_join(s->accept_thread, NULL);
+        srt_epoll_release(s->listen_eid);
+        srt_close(s->listen_fd);
+        for (i = 0; i < s->max_clients; i++) {
+            if (s->client_context[i].set) {
+                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
+                libsrt_close_client_thread(h, &s->client_context[i]);
+            }
+        }
+    } else {
+        srt_epoll_release(s->eid[0]);
+        srt_close(s->fd[0]);
+    }
+    av_freep(&s->fd);
+    av_freep(&s->eid);
+    av_freep(&s->client_context);
 
     srt_cleanup();
 
-- 
2.39.4



More information about the ffmpeg-devel mailing list