[FFmpeg-devel] [PATCH 6/8] ffserver.c: Add config file reading
Stephan Holljes
klaxa1337 at googlemail.com
Fri Jun 1 01:20:30 EEST 2018
Signed-off-by: Stephan Holljes <klaxa1337 at googlemail.com>
---
ffserver.c | 248 ++++++++++++++++++++++++++++++++++++++++++-------------------
1 file changed, 172 insertions(+), 76 deletions(-)
diff --git a/ffserver.c b/ffserver.c
index b80a7f8..1363cdc 100644
--- a/ffserver.c
+++ b/ffserver.c
@@ -38,6 +38,7 @@
#include "segment.h"
#include "publisher.h"
#include "httpd.h"
+#include "configreader.h"
#define BUFFER_SECS 30
#define LISTEN_TIMEOUT_MSEC 1000
@@ -54,9 +55,11 @@ struct WriteInfo {
};
struct AcceptInfo {
- struct PublisherContext *pub;
+ struct PublisherContext **pubs;
struct HTTPDInterface *httpd;
- AVFormatContext *ifmt_ctx;
+ AVFormatContext **ifmt_ctxs;
+ struct HTTPDConfig *config;
+ int nb_pub; /* number of publishers (streams) equal to number of ifmt_ctx */
};
@@ -286,52 +289,77 @@ void *accept_thread(void *arg)
{
struct AcceptInfo *info = (struct AcceptInfo*) arg;
struct FFServerInfo *ffinfo = NULL;
+ struct PublisherContext *pub;
char status[4096];
+ char *stream_name;
struct HTTPClient *client = NULL;
void *server = NULL;
AVIOContext *client_ctx = NULL;
AVFormatContext *ofmt_ctx = NULL;
+ AVFormatContext *ifmt_ctx;
unsigned char *avio_buffer;
AVOutputFormat *ofmt;
AVDictionary *mkvopts = NULL;
AVStream *in_stream, *out_stream;
int ret, i, reply_code;
- struct HTTPDConfig config = {
- .bind_address = "0",
- .port = 8080,
- .accept_timeout = LISTEN_TIMEOUT_MSEC,
- };
-
- info->httpd->init(&server, config);
-
-
+ int shutdown;
+ struct HTTPDConfig *config = info->config;
+
+ info->httpd->init(&server, *config);
+
for (;;) {
- if (info->pub->shutdown)
+ shutdown = 1;
+ for (i = 0; i < config->nb_streams; i++) {
+ if (info->pubs[i] && !info->pubs[i]->shutdown)
+ shutdown = 0;
+ }
+ if (shutdown)
break;
- publisher_gen_status_json(info->pub, status);
- av_log(server, AV_LOG_INFO, status);
+ for (i = 0; i < config->nb_streams; i++) {
+ publisher_gen_status_json(info->pubs[i], status);
+ av_log(server, AV_LOG_INFO, status);
+ }
client = NULL;
av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n");
reply_code = 200;
- if (publisher_reserve_client(info->pub)) {
- av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n");
- reply_code = 503;
- }
-
+
if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) {
if (ret == HTTPD_LISTEN_TIMEOUT) {
- publisher_cancel_reserve(info->pub);
continue;
} else if (ret == HTTPD_CLIENT_ERROR) {
info->httpd->close(server, client);
}
av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n");
- publisher_cancel_reserve(info->pub);
continue;
}
-
+
+ pub = NULL;
+ ifmt_ctx = NULL;
+ for (i = 0; i < config->nb_streams; i++) {
+ stream_name = info->pubs[i]->stream_name;
+ // skip leading '/' ---v
+ if(!strncmp(client->resource + 1, stream_name, strlen(stream_name))) {
+ pub = info->pubs[i];
+ ifmt_ctx = info->ifmt_ctxs[i];
+ break;
+ }
+ }
+
+ if (!pub || !ifmt_ctx) {
+ av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n",
+ client->resource ? client->resource : "(null)");
+ reply_code = 404;
+ }
+
+
+ if (pub && ifmt_ctx && publisher_reserve_client(pub)) {
+ av_log(client_ctx, AV_LOG_WARNING, "No more client slots free, Returning 503.\n");
+ reply_code = 503;
+ }
+
if (reply_code != 200) {
- publisher_cancel_reserve(info->pub);
+ if (pub && ifmt_ctx)
+ publisher_cancel_reserve(pub);
info->httpd->close(server, client);
continue;
}
@@ -344,7 +372,7 @@ void *accept_thread(void *arg)
client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL);
if (!client_ctx) {
av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n");
- publisher_cancel_reserve(info->pub);
+ publisher_cancel_reserve(pub);
info->httpd->close(server, client);
av_free(client_ctx->buffer);
avio_context_free(&client_ctx);
@@ -354,7 +382,7 @@ void *accept_thread(void *arg)
avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL);
if (!ofmt_ctx) {
av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n");
- publisher_cancel_reserve(info->pub);
+ publisher_cancel_reserve(pub);
info->httpd->close(server, client);
avformat_free_context(ofmt_ctx);
av_free(client_ctx->buffer);
@@ -364,7 +392,7 @@ void *accept_thread(void *arg)
}
if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) {
av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret));
- publisher_cancel_reserve(info->pub);
+ publisher_cancel_reserve(pub);
info->httpd->close(server, client);
avformat_free_context(ofmt_ctx);
av_free(client_ctx->buffer);
@@ -376,13 +404,13 @@ void *accept_thread(void *arg)
ofmt = ofmt_ctx->oformat;
ofmt->flags |= AVFMT_NOFILE | AVFMT_FLAG_AUTO_BSF;
- for (i = 0; i < info->ifmt_ctx->nb_streams; i++) {
- in_stream = info->ifmt_ctx->streams[i];
+ for (i = 0; i < ifmt_ctx->nb_streams; i++) {
+ in_stream = ifmt_ctx->streams[i];
out_stream = avformat_new_stream(ofmt_ctx, NULL);
if (!out_stream) {
av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n");
- publisher_cancel_reserve(info->pub);
+ publisher_cancel_reserve(pub);
info->httpd->close(server, client);
avformat_free_context(ofmt_ctx);
av_free(client_ctx->buffer);
@@ -394,7 +422,7 @@ void *accept_thread(void *arg)
ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar);
if (ret < 0) {
av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret));
- publisher_cancel_reserve(info->pub);
+ publisher_cancel_reserve(pub);
info->httpd->close(server, client);
avformat_free_context(ofmt_ctx);
av_free(client_ctx->buffer);
@@ -411,12 +439,12 @@ void *accept_thread(void *arg)
}
av_dict_copy(&out_stream->metadata, in_stream->metadata, 0);
}
- av_dict_copy(&info->ifmt_ctx->metadata, ofmt_ctx->metadata, 0);
+ av_dict_copy(&ifmt_ctx->metadata, ofmt_ctx->metadata, 0);
ofmt_ctx->pb = client_ctx;
ret = avformat_write_header(ofmt_ctx, &mkvopts);
if (ret < 0) {
av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret));
- publisher_cancel_reserve(info->pub);
+ publisher_cancel_reserve(pub);
info->httpd->close(server, client);
avformat_free_context(ofmt_ctx);
av_free(client_ctx->buffer);
@@ -424,7 +452,7 @@ void *accept_thread(void *arg)
av_free(ffinfo);
continue;
}
- publisher_add_client(info->pub, ofmt_ctx, ffinfo);
+ publisher_add_client(pub, ofmt_ctx, ffinfo);
ofmt_ctx = NULL;
}
@@ -466,59 +494,127 @@ void *write_thread(void *arg)
return NULL;
}
-
-int main(int argc, char *argv[])
-{
- struct ReadInfo rinfo;
+void *run_server(void *arg) {
struct AcceptInfo ainfo;
- struct WriteInfo *winfos;
- struct PublisherContext *pub;
- int ret, i;
- pthread_t r_thread, a_thread;
- pthread_t *w_threads;
+ struct ReadInfo *rinfos;
+ struct WriteInfo **winfos_p;
+ struct HTTPDConfig *config = (struct HTTPDConfig*) arg;
+ struct PublisherContext **pubs;
+ AVFormatContext **ifmt_ctxs;
+ int ret, i, stream_index;
+ pthread_t *r_threads;
+ pthread_t **w_threads_p;
- AVFormatContext *ifmt_ctx = NULL;
-
- rinfo.in_filename = "pipe:0";
- if (argc > 1)
- rinfo.in_filename = argv[1];
+ pubs = av_mallocz(config->nb_streams * sizeof(struct PublisherContext*));
+ ifmt_ctxs = av_mallocz(config->nb_streams * sizeof(AVFormatContext*));
av_log_set_level(AV_LOG_INFO);
- if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) {
- av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n");
- return 1;
- }
-
- publisher_init(&pub);
-
- rinfo.ifmt_ctx = ifmt_ctx;
- rinfo.pub = pub;
- ainfo.ifmt_ctx = ifmt_ctx;
- ainfo.pub = pub;
+ ainfo.pubs = pubs;
+ ainfo.ifmt_ctxs = ifmt_ctxs;
+ ainfo.nb_pub = config->nb_streams;
ainfo.httpd = &lavfhttpd;
+ ainfo.config = config;
- w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads);
- winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads);
+ rinfos = av_mallocz(config->nb_streams * sizeof(struct ReadInfo));
+ winfos_p = av_mallocz(config->nb_streams * sizeof(struct WriteInfo*));
+ r_threads = av_mallocz(config->nb_streams * sizeof(pthread_t));
+ w_threads_p = av_mallocz(config->nb_streams * sizeof(pthread_t*));
- for (i = 0; i < pub->nb_threads; i++) {
- winfos[i].pub = pub;
- winfos[i].thread_id = i;
- pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]);
+ for (stream_index = 0; stream_index < config->nb_streams; stream_index++) {
+ struct PublisherContext *pub = NULL;
+ struct AVFormatContext *ifmt_ctx = NULL;
+ struct ReadInfo rinfo;
+ struct WriteInfo *winfos = NULL;
+ pthread_t *w_threads = NULL;
+ pthread_t r_thread;
+ rinfo.input_uri = config->streams[stream_index].input_uri;
+
+ if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) {
+ av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n");
+ continue;
+ }
+
+ ifmt_ctxs[stream_index] = ifmt_ctx;
+
+ publisher_init(&pub, config->streams[stream_index].stream_name);
+ pubs[stream_index] = pub;
+
+ rinfo.ifmt_ctx = ifmt_ctx;
+ rinfo.pub = pub;
+
+ rinfos[stream_index] = rinfo;
+
+ w_threads = av_malloc(sizeof(pthread_t) * pub->nb_threads);
+ winfos = av_malloc(sizeof(struct WriteInfo) * pub->nb_threads);
+
+ w_threads_p[stream_index] = w_threads;
+ winfos_p[stream_index] = winfos;
+
+ for (i = 0; i < pub->nb_threads; i++) {
+ winfos[i].pub = pub;
+ winfos[i].thread_id = i;
+ pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]);
+ }
+ w_threads_p[stream_index] = w_threads;
+ pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]);
+ r_threads[stream_index] = r_thread;
}
-
- pthread_create(&r_thread, NULL, read_thread, &rinfo);
-
+
+
+ //pthread_create(&a_thread, NULL, accept_thread, &ainfo);
accept_thread(&ainfo);
-
- pthread_join(r_thread, NULL);
-
- for (i = 0; i < pub->nb_threads; i++) {
- pthread_join(w_threads[i], NULL);
+ for (stream_index = 0; stream_index < config->nb_streams; stream_index++) {
+ pthread_join(r_threads[stream_index], NULL);
+ if (pubs[stream_index]) {
+ for (i = 0; i < pubs[stream_index]->nb_threads; i++) {
+ pthread_join(w_threads_p[stream_index][i], NULL);
+ }
+ }
+ av_free(winfos_p[stream_index]);
+ av_free(w_threads_p[stream_index]);
+ // pubs[stream_index] could be null if the file could not be opened
+ if (pubs[stream_index])
+ publisher_free(pubs[stream_index]);
}
- av_free(w_threads);
- av_free(winfos);
-
- publisher_freep(&pub);
+ av_free(rinfos);
+ av_free(winfos_p);
+ av_free(r_threads);
+ av_free(w_threads_p);
+ av_free(pubs);
+ av_free(ifmt_ctxs);
+
+ return NULL;
+}
+
+int main(int argc, char *argv[])
+{
+ struct HTTPDConfig *configs;
+ int nb_configs;
+ pthread_t *server_threads;
+ int i;
+
+ if (argc < 2) {
+ printf("Usage: %s config.lua\n", argv[0]);
+ return 1;
+ }
+
+ nb_configs = configs_read(&configs, argv[1]);
+ if (nb_configs <= 0) {
+ printf("No valid configurations parsed.\n");
+ return 1;
+ }
+ server_threads = av_malloc(nb_configs * sizeof(pthread_t));
+ for (i = 0; i < nb_configs; i++) {
+ config_dump(configs + i);
+ pthread_create(&server_threads[i], NULL, run_server, configs + i);
+ }
+
+ for (i = 0; i < nb_configs; i++) {
+ pthread_join(server_threads[i], NULL);
+ config_free(configs + i);
+ }
+ av_free(configs);
+ av_free(server_threads);
return 0;
}
--
2.16.2
More information about the ffmpeg-devel
mailing list