[FFmpeg-devel] [PATCH] avformat/libsrt: Adding support for multiple clients as a server
nicolas.dato at gmail.com
nicolas.dato at gmail.com
Thu Sep 19 16:46:45 EEST 2024
From: Nicolas Jorge Dato <nicolas.dato at gmail.com>
When in listener mode and writing, now libsrt supports multiple clients
configured with the max_clients parameter.
When max_clients=1 (default), it behaves as before.
When max_clients > 1, after accepting the first client it launches
a thread to listen for more clients, and launches a new thread for
each client.
---
libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++-----
1 file changed, 265 insertions(+), 32 deletions(-)
diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
index 9e860abccd..c9392e6996 100644
--- a/libavformat/libsrt.c
+++ b/libavformat/libsrt.c
@@ -21,8 +21,11 @@
* Haivision Open SRT (Secure Reliable Transport) protocol
*/
+#include <pthread.h>
+#include <semaphore.h>
#include <srt/srt.h>
+#include "libavutil/fifo.h"
#include "libavutil/mem.h"
#include "libavutil/opt.h"
#include "libavutil/parseutils.h"
@@ -51,10 +54,22 @@ enum SRTMode {
SRT_MODE_RENDEZVOUS = 2
};
-typedef struct SRTContext {
- const AVClass *class;
+typedef struct SRTClientContext {
+ int set; /* non-zero while this slot holds a launched client */
+ int ended; /* set to 1 by the client thread when it exits */
+ URLContext *h; /* URLContext the client belongs to */
+ pthread_t thread; /* sender thread serving this client */
+ int payload_size; /* bytes read from the fifo and sent per message */
int fd;
int eid;
+ sem_t msg; /* posted by libsrt_write() after queuing data and by libsrt_close() */
+ AVFifo *fifo; /* byte fifo holding data queued for this client */
+} SRTClientContext;
+
+typedef struct SRTContext {
+ const AVClass *class;
+ int *fd;
+ int *eid;
int64_t rw_timeout;
int64_t listen_timeout;
int recv_buffer_size;
@@ -93,6 +108,13 @@ typedef struct SRTContext {
SRT_TRANSTYPE transtype;
int linger;
int tsbpd;
+ pthread_mutex_t accept_mutex;
+ pthread_t accept_thread;
+ int listen_fd;
+ int listen_eid;
+ SRTClientContext *client_context;
+ int max_clients;
+ int close_threads;
} SRTContext;
#define D AV_OPT_FLAG_DECODING_PARAM
@@ -146,6 +168,7 @@ static const AVOption libsrt_options[] = {
{ "file", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" },
{ "linger", "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "tsbpd", "Timestamp-based packet delivery", OFFSET(tsbpd), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E },
+ { "max_clients", "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients), AV_OPT_TYPE_INT, { .i64 = 1 }, 1, INT_MAX, .flags = E },
{ NULL }
};
@@ -235,13 +258,159 @@ static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int
}
}
-static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
+static int libsrt_accept(int fd, URLContext *h)
{
int ret;
- int reuse = 1;
/* Max streamid length plus an extra space for the terminating null character */
char streamid[513];
int streamid_len = sizeof(streamid);
+
+ ret = srt_accept(fd, NULL, NULL);
+ if (ret < 0)
+ return libsrt_neterrno(h);
+ if (libsrt_socket_nonblock(ret, 1) < 0)
+ av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
+ if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
+ /* Note: returned streamid_len doesn't count the terminating null character */
+ av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
+
+ return ret;
+}
+
+/**
+ * Send one message on an SRT socket.
+ *
+ * Unless the URLContext carries AVIO_FLAG_NONBLOCK, first waits on eid for
+ * the socket to become writable, bounded by h->rw_timeout and subject to the
+ * interrupt callback.
+ *
+ * @param h    URLContext used for flags, timeout, interruption and logging
+ * @param fd   SRT socket to send on
+ * @param eid  SRT epoll id watching fd for writability
+ * @param buf  data to send
+ * @param size number of bytes in buf
+ * @return the srt_sendmsg() result on success, the non-zero wait result if
+ *         waiting failed, or the libsrt_neterrno() code if sending failed
+ */
+static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size)
+{
+    const int blocking = !(h->flags & AVIO_FLAG_NONBLOCK);
+    int sent;
+
+    if (blocking) {
+        int wait_ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback);
+        if (wait_ret)
+            return wait_ret;
+    }
+
+    sent = srt_sendmsg(fd, buf, size, -1, 1);
+
+    return sent < 0 ? libsrt_neterrno(h) : sent;
+}
+
+/**
+ * Per-client sender thread.
+ *
+ * Waits on c->msg, then drains c->fifo in chunks of c->payload_size bytes and
+ * sends each chunk with libsrt_write_common(). The thread stops when
+ * s->close_threads is set or when a send fails with anything other than
+ * EAGAIN. c->ended is set on every exit path, including allocation failure,
+ * so that the slot can be reclaimed with libsrt_close_client_thread().
+ *
+ * NOTE(review): close_threads and ended are plain ints shared between threads
+ * without a lock; confirm whether they need to be atomic.
+ *
+ * @param _SRTClientContext the SRTClientContext served by this thread
+ * @return always NULL
+ */
+static void *libsrt_client_thread(void *_SRTClientContext)
+{
+    SRTClientContext *c = _SRTClientContext;
+    URLContext *h = c->h;
+    SRTContext *s = h->priv_data;
+    uint8_t *buf;
+    int ret;
+
+    buf = av_malloc(c->payload_size);
+    if (!buf) {
+        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM)));
+        /* Still flag the client as ended; otherwise the slot would stay in
+         * use forever with nobody draining its fifo. */
+        goto end;
+    }
+    while (!s->close_threads) {
+        sem_wait(&c->msg);
+        while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) {
+            /* Retry the same chunk while the send only times out. */
+            do {
+                ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size);
+            } while (ret == AVERROR(EAGAIN) && !s->close_threads);
+            if (ret < 0 && ret != AVERROR(EAGAIN)) {
+                av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret));
+                goto end;
+            }
+        }
+    }
+end:
+    av_freep(&buf);
+    c->ended = 1;
+
+    return NULL;
+}
+
+/**
+ * Initialise a client slot and start its sender thread.
+ *
+ * The message size is s->payload_size when positive, otherwise
+ * SRT_LIVE_DEFAULT_PAYLOAD_SIZE; the fifo holds 4096 such messages.
+ *
+ * c->set is raised only once the fifo and semaphore exist, so other threads
+ * never see a slot whose semaphore is uninitialised. On failure everything
+ * created here is undone, c->set is left at 0, and fd and eid are released
+ * here because no caller in this file inspects the return value.
+ *
+ * @param h   URLContext the client belongs to
+ * @param c   slot to initialise
+ * @param fd  accepted SRT socket of the client
+ * @param eid SRT epoll id watching fd
+ * @return 0 on success, a negative AVERROR code on failure
+ */
+static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid)
+{
+    SRTContext *s = h->priv_data;
+    int ret;
+
+    c->set = 0;
+    c->ended = 0;
+    c->h = h;
+    c->fd = fd;
+    c->eid = eid;
+    c->payload_size = s->payload_size > 0 ? s->payload_size : SRT_LIVE_DEFAULT_PAYLOAD_SIZE;
+
+    c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0);
+    if (!c->fifo) {
+        ret = AVERROR(ENOMEM);
+        goto fail;
+    }
+    if (sem_init(&c->msg, 0, 0) < 0) {
+        ret = AVERROR(errno);
+        goto fail_fifo;
+    }
+
+    /* Mark the slot before the thread exists so it is never running while
+     * the slot still looks free. */
+    c->set = 1;
+    ret = pthread_create(&c->thread, NULL, libsrt_client_thread, c);
+    if (ret) {
+        /* pthread_create() returns a positive errno value. */
+        ret = AVERROR(ret);
+        c->set = 0;
+        goto fail_sem;
+    }
+
+    return 0;
+
+fail_sem:
+    sem_destroy(&c->msg);
+fail_fifo:
+    av_fifo_freep2(&c->fifo);
+fail:
+    av_log(h, AV_LOG_ERROR, "failed to launch client thread: %s\n", av_err2str(ret));
+    srt_epoll_release(eid);
+    srt_close(fd);
+    return ret;
+}
+
+/**
+ * Tear down a client slot.
+ *
+ * Joins the client thread, then releases the slot's fifo, semaphore, epoll id
+ * and socket, and finally clears the ended/set flags so the slot can be
+ * reused.
+ *
+ * @param h URLContext (not used here)
+ * @param c slot to tear down
+ * @return always 0
+ */
+static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c)
+{
+    /* The thread must be gone before anything it uses is destroyed. */
+    pthread_join(c->thread, NULL);
+
+    av_fifo_freep2(&c->fifo);
+    sem_destroy(&c->msg);
+
+    srt_epoll_release(c->eid);
+    srt_close(c->fd);
+
+    c->ended = 0;
+    c->set   = 0;
+
+    return 0;
+}
+
+/**
+ * Listener thread that keeps accepting clients until s->close_threads is set.
+ *
+ * Each iteration:
+ *  1. under accept_mutex, tears down every slot whose thread has ended;
+ *  2. waits on listen_eid with a bounded timeout so that close_threads is
+ *     re-checked regularly;
+ *  3. accepts a connection and, under accept_mutex, places it in the first
+ *     free slot, or closes it when all max_clients slots are taken or its
+ *     epoll id cannot be created.
+ *
+ * NOTE(review): no pthread_mutex_init() for accept_mutex is visible in this
+ * patch; confirm it is initialised before this thread starts.
+ *
+ * @param _URLContext the URLContext owning the listening socket
+ * @return always NULL
+ */
+static void *libsrt_accept_thread(void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    SRTContext *s = h->priv_data;
+    int i;
+    int ret;
+    int client_fd;
+    int client_eid;
+
+    while (!s->close_threads) {
+        /* Reap clients whose sender thread has finished. */
+        pthread_mutex_lock(&s->accept_mutex);
+        for (i = 0; i < s->max_clients; i++) {
+            if (s->client_context[i].set && s->client_context[i].ended) {
+                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
+                libsrt_close_client_thread(h, &s->client_context[i]);
+            }
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+
+        ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback);
+        if (ret < 0)
+            continue;
+
+        client_fd = libsrt_accept(s->listen_fd, h);
+        if (client_fd < 0)
+            continue;
+        av_log(h, AV_LOG_DEBUG, "new client connection\n");
+
+        pthread_mutex_lock(&s->accept_mutex);
+        /* Find the first free slot. */
+        for (i = 0; i < s->max_clients; i++) {
+            if (!s->client_context[i].set)
+                break;
+        }
+        if (i == s->max_clients) {
+            av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients);
+            srt_close(client_fd);
+        } else {
+            client_eid = libsrt_epoll_create(h, client_fd, 1);
+            if (client_eid < 0) {
+                /* Without an epoll id the client thread cannot wait for
+                 * writability, so drop the connection. */
+                av_log(h, AV_LOG_ERROR, "failed to create epoll for new client: %s\n", av_err2str(client_eid));
+                srt_close(client_fd);
+            } else {
+                s->fd[i] = client_fd;
+                s->eid[i] = client_eid;
+                av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i);
+                ret = libsrt_launch_client_thread(h, &s->client_context[i], client_fd, client_eid);
+                /* NOTE(review): who releases client_fd/client_eid after a
+                 * failed launch depends on libsrt_launch_client_thread();
+                 * verify before adding cleanup here. */
+                if (ret < 0)
+                    av_log(h, AV_LOG_ERROR, "could not launch client thread idx %d: %s\n", i, av_err2str(ret));
+            }
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+    }
+    av_log(h, AV_LOG_DEBUG, "exiting accept thread\n");
+
+    return NULL;
+}
+
+static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
+{
+ int ret;
+ int reuse = 1;
+
if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) {
av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n");
}
@@ -255,14 +424,7 @@ static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t
if (ret < 0)
return ret;
- ret = srt_accept(fd, NULL, NULL);
- if (ret < 0)
- return libsrt_neterrno(h);
- if (libsrt_socket_nonblock(ret, 1) < 0)
- av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
- if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
- /* Note: returned streamid_len doesn't count the terminating null character */
- av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
+ ret = libsrt_accept(fd, h);
return ret;
}
@@ -462,10 +624,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
goto fail1;
// multi-client
ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout);
- srt_epoll_release(read_eid);
- if (ret < 0)
- goto fail1;
- srt_close(fd);
+ if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) {
+ srt_epoll_release(read_eid);
+ if (ret < 0)
+ goto fail1;
+ srt_close(fd);
+ } else {
+ s->listen_eid = read_eid;
+ s->listen_fd = fd;
+ }
fd = ret;
} else {
int write_eid = ret = libsrt_epoll_create(h, fd, 1);
@@ -508,8 +675,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
goto fail1;
h->is_streamed = 1;
- s->fd = fd;
- s->eid = eid;
+ s->fd[0] = fd;
+ s->eid[0] = eid;
+
+ if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) {
+ av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n");
+ libsrt_launch_client_thread(h, &s->client_context[0], fd, eid);
+ av_log(h, AV_LOG_DEBUG, "launching accept_thread\n");
+ pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h);
+ }
freeaddrinfo(ai);
return 0;
@@ -671,7 +845,26 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
s->linger = strtol(buf, NULL, 10);
}
+ if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) {
+ s->max_clients = strtoll(buf, NULL, 10);
+ if (s->max_clients < 1) {
+ ret = AVERROR(EINVAL);
+ goto err;
+ }
+ }
+ }
+ if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) {
+ s->max_clients = 1;
}
+ s->fd = av_calloc(s->max_clients, sizeof(*s->fd));
+ s->eid = av_calloc(s->max_clients, sizeof(*s->eid));
+ s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context));
+ if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) {
+ ret = AVERROR(ENOMEM);
+ av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret));
+ goto err;
+ }
+
ret = libsrt_setup(h, uri, flags);
if (ret < 0)
goto err;
@@ -688,12 +881,12 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
int ret;
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
- ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback);
+ ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback);
if (ret)
return ret;
}
- ret = srt_recvmsg(s->fd, buf, size);
+ ret = srt_recvmsg(s->fd[0], buf, size);
if (ret < 0) {
ret = libsrt_neterrno(h);
}
@@ -704,17 +897,35 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
{
SRTContext *s = h->priv_data;
+ SRTClientContext *c;
+ int i;
int ret;
-
- if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
- ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
- if (ret)
- return ret;
- }
-
- ret = srt_sendmsg(s->fd, buf, size, -1, 1);
- if (ret < 0) {
- ret = libsrt_neterrno(h);
+ int any_ok = 0;
+ int any = 0;
+
+ if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) {
+ pthread_mutex_lock(&s->accept_mutex);
+ for (i = 0; i < s->max_clients; i++) {
+ c = &s->client_context[i];
+ if (c->set && !c->ended) {
+ any = 1;
+ ret = av_fifo_write(c->fifo, buf, size);
+ if (ret >= 0) {
+ sem_post(&c->msg);
+ any_ok = 1;
+ }
+ }
+ }
+ pthread_mutex_unlock(&s->accept_mutex);
+ if (!any) {
+ ret = AVERROR(EIO);
+ } else if (!any_ok) {
+ ret = AVERROR(EAGAIN);
+ } else {
+ ret = size;
+ }
+ } else {
+ ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size);
}
return ret;
@@ -723,9 +934,31 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
static int libsrt_close(URLContext *h)
{
SRTContext *s = h->priv_data;
+ int i;
- srt_epoll_release(s->eid);
- srt_close(s->fd);
+ if (s->max_clients > 1) {
+ s->close_threads = 1;
+ for (i = 0; i < s->max_clients; i++) {
+ if (s->client_context[i].set) {
+ sem_post(&s->client_context[i].msg);
+ }
+ }
+ pthread_join(s->accept_thread, NULL);
+ srt_epoll_release(s->listen_eid);
+ srt_close(s->listen_fd);
+ for (i = 0; i < s->max_clients; i++) {
+ if (s->client_context[i].set) {
+ av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
+ libsrt_close_client_thread(h, &s->client_context[i]);
+ }
+ }
+ } else {
+ srt_epoll_release(s->eid[0]);
+ srt_close(s->fd[0]);
+ }
+ av_freep(&s->fd);
+ av_freep(&s->eid);
+ av_freep(&s->client_context);
srt_cleanup();
--
2.39.4
More information about the ffmpeg-devel
mailing list