[FFmpeg-devel] [PATCH 6/8] ffserver.c: Add config file reading

Stephan Holljes klaxa1337 at googlemail.com
Fri Jun 1 01:20:30 EEST 2018


Signed-off-by: Stephan Holljes <klaxa1337 at googlemail.com>
---
 ffserver.c | 248 ++++++++++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 172 insertions(+), 76 deletions(-)

diff --git a/ffserver.c b/ffserver.c
index b80a7f8..1363cdc 100644
--- a/ffserver.c
+++ b/ffserver.c
@@ -38,6 +38,7 @@
 #include "segment.h"
 #include "publisher.h"
 #include "httpd.h"
+#include "configreader.h"
 
 #define BUFFER_SECS 30
 #define LISTEN_TIMEOUT_MSEC 1000
@@ -54,9 +55,11 @@ struct WriteInfo {
 };
 
 struct AcceptInfo {
-    struct PublisherContext *pub;
+    struct PublisherContext **pubs;
     struct HTTPDInterface *httpd;
-    AVFormatContext *ifmt_ctx;
+    AVFormatContext **ifmt_ctxs;
+    struct HTTPDConfig *config;
+    int nb_pub; /* number of entries in pubs and in ifmt_ctxs (one per configured stream) */
 };
 
 
@@ -286,52 +289,77 @@ void *accept_thread(void *arg)
 {
     struct AcceptInfo *info = (struct AcceptInfo*) arg;
     struct FFServerInfo *ffinfo = NULL;
+    struct PublisherContext *pub;
     char status[4096];
+    char *stream_name;
     struct HTTPClient *client = NULL;
     void *server = NULL;
     AVIOContext *client_ctx = NULL;
     AVFormatContext *ofmt_ctx = NULL;
+    AVFormatContext *ifmt_ctx;
     unsigned char *avio_buffer;
     AVOutputFormat *ofmt;
     AVDictionary *mkvopts = NULL;
     AVStream *in_stream, *out_stream;
     int ret, i, reply_code;
-    struct HTTPDConfig config = {
-        .bind_address = "0",
-        .port = 8080,
-        .accept_timeout = LISTEN_TIMEOUT_MSEC,
-    };
-    
-    info->httpd->init(&server, config);
-    
-    
+    int shutdown;
+    struct HTTPDConfig *config = info->config;
+
+    info->httpd->init(&server, *config);
+
     for (;;) {
-        if (info->pub->shutdown)
+        shutdown = 1;
+        for (i = 0; i < config->nb_streams; i++) {
+            if (info->pubs[i] && !info->pubs[i]->shutdown)
+                shutdown = 0;
+        }
+        if (shutdown)
             break;
-        publisher_gen_status_json(info->pub, status);
-        av_log(server, AV_LOG_INFO, status);
+        for (i = 0; i < config->nb_streams; i++) {
+            publisher_gen_status_json(info->pubs[i], status);
+            av_log(server, AV_LOG_INFO, status);
+        }
         client = NULL;
         av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n");
         reply_code = 200;
-        if (publisher_reserve_client(info->pub)) {
-            av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n");
-            reply_code = 503;
-        }
-        
+
         if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) {
             if (ret == HTTPD_LISTEN_TIMEOUT) {
-                publisher_cancel_reserve(info->pub);
                 continue;
             } else if (ret == HTTPD_CLIENT_ERROR) {
                 info->httpd->close(server, client);
             }
             av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n");
-            publisher_cancel_reserve(info->pub);
             continue;
         }
-        
+
+        pub = NULL;
+        ifmt_ctx = NULL;
+        for (i = 0; i < config->nb_streams; i++) {
+            stream_name = info->pubs[i]->stream_name;
+            //       skip leading '/'  ---v
+            if(!strncmp(client->resource + 1, stream_name, strlen(stream_name))) {
+                pub = info->pubs[i];
+                ifmt_ctx = info->ifmt_ctxs[i];
+                break;
+            }
+        }
+
+        if (!pub || !ifmt_ctx) {
+            av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n",
+                                                        client->resource ? client->resource : "(null)");
+            reply_code = 404;
+        }
+
+
+        if (pub && ifmt_ctx && publisher_reserve_client(pub)) {
+            av_log(client_ctx, AV_LOG_WARNING, "No more client slots free, Returning 503.\n");
+            reply_code = 503;
+        }
+
         if (reply_code != 200) {
-            publisher_cancel_reserve(info->pub);
+            if (pub && ifmt_ctx)
+                publisher_cancel_reserve(pub);
             info->httpd->close(server, client);
             continue;
         }
@@ -344,7 +372,7 @@ void *accept_thread(void *arg)
         client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL);
         if (!client_ctx) {
             av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n");
-            publisher_cancel_reserve(info->pub);
+            publisher_cancel_reserve(pub);
             info->httpd->close(server, client);
             av_free(client_ctx->buffer);
             avio_context_free(&client_ctx);
@@ -354,7 +382,7 @@ void *accept_thread(void *arg)
         avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL);
         if (!ofmt_ctx) {
             av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n");
-            publisher_cancel_reserve(info->pub);
+            publisher_cancel_reserve(pub);
             info->httpd->close(server, client);
             avformat_free_context(ofmt_ctx);
             av_free(client_ctx->buffer);
@@ -364,7 +392,7 @@ void *accept_thread(void *arg)
         }
         if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) {
             av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret));
-            publisher_cancel_reserve(info->pub);
+            publisher_cancel_reserve(pub);
             info->httpd->close(server, client);
             avformat_free_context(ofmt_ctx);
             av_free(client_ctx->buffer);
@@ -376,13 +404,13 @@ void *accept_thread(void *arg)
         ofmt = ofmt_ctx->oformat;
         ofmt->flags |= AVFMT_NOFILE | AVFMT_FLAG_AUTO_BSF;
         
-        for (i = 0; i < info->ifmt_ctx->nb_streams; i++) {
-            in_stream = info->ifmt_ctx->streams[i];
+        for (i = 0; i < ifmt_ctx->nb_streams; i++) {
+            in_stream = ifmt_ctx->streams[i];
             out_stream = avformat_new_stream(ofmt_ctx, NULL);
             
             if (!out_stream) {
                 av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n");
-                publisher_cancel_reserve(info->pub);
+                publisher_cancel_reserve(pub);
                 info->httpd->close(server, client);
                 avformat_free_context(ofmt_ctx);
                 av_free(client_ctx->buffer);
@@ -394,7 +422,7 @@ void *accept_thread(void *arg)
             ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar);
             if (ret < 0) {
                 av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret));
-                publisher_cancel_reserve(info->pub);
+                publisher_cancel_reserve(pub);
                 info->httpd->close(server, client);
                 avformat_free_context(ofmt_ctx);
                 av_free(client_ctx->buffer);
@@ -411,12 +439,12 @@ void *accept_thread(void *arg)
             }
             av_dict_copy(&out_stream->metadata, in_stream->metadata, 0);
         }
-        av_dict_copy(&info->ifmt_ctx->metadata, ofmt_ctx->metadata, 0);
+        av_dict_copy(&ifmt_ctx->metadata, ofmt_ctx->metadata, 0);
         ofmt_ctx->pb = client_ctx;
         ret = avformat_write_header(ofmt_ctx, &mkvopts);
         if (ret < 0) {
             av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret));
-            publisher_cancel_reserve(info->pub);
+            publisher_cancel_reserve(pub);
             info->httpd->close(server, client);
             avformat_free_context(ofmt_ctx);
             av_free(client_ctx->buffer);
@@ -424,7 +452,7 @@ void *accept_thread(void *arg)
             av_free(ffinfo);
             continue;
         }
-        publisher_add_client(info->pub, ofmt_ctx, ffinfo);
+        publisher_add_client(pub, ofmt_ctx, ffinfo);
         ofmt_ctx = NULL;
         
     }
@@ -466,59 +494,127 @@ void *write_thread(void *arg)
     return NULL;
 }
 
-
-int main(int argc, char *argv[])
-{
-    struct ReadInfo rinfo;
+/* Thread entry point for one server; arg is the struct HTTPDConfig to run.
+ * Opens each configured input and starts its read thread and publisher write
+ * threads, then runs accept_thread() in the calling thread. A stream whose
+ * input cannot be opened is skipped and its array slots stay zeroed.
+ * Always returns NULL. */
+void *run_server(void *arg) {
     struct AcceptInfo ainfo;
-    struct WriteInfo *winfos;
-    struct PublisherContext *pub;
-    int ret, i;
-    pthread_t r_thread, a_thread;
-    pthread_t *w_threads;
+    struct ReadInfo *rinfos;
+    struct WriteInfo **winfos_p;
+    struct HTTPDConfig *config = (struct HTTPDConfig*) arg;
+    struct PublisherContext **pubs;
+    AVFormatContext **ifmt_ctxs;
+    int ret, i, stream_index;
+    pthread_t *r_threads;
+    pthread_t **w_threads_p;
     
-    AVFormatContext *ifmt_ctx = NULL;
-    
-    rinfo.in_filename = "pipe:0";
-    if (argc > 1)
-        rinfo.in_filename = argv[1];
+    pubs = av_mallocz(config->nb_streams * sizeof(struct PublisherContext*));
+    ifmt_ctxs = av_mallocz(config->nb_streams * sizeof(AVFormatContext*));
     
     av_log_set_level(AV_LOG_INFO);
     
-    if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) {
-        av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n");
-        return 1;
-    }
-    
-    publisher_init(&pub);
-    
-    rinfo.ifmt_ctx = ifmt_ctx;
-    rinfo.pub = pub;
-    ainfo.ifmt_ctx = ifmt_ctx;
-    ainfo.pub = pub;
+    ainfo.pubs = pubs;
+    ainfo.ifmt_ctxs = ifmt_ctxs;
+    ainfo.nb_pub = config->nb_streams;
     ainfo.httpd = &lavfhttpd;
+    ainfo.config = config;
     
-    w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads);
-    winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads);
+    rinfos = av_mallocz(config->nb_streams * sizeof(struct ReadInfo));
+    winfos_p = av_mallocz(config->nb_streams * sizeof(struct WriteInfo*));
+    r_threads = av_mallocz(config->nb_streams * sizeof(pthread_t));
+    w_threads_p = av_mallocz(config->nb_streams * sizeof(pthread_t*));
+    if (!pubs || !ifmt_ctxs || !rinfos || !winfos_p || !r_threads || !w_threads_p) {
+        av_log(NULL, AV_LOG_ERROR, "run_server: Could not allocate memory\n");
+        goto end;
+    }
     
-    for (i = 0; i < pub->nb_threads; i++) {
-        winfos[i].pub = pub;
-        winfos[i].thread_id = i;
-        pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]);
+    for (stream_index = 0; stream_index < config->nb_streams; stream_index++) {
+        struct PublisherContext *pub = NULL;
+        struct AVFormatContext *ifmt_ctx = NULL;
+        struct ReadInfo *rinfo = &rinfos[stream_index];
+        struct WriteInfo *winfos;
+        pthread_t *w_threads;
+
+        rinfo->input_uri = config->streams[stream_index].input_uri;
+        if ((ret = avformat_open_input(&ifmt_ctx, rinfo->input_uri, NULL, NULL))) {
+            av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n");
+            continue;
+        }
+        ifmt_ctxs[stream_index] = ifmt_ctx;
+        publisher_init(&pub, config->streams[stream_index].stream_name);
+        pubs[stream_index] = pub;
+        rinfo->ifmt_ctx = ifmt_ctx;
+        rinfo->pub = pub;
+
+        w_threads = av_malloc(sizeof(pthread_t) * pub->nb_threads);
+        winfos = av_malloc(sizeof(struct WriteInfo) * pub->nb_threads);
+        w_threads_p[stream_index] = w_threads;
+        winfos_p[stream_index] = winfos;
+        for (i = 0; i < pub->nb_threads; i++) {
+            winfos[i].pub = pub;
+            winfos[i].thread_id = i;
+            pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]);
+        }
+        pthread_create(&r_threads[stream_index], NULL, read_thread, rinfo);
     }
-    
-    pthread_create(&r_thread, NULL, read_thread, &rinfo);
-    
+    /* Serve clients in this thread until all publishers have shut down. */
     accept_thread(&ainfo);
-    
-    pthread_join(r_thread, NULL);
-    
-    for (i = 0; i < pub->nb_threads; i++) {
-        pthread_join(w_threads[i], NULL);
+    for (stream_index = 0; stream_index < config->nb_streams; stream_index++) {
+        // A stream whose input could not be opened never started its threads;
+        // its r_threads slot is still zeroed and must not be joined.
+        if (pubs[stream_index]) {
+            pthread_join(r_threads[stream_index], NULL);
+            for (i = 0; i < pubs[stream_index]->nb_threads; i++)
+                pthread_join(w_threads_p[stream_index][i], NULL);
+        }
+        av_free(winfos_p[stream_index]);
+        av_free(w_threads_p[stream_index]);
+        if (pubs[stream_index])
+            publisher_free(pubs[stream_index]);
     }
-    av_free(w_threads);
-    av_free(winfos);
-    
-    publisher_freep(&pub);
+end:
+    av_free(rinfos);
+    av_free(winfos_p);
+    av_free(r_threads);
+    av_free(w_threads_p);
+    av_free(pubs);
+    av_free(ifmt_ctxs);
+    return NULL;
+}
+
+/* Run one run_server() thread per config read from argv[1] and wait for them.
+ * Returns 1 on usage, config or allocation errors. */
+int main(int argc, char *argv[])
+{
+    struct HTTPDConfig *configs;
+    pthread_t *server_threads;
+    int nb_configs, i;
+
+    if (argc < 2) {
+        printf("Usage: %s config.lua\n", argv[0]);
+        return 1;
+    }
+    nb_configs = configs_read(&configs, argv[1]);
+    if (nb_configs <= 0) {
+        printf("No valid configurations parsed.\n");
+        return 1;
+    }
+    server_threads = av_malloc(nb_configs * sizeof(pthread_t));
+    if (!server_threads) {
+        printf("Could not allocate server threads.\n");
+        return 1;
+    }
+    for (i = 0; i < nb_configs; i++) {
+        config_dump(configs + i);
+        pthread_create(&server_threads[i], NULL, run_server, configs + i);
+    }
+    for (i = 0; i < nb_configs; i++) {
+        pthread_join(server_threads[i], NULL);
+        config_free(configs + i);
+    }
+    av_free(configs);
+    av_free(server_threads);
     return 0;
 }
-- 
2.16.2



More information about the ffmpeg-devel mailing list