[FFmpeg-devel] [PATCH 1/2] Audio Video Filtering using threads & semaphores

Nicolas George nicolas.george at normalesup.org
Thu Jul 26 17:00:47 CEST 2012


Le tridi 3 messidor, an CCXX, Manjunath Siddaiah a écrit :
> Ok, semaphores are replaced by POSIX mutexes and condition variables.
> This time only changes are in ffmpeg.c and 
> no configure file changes and remains the same.

Thanks for the patch. I am afraid that with all the time that has passed, it
will need to be rebased and adapted. In particular, the logic of the
poll_filter function has been altered. Sorry for the delay.

Here are my comments.

As far as I understand, you intend to run the whole filtering process in a
different thread from the ffmpeg command-line tool. Is that right?

> diff --git a/ffmpeg.c b/ffmpeg.c
> old mode 100644
> new mode 100755
> index 17fe6e5..bcf23ca
> --- a/ffmpeg.c
> +++ b/ffmpeg.c
> @@ -327,6 +327,18 @@ typedef struct OutputStream {
>      int copy_initial_nonkeyframes;
>  
>      int keep_pix_fmt;
> +#if CONFIG_AV_FILTER_THREADS
> +    int avfilter_thread_alive;
> +    pthread_t avfilter_thread;
> +    pthread_mutex_t mutex_avfilter;
> +    pthread_cond_t cond_avfilter;
> +    int flag_avfilter;
> +    pthread_mutex_t mutex_encoder;
> +    pthread_cond_t cond_encoder;
> +    int flag_encoder;
> +    int graph_id;
> +    int avfilter_ret;
> +#endif

Some comments about what the variables mean, and their relationships (who is
locking what) would be useful.

>  } OutputStream;
>  
>  
> @@ -1894,6 +1906,144 @@ static void do_video_stats(AVFormatContext *os, OutputStream *ost,
>      }
>  }
>  
> +#if CONFIG_AV_FILTER_THREADS
> +static int avfilter_frame(void *p)

It looks like this function is used as argument to pthread_create. The
prototype is wrong, and gratuitously so since the return value is
meaningless. Did your compiler not produce a warning for that?

> +{
> +    OutputStream *ost = (OutputStream *)p;
> +    while (ost->avfilter_thread_alive) {

ost->avfilter_thread_alive is shared: theoretically it must be protected by
a mutex.

> +        pthread_mutex_lock(&ost->mutex_avfilter);
> +        while (!ost->flag_avfilter)
> +            pthread_cond_wait(&ost->cond_avfilter, &ost->mutex_avfilter);
> +
> +        ost->avfilter_ret = avfilter_graph_request_oldest(filtergraphs[ost->graph_id]->graph);
> +        pthread_mutex_unlock(&ost->mutex_avfilter);
> +        ost->flag_avfilter = 0;

It looks wrong: you are doing the expensive stuff
(avfilter_graph_request_oldest is what triggers most of the work) while
holding the lock, and you release the lock just before you alter the
variable with the same name as the lock.

> +        
> +        pthread_mutex_lock(&ost->mutex_encoder);
> +        ost->flag_encoder = 1;
> +        pthread_cond_signal(&ost->cond_encoder);
> +        pthread_mutex_unlock(&ost->mutex_encoder);
> +    }
> +    return 0;
> +}
> +/* check for new output on any of the filtergraphs */
> +static int poll_filters(InputStream *ist)
> +{
> +    AVFilterBufferRef *picref;
> +    AVFrame *filtered_frame = NULL;
> +    int i, ret, ret_all;
> +    unsigned nb_success, nb_eof;
> +    int64_t frame_pts;
> +
> +    if (ist->st->codec->codec_type != AVMEDIA_TYPE_AUDIO && ist->st->codec->codec_type != AVMEDIA_TYPE_VIDEO)
> +        return 0;
> +
> +    ret_all = nb_success = nb_eof = 0;
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ist->st->codec->codec_type == ost->st->codec->codec_type) {
> +            pthread_mutex_lock(&ost->mutex_encoder);
> +            while (!ost->flag_encoder)
> +                pthread_cond_wait(&ost->cond_encoder, &ost->mutex_encoder);
> +            pthread_mutex_unlock(&ost->mutex_encoder);
> +            ost->flag_encoder = 0;
> +            if (!ost->avfilter_ret) {
> +                nb_success++;
> +            } else if (ost->avfilter_ret == AVERROR_EOF) {
> +                nb_eof++;
> +            } else if (ost->avfilter_ret != AVERROR(EAGAIN)) {
> +                char buf[256];
> +                av_strerror(ost->avfilter_ret, buf, sizeof(buf));
> +                av_log(NULL, AV_LOG_WARNING,
> +                       "Error in request_frame(): %s\n", buf);
> +                ret_all = ost->avfilter_ret;
> +            }
> +        }
> +    }
> +    if (!nb_success)
> +        return nb_eof == ist->nb_filters ? AVERROR_EOF : ret_all;
> +
> +    /* Reap all buffers present in the buffer sinks */
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        OutputFile    *of = output_files[ost->file_index];
> +        int ret = 0;
> +
> +        if (!ost->filter)
> +            continue;
> +
> +        if (!ost->filtered_frame && !(ost->filtered_frame = avcodec_alloc_frame())) {
> +            return AVERROR(ENOMEM);
> +        } else
> +            avcodec_get_frame_defaults(ost->filtered_frame);
> +        filtered_frame = ost->filtered_frame;
> +
> +        while (!ost->is_past_recording_time) {
> +            if (ost->enc->type == AVMEDIA_TYPE_AUDIO &&
> +                !(ost->enc->capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
> +                ret = av_buffersink_read_samples(ost->filter->filter, &picref,
> +                                                ost->st->codec->frame_size);
> +            else
> +#ifdef SINKA
> +                ret = av_buffersink_read(ost->filter->filter, &picref);
> +#else
> +                ret = av_buffersink_get_buffer_ref(ost->filter->filter, &picref,
> +                                                   AV_BUFFERSINK_FLAG_NO_REQUEST);
> +#endif
> +            if (ret < 0) {
> +                if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
> +                    char buf[256];
> +                    av_strerror(ret, buf, sizeof(buf));
> +                    av_log(NULL, AV_LOG_WARNING,
> +                           "Error in av_buffersink_get_buffer_ref(): %s\n", buf);
> +                }
> +                break;
> +            }
> +            frame_pts = AV_NOPTS_VALUE;
> +            if (picref->pts != AV_NOPTS_VALUE) {
> +                filtered_frame->pts = frame_pts = av_rescale_q(picref->pts,
> +                                                ost->filter->filter->inputs[0]->time_base,
> +                                                ost->st->codec->time_base) -
> +                                    av_rescale_q(of->start_time,
> +                                                AV_TIME_BASE_Q,
> +                                                ost->st->codec->time_base);
> +
> +                if (of->start_time && filtered_frame->pts < 0) {
> +                    avfilter_unref_buffer(picref);
> +                    continue;
> +                }
> +            }
> +            //if (ost->source_index >= 0)
> +            //    *filtered_frame= *input_streams[ost->source_index]->decoded_frame; //for me_threshold
> +
> +
> +            switch (ost->filter->filter->inputs[0]->type) {
> +            case AVMEDIA_TYPE_VIDEO:
> +                avfilter_fill_frame_from_video_buffer_ref(filtered_frame, picref);
> +                filtered_frame->pts = frame_pts;
> +                if (!ost->frame_aspect_ratio)
> +                    ost->st->codec->sample_aspect_ratio = picref->video->sample_aspect_ratio;
> +
> +                do_video_out(of->ctx, ost, filtered_frame,
> +                             same_quant ? ost->last_quality :
> +                                          ost->st->codec->global_quality);
> +                break;
> +            case AVMEDIA_TYPE_AUDIO:
> +                avfilter_copy_buf_props(filtered_frame, picref);
> +                filtered_frame->pts = frame_pts;
> +                do_audio_out(of->ctx, ost, filtered_frame);
> +                break;
> +            default:
> +                // TODO support subtitle filters
> +                av_assert0(0);
> +            }
> +
> +            avfilter_unref_buffer(picref);
> +        }
> +    }
> +    return nb_eof == ist->nb_filters ? AVERROR_EOF : ret_all;
> +}

Rewriting the whole body of poll_filters() seems wrong: one of the versions
will unavoidably bitrot, i.e. not get the bugfixes that the other version
will receive.

> +#else
>  /* check for new output on any of the filtergraphs */
>  static int poll_filters(void)
>  {
> @@ -2004,7 +2154,7 @@ static int poll_filters(void)
>      }
>      return nb_eof == nb_filtergraphs ? AVERROR_EOF : ret_all;
>  }
> -
> +#endif
>  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
>  {
>      char buf[1024];
> @@ -2351,6 +2501,17 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output)
>      }
>  
>      if (!*got_output) {
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>          /* no audio frame */
>          if (!pkt->size)
>              for (i = 0; i < ist->nb_filters; i++)
> @@ -2424,6 +2585,17 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output)
>  
>      for (i = 0; i < ist->nb_filters; i++)
>          av_buffersrc_add_frame(ist->filters[i]->filter, decoded_frame, 0);
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>  
>      return ret;
>  }
> @@ -2454,6 +2626,17 @@ static int decode_video(InputStream *ist, AVPacket *pkt, int *got_output)
>      quality = same_quant ? decoded_frame->quality : 0;
>      if (!*got_output) {
>          /* no picture yet */
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>          if (!pkt->size)
>              for (i = 0; i < ist->nb_filters; i++)
>                  av_buffersrc_add_ref(ist->filters[i]->filter, NULL, AV_BUFFERSRC_FLAG_NO_COPY);
> @@ -2532,7 +2715,17 @@ static int decode_video(InputStream *ist, AVPacket *pkt, int *got_output)
>          }
>  
>      }
> -
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>      av_free(buffer_to_free);
>      return ret;
>  }
> @@ -2791,7 +2984,9 @@ static int transcode_init(void)
>      for (i = 0; i < nb_filtergraphs; i++)
>          if ((ret = avfilter_graph_config(filtergraphs[i]->graph, NULL)) < 0)
>              return ret;
> -
> +#if CONFIG_AV_FILTER_THREADS
> +        int graph_id = 0;
> +#endif
>      /* for each output stream, we compute the right encoding parameters */
>      for (i = 0; i < nb_output_streams; i++) {
>          ost = output_streams[i];
> @@ -2954,6 +3149,34 @@ static int transcode_init(void)
>                          av_log(NULL, AV_LOG_FATAL, "Error opening filters!\n");
>                          exit(1);
>                      }
> +#if CONFIG_AV_FILTER_THREADS
> +                    if (pthread_mutex_init(&ost->mutex_avfilter, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing mutex for avfilter\n");
> +                        exit(1);
> +                    }
> +                    if (pthread_cond_init(&ost->cond_avfilter, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing condition for avfilter\n");
> +                        exit(1);
> +                    }
> +                    ost->flag_avfilter = 0;
> +
> +                    if (pthread_mutex_init(&ost->mutex_encoder, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing mutex for encoder\n");
> +                        exit(1);
> +                    }
> +                    if (pthread_cond_init(&ost->cond_encoder, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing condition for avfilter\n");
> +                        exit(1);
> +                    }
> +                    ost->flag_encoder = 0;
> +
> +                    ost->avfilter_thread_alive = 1;
> +                    if (pthread_create(&ost->avfilter_thread, NULL, avfilter_frame, (void *)ost)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in creating thread ost->stream_thread\n");
> +                        exit(1);
> +                    }
> +                    ost->graph_id = graph_id++;
> +#endif
>              }
>  
>              switch (codec->codec_type) {
> @@ -3611,7 +3834,11 @@ static int transcode(void)
>  
>          // fprintf(stderr,"read #%d.%d size=%d\n", ist->file_index, ist->st->index, pkt.size);
>          if ((ret = output_packet(ist, &pkt)) < 0 ||
> +#if CONFIG_AV_FILTER_THREADS
> +            ((ret = poll_filters(ist)) < 0 && ret != AVERROR_EOF)) {
> +#else
>              ((ret = poll_filters()) < 0 && ret != AVERROR_EOF)) {
> +#endif
>              char buf[128];
>              av_strerror(ret, buf, sizeof(buf));
>              av_log(NULL, AV_LOG_ERROR, "Error while decoding stream #%d:%d: %s\n",
> @@ -3638,8 +3865,13 @@ static int transcode(void)
>          if (!input_files[ist->file_index]->eof_reached && ist->decoding_needed) {
>              output_packet(ist, NULL);
>          }
> +#if CONFIG_AV_FILTER_THREADS
> +        poll_filters(ist);
> +#endif
>      }
> +#if !(CONFIG_AV_FILTER_THREADS)
>      poll_filters();
> +#endif
>      flush_encoders();
>  
>      term_exit();
> @@ -3659,6 +3891,19 @@ static int transcode(void)
>          if (ost->encoding_needed) {
>              av_freep(&ost->st->codec->stats_in);
>              avcodec_close(ost->st->codec);
> +#if CONFIG_AV_FILTER_THREADS
> +            if (ost->filter && (ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO || ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO)) {
> +                int ret;
> +                ost->avfilter_thread_alive = 0;
> +                pthread_mutex_lock(&ost->mutex_avfilter);
> +                ost->flag_avfilter = 1;
> +                pthread_cond_signal(&ost->cond_avfilter);
> +                pthread_mutex_unlock(&ost->mutex_avfilter);
> +                ret = pthread_join(ost->avfilter_thread, NULL);
> +                if (ret)
> +                    av_log(NULL, AV_LOG_FATAL, "Error %d in Joining thread of %d th stream\n", ret, i);
> +            }
> +#endif
>          }
>      }
>  

I must say, I am quite confused by your design, I am not sure what thread is
responsible for what. As far as I understand, the operations on buffersrc
and buffersink stay in the main thread while the actual processing,
triggered by avfilter_graph_request_oldest, is done in a separate thread.
But I am also confused since you seem to be creating a thread for each
output stream, while several output streams can belong to the same filter
graph.

You must remember that libavfilter as a whole (and buffersrc and buffersink
in particular) is not thread-safe, so having operations on buffersrc and
buffersink in one thread and requests on another is not safe.

I believe you could achieve something much simpler if you try to make
buffersrc and buffersink thread-safe and able to synchronize. Something like
that would probably work:

	av_buffersrc_set_thread_sync(buffersrc, sync_object);

where sync_object is more or less a pair (mutex,condition). For buffersink,
it would be slightly more tricky because the return value of request_frame
must be serialized together with the bufref themselves, but that is doable.

If this is done, then the changes in ffmpeg should amount only to starting a
thread per filter graph that repeatedly calls avfilter_graph_request_oldest.

Of course, this is only advice based on the ideas I had during the time I
thought about this patch. You are entirely free to disregard it entirely.
All I can say is that this patch is too complex for me to understand, so I
can not take the responsibility to approve it.

Regards,

-- 
  Nicolas George
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 198 bytes
Desc: Digital signature
URL: <http://ffmpeg.org/pipermail/ffmpeg-devel/attachments/20120726/84c38df6/attachment.asc>


More information about the ffmpeg-devel mailing list