[FFmpeg-devel] [PATCH 10/11] avformat/fifo: Add AVFMT_FLAG_NONBLOCK support

sebechlebskyjan at gmail.com sebechlebskyjan at gmail.com
Tue Aug 2 16:24:21 EEST 2016


From: Jan Sebechlebsky <sebechlebskyjan at gmail.com>

Add support for nonblocking calls.

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
---
 libavformat/fifo.c | 70 +++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 59 insertions(+), 11 deletions(-)

diff --git a/libavformat/fifo.c b/libavformat/fifo.c
index bc8c973..efd91e3 100644
--- a/libavformat/fifo.c
+++ b/libavformat/fifo.c
@@ -25,6 +25,7 @@
 #include "avformat.h"
 #include "internal.h"
 #include "pthread.h"
+#include "url.h"
 
 #define FIFO_DEFAULT_QUEUE_SIZE              60
 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   16
@@ -77,6 +78,13 @@ typedef struct FifoContext {
      * from failure or queue overflow */
     uint8_t restart_with_keyframe;
 
+    /* Whether termination was requested by invoking deinit
+     * before the thread was finished. Used only in non-blocking
+     * mode - when AVFMT_FLAG_NONBLOCK is set. */
+    volatile uint8_t termination_requested;
+
+    /* Original interrupt callback of the underlying muxer. */
+    AVIOInterruptCB orig_interrupt_callback;
 } FifoContext;
 
 typedef struct FifoThreadContext {
@@ -110,6 +118,16 @@ typedef struct FifoMessage {
     AVPacket pkt;
 } FifoMessage;
 
+static int fifo_interrupt_callback_wrapper(void *arg)
+{
+    FifoContext *ctx = arg;
+    /* Set by fifo_deinit() in non-blocking mode: report an interrupt at once. */
+    if (ctx->termination_requested)
+        return 1;
+    /* Otherwise defer to the original callback saved in fifo_mux_init(). */
+    return ff_check_interrupt(&ctx->orig_interrupt_callback);
+}
+
 static int fifo_thread_write_header(FifoThreadContext *ctx)
 {
     AVFormatContext *avf = ctx->avf;
@@ -448,6 +466,8 @@ static void *fifo_consumer_thread(void *data)
 static int fifo_mux_init(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
+    AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper,
+                                    .opaque = fifo};
     AVFormatContext *avf2;
     int ret = 0, i;
 
@@ -458,7 +478,8 @@ static int fifo_mux_init(AVFormatContext *avf)
 
     fifo->avf = avf2;
 
-    avf2->interrupt_callback = avf->interrupt_callback;
+    fifo->orig_interrupt_callback = avf->interrupt_callback;
+    avf2->interrupt_callback = interrupt_cb;
     avf2->max_delay = avf->max_delay;
     ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
     if (ret < 0)
@@ -543,7 +564,7 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
 {
     FifoContext *fifo = avf->priv_data;
     FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
-    int ret;
+    int ret, queue_flags = 0;
 
     if (pkt) {
         av_init_packet(&msg.pkt);
@@ -552,15 +573,21 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
             return ret;
     }
 
-    ret = av_thread_message_queue_send(fifo->queue, &msg,
-                                       fifo->drop_pkts_on_overflow ?
-                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    /* The queue takes AV_THREAD_MESSAGE_* flags, not AVFMT_FLAG_* ones. */
+    if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK))
+        queue_flags |= AV_THREAD_MESSAGE_NONBLOCK;
+
+    ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags);
     if (ret == AVERROR(EAGAIN)) {
-        uint8_t overflow_set = 0;
+        uint8_t overflow_set;
+
+        if (avf->flags & AVFMT_FLAG_NONBLOCK)
+            return ret;
 
         /* Queue is full, set fifo->overflow_flag to 1
          * to let consumer thread know the queue should
          * be flushed. */
+        overflow_set = 0;
         pthread_mutex_lock(&fifo->overflow_flag_lock);
         if (!fifo->overflow_flag)
             fifo->overflow_flag = overflow_set = 1;
@@ -588,11 +615,22 @@ static int fifo_write_trailer(AVFormatContext *avf)
 
     av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
 
-    ret = pthread_join( fifo->writer_thread, NULL);
-    if (ret < 0) {
-        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
-               av_err2str(AVERROR(ret)));
-        return AVERROR(ret);
+    if (!(avf->flags & AVFMT_FLAG_NONBLOCK)) {
+        ret = pthread_join( fifo->writer_thread, NULL);
+        if (ret) { /* pthread_join() returns a positive error number */
+            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+                   av_err2str(AVERROR(ret)));
+            return AVERROR(ret);
+        }
+    } else {
+        ret = pthread_tryjoin_np( fifo->writer_thread, NULL);
+        if (ret == EBUSY)
+            return AVERROR(EAGAIN);
+        if (ret) {
+            av_log(avf, AV_LOG_ERROR, "pthread_tryjoin_np error: %s\n",
+                   av_err2str(AVERROR(ret)));
+            return AVERROR(ret);
+        }
     }
 
     ret = fifo->write_trailer_ret;
@@ -603,6 +641,16 @@ static void fifo_deinit(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
 
+    if (avf->flags & AVFMT_FLAG_NONBLOCK) {
+        int ret;
+        fifo->termination_requested = 1;
+        ret = pthread_join( fifo->writer_thread, NULL);
+        if (ret) { /* pthread_join() returns a positive error number */
+            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+                   av_err2str(AVERROR(ret)));
+        }
+    }
+
     if (fifo->format_options)
         av_dict_free(&fifo->format_options);
 
-- 
1.9.1



More information about the ffmpeg-devel mailing list