[FFmpeg-devel] [PATCH 2/2] ffmpeg: use thread message API.

Nicolas George george at nsup.org
Thu Feb 20 16:22:14 CET 2014


Signed-off-by: Nicolas George <george at nsup.org>
---
 ffmpeg.c | 98 ++++++++++++++++++----------------------------------------------
 ffmpeg.h |  5 ++--
 2 files changed, 29 insertions(+), 74 deletions(-)


This applies on top of "ffmpeg: make reading packets from thread blocking.",
because the logic to determine if the file should be non-blocking is still
used.

This makes the helgrind error disappear.


diff --git a/ffmpeg.c b/ffmpeg.c
index 28bebe5..d3defff 100644
--- a/ffmpeg.c
+++ b/ffmpeg.c
@@ -59,6 +59,7 @@
 #include "libavutil/timestamp.h"
 #include "libavutil/bprint.h"
 #include "libavutil/time.h"
+#include "libavutil/threadmessage.h"
 #include "libavformat/os_support.h"
 
 #include "libavformat/ffm.h" // not public API
@@ -137,11 +138,6 @@ AVIOContext *progress_avio = NULL;
 
 static uint8_t *subtitle_out;
 
-#if HAVE_PTHREADS
-/* signal to input threads that they should exit; set by the main thread */
-static int transcoding_finished;
-#endif
-
 #define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass"
 
 InputStream **input_streams = NULL;
@@ -2943,32 +2939,31 @@ static void *input_thread(void *arg)
     InputFile *f = arg;
     int ret = 0;
 
-    while (!transcoding_finished && ret >= 0) {
+    while (1) {
         AVPacket pkt;
         ret = av_read_frame(f->ctx, &pkt);
 
         if (ret == AVERROR(EAGAIN)) {
             av_usleep(10000);
-            ret = 0;
             continue;
-        } else if (ret < 0)
+        }
+        if (ret < 0) {
+            av_thread_message_queue_set_err_recv(f->in_thread_queue, ret);
             break;
-
-        pthread_mutex_lock(&f->fifo_lock);
-        while (!av_fifo_space(f->fifo))
-            pthread_cond_wait(&f->fifo_cond, &f->fifo_lock);
-
+        }
         av_dup_packet(&pkt);
-        av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL);
-        pthread_cond_signal(&f->fifo_cond);
-
-        pthread_mutex_unlock(&f->fifo_lock);
+        ret = av_thread_message_queue_send(f->in_thread_queue, &pkt, 0);
+        if (ret < 0) {
+            if (ret != AVERROR_EOF)
+                av_log(f->ctx, AV_LOG_ERROR,
+                       "Unable to send packet to main thread: %s\n",
+                       av_err2str(ret));
+            av_free_packet(&pkt);
+            av_thread_message_queue_set_err_recv(f->in_thread_queue, ret);
+            break;
+        }
     }
 
-    pthread_mutex_lock(&f->fifo_lock);
-    f->finished = 1;
-    pthread_cond_signal(&f->fifo_cond);
-    pthread_mutex_unlock(&f->fifo_lock);
     return NULL;
 }
 
@@ -2976,34 +2971,19 @@ static void free_input_threads(void)
 {
     int i;
 
-    if (nb_input_files == 1)
-        return;
-
-    transcoding_finished = 1;
-
     for (i = 0; i < nb_input_files; i++) {
         InputFile *f = input_files[i];
         AVPacket pkt;
 
-        if (!f->fifo || f->joined)
+        if (!f->in_thread_queue)
             continue;
-
-        pthread_mutex_lock(&f->fifo_lock);
-        while (av_fifo_size(f->fifo)) {
-            av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL);
+        av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF);
+        while (av_thread_message_queue_recv(f->in_thread_queue, &pkt, 0) >= 0)
             av_free_packet(&pkt);
-        }
-        pthread_cond_signal(&f->fifo_cond);
-        pthread_mutex_unlock(&f->fifo_lock);
 
         pthread_join(f->thread, NULL);
         f->joined = 1;
-
-        while (av_fifo_size(f->fifo)) {
-            av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL);
-            av_free_packet(&pkt);
-        }
-        av_fifo_free(f->fifo);
+        av_thread_message_queue_free(&f->in_thread_queue);
     }
 }
 
@@ -3017,15 +2997,13 @@ static int init_input_threads(void)
     for (i = 0; i < nb_input_files; i++) {
         InputFile *f = input_files[i];
 
-        if (!(f->fifo = av_fifo_alloc(8*sizeof(AVPacket))))
-            return AVERROR(ENOMEM);
-
         if (f->ctx->pb ? !f->ctx->pb->seekable :
             strcmp(f->ctx->iformat->name, "lavfi"))
             f->non_blocking = 1;
-
-        pthread_mutex_init(&f->fifo_lock, NULL);
-        pthread_cond_init (&f->fifo_cond, NULL);
+        ret = av_thread_message_queue_alloc(&f->in_thread_queue,
+                                            sizeof(AVPacket), 8);
+        if (ret < 0)
+            return ret;
 
         if ((ret = pthread_create(&f->thread, NULL, input_thread, f)))
             return AVERROR(ret);
@@ -3035,31 +3013,9 @@ static int init_input_threads(void)
 
 static int get_input_packet_mt(InputFile *f, AVPacket *pkt)
 {
-    int ret = 0;
-
-    pthread_mutex_lock(&f->fifo_lock);
-
-    while (1) {
-    if (av_fifo_size(f->fifo)) {
-        av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL);
-        pthread_cond_signal(&f->fifo_cond);
-        break;
-    } else {
-        if (f->finished) {
-            ret = AVERROR_EOF;
-            break;
-        }
-        if (f->non_blocking) {
-            ret = AVERROR(EAGAIN);
-            break;
-        }
-        pthread_cond_wait(&f->fifo_cond, &f->fifo_lock);
-    }
-    }
-
-    pthread_mutex_unlock(&f->fifo_lock);
-
-    return ret;
+    return av_thread_message_queue_recv(f->in_thread_queue, pkt,
+                                        f->non_blocking ?
+                                        AV_THREAD_MESSAGE_NONBLOCK : 0);
 }
 #endif
 
diff --git a/ffmpeg.h b/ffmpeg.h
index 151667d..8e3eefb 100644
--- a/ffmpeg.h
+++ b/ffmpeg.h
@@ -44,6 +44,7 @@
 #include "libavutil/fifo.h"
 #include "libavutil/pixfmt.h"
 #include "libavutil/rational.h"
+#include "libavutil/threadmessage.h"
 
 #include "libswresample/swresample.h"
 
@@ -326,13 +327,11 @@ typedef struct InputFile {
     int accurate_seek;
 
 #if HAVE_PTHREADS
+    AVThreadMessageQueue *in_thread_queue;
     pthread_t thread;           /* thread reading from this file */
     int non_blocking;           /* reading packets from the thread should not block */
     int finished;               /* the thread has exited */
     int joined;                 /* the thread has been joined */
-    pthread_mutex_t fifo_lock;  /* lock for access to fifo */
-    pthread_cond_t  fifo_cond;  /* the main thread will signal on this cond after reading from fifo */
-    AVFifoBuffer *fifo;         /* demuxed packets are stored here; freed by the main thread */
 #endif
 } InputFile;
 
-- 
1.8.5.3



More information about the ffmpeg-devel mailing list