[FFmpeg-devel] [PATCH 10/10] fftools/ffmpeg: convert to a threaded architecture

Paul B Mahol onemda at gmail.com
Wed Dec 6 13:22:48 EET 2023


On Wed, Dec 6, 2023 at 11:32 AM Anton Khirnov <anton at khirnov.net> wrote:

> Change the main loop and every component (demuxers, decoders, filters,
> encoders, muxers) to use the previously added transcode scheduler. Every
> instance of every such component was already running in a separate
> thread, but now they can actually run in parallel.
>
> Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
> JEEB to be more correct and deterministic.
>

Un-reviewable, please split it.


> ---
>  Changelog                                     |   2 +
>  fftools/ffmpeg.c                              | 374 +--------
>  fftools/ffmpeg.h                              |  97 +--
>  fftools/ffmpeg_dec.c                          | 321 ++------
>  fftools/ffmpeg_demux.c                        | 268 ++++---
>  fftools/ffmpeg_enc.c                          | 368 ++-------
>  fftools/ffmpeg_filter.c                       | 722 +++++-------------
>  fftools/ffmpeg_mux.c                          | 324 ++------
>  fftools/ffmpeg_mux.h                          |  24 +-
>  fftools/ffmpeg_mux_init.c                     |  88 +--
>  fftools/ffmpeg_opt.c                          |   6 +-
>  .../fate/ffmpeg-fix_sub_duration_heartbeat    |  36 +-
>  12 files changed, 600 insertions(+), 2030 deletions(-)
>
> diff --git a/Changelog b/Changelog
> index f00bc27ca4..67ef92eb02 100644
> --- a/Changelog
> +++ b/Changelog
> @@ -7,6 +7,8 @@ version <next>:
>  - EVC encoding using external library libxeve
>  - QOA decoder and demuxer
>  - aap filter
> +- demuxing, decoding, filtering, encoding, and muxing in the
> +  ffmpeg CLI now all run in parallel
>
>  version 6.1:
>  - libaribcaption decoder
> diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
> index b8a97258a0..30b594fd97 100644
> --- a/fftools/ffmpeg.c
> +++ b/fftools/ffmpeg.c
> @@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps {
>  static BenchmarkTimeStamps get_benchmark_time_stamps(void);
>  static int64_t getmaxrss(void);
>
> -unsigned nb_output_dumped = 0;
> +atomic_uint nb_output_dumped = 0;
>
>  static BenchmarkTimeStamps current_time;
>  AVIOContext *progress_avio = NULL;
> @@ -138,30 +138,6 @@ static struct termios oldtty;
>  static int restore_tty;
>  #endif
>
> -/* sub2video hack:
> -   Convert subtitles to video with alpha to insert them in filter graphs.
> -   This is a temporary solution until libavfilter gets real subtitles
> support.
> - */
> -
> -static void sub2video_heartbeat(InputFile *infile, int64_t pts,
> AVRational tb)
> -{
> -    /* When a frame is read from a file, examine all sub2video streams in
> -       the same file and send the sub2video frame again. Otherwise,
> decoded
> -       video frames could be accumulating in the filter graph while a
> filter
> -       (possibly overlay) is desperately waiting for a subtitle frame. */
> -    for (int i = 0; i < infile->nb_streams; i++) {
> -        InputStream *ist = infile->streams[i];
> -
> -        if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
> -            continue;
> -
> -        for (int j = 0; j < ist->nb_filters; j++)
> -            ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
> -    }
> -}
> -
> -/* end of sub2video hack */
> -
>  static void term_exit_sigsafe(void)
>  {
>  #if HAVE_TERMIOS_H
> @@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...)
>      }
>  }
>
> -void close_output_stream(OutputStream *ost)
> -{
> -    OutputFile *of = output_files[ost->file_index];
> -    ost->finished |= ENCODER_FINISHED;
> -
> -    if (ost->sq_idx_encode >= 0)
> -        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
> -}
> -
> -static void print_report(int is_last_report, int64_t timer_start, int64_t
> cur_time)
> +static void print_report(int is_last_report, int64_t timer_start, int64_t
> cur_time, int64_t pts)
>  {
>      AVBPrint buf, buf_script;
>      int64_t total_size = of_filesize(output_files[0]);
>      int vid;
>      double bitrate;
>      double speed;
> -    int64_t pts = AV_NOPTS_VALUE;
>      static int64_t last_time = -1;
>      static int first_report = 1;
>      uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
> @@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t
> timer_start, int64_t cur_ti
>              last_time = cur_time;
>          }
>          if (((cur_time - last_time) < stats_period && !first_report) ||
> -            (first_report && nb_output_dumped < nb_output_files))
> +            (first_report && atomic_load(&nb_output_dumped) <
> nb_output_files))
>              return;
>          last_time = cur_time;
>      }
> @@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t
> timer_start, int64_t cur_ti
>      av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
>      av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
>      for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> -        const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA :
> -1;
> +        const float q = ost->enc ? atomic_load(&ost->quality) / (float)
> FF_QP2LAMBDA : -1;
>
>          if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
>              av_bprintf(&buf, "q=%2.1f ", q);
> @@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t
> timer_start, int64_t cur_ti
>              if (is_last_report)
>                  av_bprintf(&buf, "L");
>
> -            nb_frames_dup  = ost->filter->nb_frames_dup;
> -            nb_frames_drop = ost->filter->nb_frames_drop;
> +            nb_frames_dup  = atomic_load(&ost->filter->nb_frames_dup);
> +            nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
>
>              vid = 1;
>          }
> -        /* compute min output value */
> -        if (ost->last_mux_dts != AV_NOPTS_VALUE) {
> -            if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
> -                pts = ost->last_mux_dts;
> -            if (copy_ts) {
> -                if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
> -                    copy_ts_first_pts = pts;
> -                if (copy_ts_first_pts != AV_NOPTS_VALUE)
> -                    pts -= copy_ts_first_pts;
> -            }
> -        }
> +    }
> +
> +    if (copy_ts) {
> +        if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
> +            copy_ts_first_pts = pts;
> +        if (copy_ts_first_pts != AV_NOPTS_VALUE)
> +            pts -= copy_ts_first_pts;
>      }
>
>      us    = FFABS64U(pts) % AV_TIME_BASE;
> @@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle
> *subtitle, int copy)
>      return 0;
>  }
>
> -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket
> *pkt)
> -{
> -    OutputFile *of = output_files[ost->file_index];
> -    int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base,
> -                                      AV_TIME_BASE_Q);
> -
> -    if (!ost->fix_sub_duration_heartbeat || !(pkt->flags &
> AV_PKT_FLAG_KEY))
> -        // we are only interested in heartbeats on streams configured, and
> -        // only on random access points.
> -        return 0;
> -
> -    for (int i = 0; i < of->nb_streams; i++) {
> -        OutputStream *iter_ost = of->streams[i];
> -        InputStream  *ist      = iter_ost->ist;
> -        int ret = AVERROR_BUG;
> -
> -        if (iter_ost == ost || !ist || !ist->decoding_needed ||
> -            ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
> -            // We wish to skip the stream that causes the heartbeat,
> -            // output streams without an input stream, streams not decoded
> -            // (as fix_sub_duration is only done for decoded subtitles) as
> -            // well as non-subtitle streams.
> -            continue;
> -
> -        if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0)
> -            return ret;
> -    }
> -
> -    return 0;
> -}
> -
> -/* pkt = NULL means EOF (needed to flush decoder buffers) */
> -static int process_input_packet(InputStream *ist, const AVPacket *pkt,
> int no_eof)
> -{
> -    InputFile *f = input_files[ist->file_index];
> -    int64_t dts_est = AV_NOPTS_VALUE;
> -    int ret = 0;
> -    int eof_reached = 0;
> -
> -    if (ist->decoding_needed) {
> -        ret = dec_packet(ist, pkt, no_eof);
> -        if (ret < 0 && ret != AVERROR_EOF)
> -            return ret;
> -    }
> -    if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
> -        eof_reached = 1;
> -
> -    if (pkt && pkt->opaque_ref) {
> -        DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
> -        dts_est = pd->dts_est;
> -    }
> -
> -    if (f->recording_time != INT64_MAX) {
> -        int64_t start_time = 0;
> -        if (copy_ts) {
> -            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time
> : 0;
> -            start_time += start_at_zero ? 0 : f->start_time_effective;
> -        }
> -        if (dts_est >= f->recording_time + start_time)
> -            pkt = NULL;
> -    }
> -
> -    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> -        OutputStream *ost = ist->outputs[oidx];
> -        if (ost->enc || (!pkt && no_eof))
> -            continue;
> -
> -        ret = of_streamcopy(ost, pkt, dts_est);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    return !eof_reached;
> -}
> -
>  static void print_stream_maps(void)
>  {
>      av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
> @@ -934,43 +821,6 @@ static void print_stream_maps(void)
>      }
>  }
>
> -/**
> - * Select the output stream to process.
> - *
> - * @retval 0 an output stream was selected
> - * @retval AVERROR(EAGAIN) need to wait until more input is available
> - * @retval AVERROR_EOF no more streams need output
> - */
> -static int choose_output(OutputStream **post)
> -{
> -    int64_t opts_min = INT64_MAX;
> -    OutputStream *ost_min = NULL;
> -
> -    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> -        int64_t opts;
> -
> -        if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
> -            opts = ost->filter->last_pts;
> -        } else {
> -            opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
> -                   INT64_MIN : ost->last_mux_dts;
> -        }
> -
> -        if (!ost->initialized && !ost->finished) {
> -            ost_min = ost;
> -            break;
> -        }
> -        if (!ost->finished && opts < opts_min) {
> -            opts_min = opts;
> -            ost_min  = ost;
> -        }
> -    }
> -    if (!ost_min)
> -        return AVERROR_EOF;
> -    *post = ost_min;
> -    return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
> -}
> -
>  static void set_tty_echo(int on)
>  {
>  #if HAVE_TERMIOS_H
> @@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t
> cur_time)
>      return 0;
>  }
>
> -static void reset_eagain(void)
> -{
> -    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
> -        ost->unavailable = 0;
> -}
> -
> -static void decode_flush(InputFile *ifile)
> -{
> -    for (int i = 0; i < ifile->nb_streams; i++) {
> -        InputStream *ist = ifile->streams[i];
> -
> -        if (ist->discard || !ist->decoding_needed)
> -            continue;
> -
> -        dec_packet(ist, NULL, 1);
> -    }
> -}
> -
> -/*
> - * Return
> - * - 0 -- one packet was read and processed
> - * - AVERROR(EAGAIN) -- no packets were available for selected file,
> - *   this function should be called again
> - * - AVERROR_EOF -- this function should not be called again
> - */
> -static int process_input(int file_index, AVPacket *pkt)
> -{
> -    InputFile *ifile = input_files[file_index];
> -    InputStream *ist;
> -    int ret, i;
> -
> -    ret = ifile_get_packet(ifile, pkt);
> -
> -    if (ret == 1) {
> -        /* the input file is looped: flush the decoders */
> -        decode_flush(ifile);
> -        return AVERROR(EAGAIN);
> -    }
> -    if (ret < 0) {
> -        if (ret != AVERROR_EOF) {
> -            av_log(ifile, AV_LOG_ERROR,
> -                   "Error retrieving a packet from demuxer: %s\n",
> av_err2str(ret));
> -            if (exit_on_error)
> -                return ret;
> -        }
> -
> -        for (i = 0; i < ifile->nb_streams; i++) {
> -            ist = ifile->streams[i];
> -            if (!ist->discard) {
> -                ret = process_input_packet(ist, NULL, 0);
> -                if (ret>0)
> -                    return 0;
> -                else if (ret < 0)
> -                    return ret;
> -            }
> -
> -            /* mark all outputs that don't go through lavfi as finished */
> -            for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> -                OutputStream *ost = ist->outputs[oidx];
> -                OutputFile    *of = output_files[ost->file_index];
> -
> -                ret = of_output_packet(of, ost, NULL);
> -                if (ret < 0)
> -                    return ret;
> -            }
> -        }
> -
> -        ifile->eof_reached = 1;
> -        return AVERROR(EAGAIN);
> -    }
> -
> -    reset_eagain();
> -
> -    ist = ifile->streams[pkt->stream_index];
> -
> -    sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
> -
> -    ret = process_input_packet(ist, pkt, 0);
> -
> -    av_packet_unref(pkt);
> -
> -    return ret < 0 ? ret : 0;
> -}
> -
> -/**
> - * Run a single step of transcoding.
> - *
> - * @return  0 for success, <0 for error
> - */
> -static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
> -{
> -    InputStream  *ist = NULL;
> -    int ret;
> -
> -    if (ost->filter) {
> -        if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
> -            return ret;
> -        if (!ist)
> -            return 0;
> -    } else {
> -        ist = ost->ist;
> -        av_assert0(ist);
> -    }
> -
> -    ret = process_input(ist->file_index, demux_pkt);
> -    if (ret == AVERROR(EAGAIN)) {
> -        return 0;
> -    }
> -
> -    if (ret < 0)
> -        return ret == AVERROR_EOF ? 0 : ret;
> -
> -    // process_input() above might have caused output to become available
> -    // in multiple filtergraphs, so we process all of them
> -    for (int i = 0; i < nb_filtergraphs; i++) {
> -        ret = reap_filters(filtergraphs[i], 0);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    return 0;
> -}
> -
>  /*
>   * The following code is the main loop of the file converter
>   */
> -static int transcode(Scheduler *sch, int *err_rate_exceeded)
> +static int transcode(Scheduler *sch)
>  {
>      int ret = 0, i;
> -    InputStream *ist;
> -    int64_t timer_start;
> -    AVPacket *demux_pkt = NULL;
> +    int64_t timer_start, transcode_ts = 0;
>
>      print_stream_maps();
>
> -    *err_rate_exceeded = 0;
>      atomic_store(&transcode_init_done, 1);
>
> -    demux_pkt = av_packet_alloc();
> -    if (!demux_pkt) {
> -        ret = AVERROR(ENOMEM);
> -        goto fail;
> -    }
> +    ret = sch_start(sch);
> +    if (ret < 0)
> +        return ret;
>
>      if (stdin_interaction) {
>          av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
> @@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
>
>      timer_start = av_gettime_relative();
>
> -    while (!received_sigterm) {
> -        OutputStream *ost;
> +    while (!sch_wait(sch, stats_period, &transcode_ts)) {
>          int64_t cur_time= av_gettime_relative();
>
>          /* if 'q' pressed, exits */
> @@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
>              if (check_keyboard_interaction(cur_time) < 0)
>                  break;
>
> -        ret = choose_output(&ost);
> -        if (ret == AVERROR(EAGAIN)) {
> -            reset_eagain();
> -            av_usleep(10000);
> -            ret = 0;
> -            continue;
> -        } else if (ret < 0) {
> -            av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write
> to, finishing.\n");
> -            ret = 0;
> -            break;
> -        }
> -
> -        ret = transcode_step(ost, demux_pkt);
> -        if (ret < 0 && ret != AVERROR_EOF) {
> -            av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n",
> av_err2str(ret));
> -            break;
> -        }
> -
>          /* dump report by using the output first video and audio streams
> */
> -        print_report(0, timer_start, cur_time);
> +        print_report(0, timer_start, cur_time, transcode_ts);
>      }
>
> -    /* at the end of stream, we must flush the decoder buffers */
> -    for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
> -        float err_rate;
> -
> -        if (!input_files[ist->file_index]->eof_reached) {
> -            int err = process_input_packet(ist, NULL, 0);
> -            ret = err_merge(ret, err);
> -        }
> -
> -        err_rate = (ist->frames_decoded || ist->decode_errors) ?
> -                   ist->decode_errors / (ist->frames_decoded +
> ist->decode_errors) : 0.f;
> -        if (err_rate > max_error_rate) {
> -            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds
> maximum %g\n",
> -                   err_rate, max_error_rate);
> -            *err_rate_exceeded = 1;
> -        } else if (err_rate)
> -            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n",
> err_rate);
> -    }
> -    ret = err_merge(ret, enc_flush());
> -
> -    term_exit();
> +    ret = sch_stop(sch);
>
>      /* write the trailer if needed */
>      for (i = 0; i < nb_output_files; i++) {
> @@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
>          ret = err_merge(ret, err);
>      }
>
> -    /* dump report by using the first video and audio streams */
> -    print_report(1, timer_start, av_gettime_relative());
> +    term_exit();
>
> -fail:
> -    av_packet_free(&demux_pkt);
> +    /* dump report by using the first video and audio streams */
> +    print_report(1, timer_start, av_gettime_relative(), transcode_ts);
>
>      return ret;
>  }
> @@ -1308,7 +990,7 @@ int main(int argc, char **argv)
>  {
>      Scheduler *sch = NULL;
>
> -    int ret, err_rate_exceeded;
> +    int ret;
>      BenchmarkTimeStamps ti;
>
>      init_dynload();
> @@ -1350,7 +1032,7 @@ int main(int argc, char **argv)
>      }
>
>      current_time = ti = get_benchmark_time_stamps();
> -    ret = transcode(sch, &err_rate_exceeded);
> +    ret = transcode(sch);
>      if (ret >= 0 && do_benchmark) {
>          int64_t utime, stime, rtime;
>          current_time = get_benchmark_time_stamps();
> @@ -1362,8 +1044,8 @@ int main(int argc, char **argv)
>                 utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
>      }
>
> -    ret = received_nb_signals ? 255 :
> -          err_rate_exceeded   ?  69 : ret;
> +    ret = received_nb_signals                 ? 255 :
> +          (ret == FFMPEG_ERROR_RATE_EXCEEDED) ?  69 : ret;
>
>  finish:
>      if (ret == AVERROR_EXIT)
> diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
> index a89038b765..ba82b7490d 100644
> --- a/fftools/ffmpeg.h
> +++ b/fftools/ffmpeg.h
> @@ -61,6 +61,8 @@
>  #define FFMPEG_OPT_TOP 1
>  #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
>
> +#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
> +
>  enum VideoSyncMethod {
>      VSYNC_AUTO = -1,
>      VSYNC_PASSTHROUGH,
> @@ -82,13 +84,16 @@ enum HWAccelID {
>  };
>
>  enum FrameOpaque {
> -    FRAME_OPAQUE_REAP_FILTERS = 1,
> -    FRAME_OPAQUE_CHOOSE_INPUT,
> -    FRAME_OPAQUE_SUB_HEARTBEAT,
> +    FRAME_OPAQUE_SUB_HEARTBEAT = 1,
>      FRAME_OPAQUE_EOF,
>      FRAME_OPAQUE_SEND_COMMAND,
>  };
>
> +enum PacketOpaque {
> +    PKT_OPAQUE_SUB_HEARTBEAT = 1,
> +    PKT_OPAQUE_FIX_SUB_DURATION,
> +};
> +
>  typedef struct HWDevice {
>      const char *name;
>      enum AVHWDeviceType type;
> @@ -309,11 +314,8 @@ typedef struct OutputFilter {
>
>      enum AVMediaType     type;
>
> -    /* pts of the last frame received from this filter, in AV_TIME_BASE_Q
> */
> -    int64_t last_pts;
> -
> -    uint64_t nb_frames_dup;
> -    uint64_t nb_frames_drop;
> +    atomic_uint_least64_t nb_frames_dup;
> +    atomic_uint_least64_t nb_frames_drop;
>  } OutputFilter;
>
>  typedef struct FilterGraph {
> @@ -426,11 +428,6 @@ typedef struct InputFile {
>
>      float readrate;
>      int accurate_seek;
> -
> -    /* when looping the input file, this queue is used by decoders to
> report
> -     * the last frame timestamp back to the demuxer thread */
> -    AVThreadMessageQueue *audio_ts_queue;
> -    int                   audio_ts_queue_size;
>  } InputFile;
>
>  enum forced_keyframes_const {
> @@ -532,8 +529,6 @@ typedef struct OutputStream {
>      InputStream *ist;
>
>      AVStream *st;            /* stream in the output file */
> -    /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q
> */
> -    int64_t last_mux_dts;
>
>      AVRational enc_timebase;
>
> @@ -578,13 +573,6 @@ typedef struct OutputStream {
>      AVDictionary *sws_dict;
>      AVDictionary *swr_opts;
>      char *apad;
> -    OSTFinished finished;        /* no more packets should be written for
> this stream */
> -    int unavailable;                     /* true if the steram is
> unavailable (possibly temporarily) */
> -
> -    // init_output_stream() has been called for this stream
> -    // The encoder and the bitstream filters have been initialized and
> the stream
> -    // parameters are set in the AVStream.
> -    int initialized;
>
>      const char *attachment_filename;
>
> @@ -598,9 +586,8 @@ typedef struct OutputStream {
>      uint64_t samples_encoded;
>
>      /* packet quality factor */
> -    int quality;
> +    atomic_int quality;
>
> -    int sq_idx_encode;
>      int sq_idx_mux;
>
>      EncStats enc_stats_pre;
> @@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs;
>  extern int        nb_filtergraphs;
>
>  extern char *vstats_filename;
> -extern char *sdp_filename;
>
>  extern float dts_delta_threshold;
>  extern float dts_error_threshold;
> @@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb;
>  extern const OptionDef options[];
>  extern HWDevice *filter_hw_device;
>
> -extern unsigned nb_output_dumped;
> +extern atomic_uint nb_output_dumped;
>
>  extern int ignore_unknown_streams;
>  extern int copy_unknown_streams;
> @@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame);
>
>  const FrameData *frame_data_c(AVFrame *frame);
>
> -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int
> keep_reference);
> -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
> -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts,
> AVRational tb);
> -
>  /**
>   * Set up fallback filtering parameters from a decoder context. They will
> only
>   * be used if no frames are ever sent on this input, otherwise the actual
> @@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc,
> Scheduler *sch);
>
>  void fg_free(FilterGraph **pfg);
>
> -/**
> - * Perform a step of transcoding for the specified filter graph.
> - *
> - * @param[in]  graph     filter graph to consider
> - * @param[out] best_ist  input stream where a frame would allow to
> continue
> - * @return  0 for success, <0 for error
> - */
> -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
> -
>  void fg_send_command(FilterGraph *fg, double time, const char *target,
>                       const char *command, const char *arg, int
> all_filters);
>
> -/**
> - * Get and encode new output from specified filtergraph, without causing
> - * activity.
> - *
> - * @return  0 for success, <0 for severe errors
> - */
> -int reap_filters(FilterGraph *fg, int flush);
> -
>  int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
>
>  void enc_stats_write(OutputStream *ost, EncStats *es,
> @@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx,
> AVFrame *input);
>  int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
>  void dec_free(Decoder **pdec);
>
> -/**
> - * Submit a packet for decoding
> - *
> - * When pkt==NULL and no_eof=0, there will be no more input. Flush
> decoders and
> - * mark all downstreams as finished.
> - *
> - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek).
> Flush
> - * decoders and await further input.
> - */
> -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
> -
>  int enc_alloc(Encoder **penc, const AVCodec *codec,
>                Scheduler *sch, unsigned sch_idx);
>  void enc_free(Encoder **penc);
>
> -int enc_open(OutputStream *ost, const AVFrame *frame);
> -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle
> *sub);
> -int enc_frame(OutputStream *ost, AVFrame *frame);
> -int enc_flush(void);
> +int enc_open(void *opaque, const AVFrame *frame);
>
>  /*
>   * Initialize muxing state for the given stream, should be called
> @@ -840,30 +791,11 @@ void of_free(OutputFile **pof);
>
>  void of_enc_stats_close(void);
>
> -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
> -
> -/**
> - * @param dts predicted packet dts in AV_TIME_BASE_Q
> - */
> -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
> -
>  int64_t of_filesize(OutputFile *of);
>
>  int ifile_open(const OptionsContext *o, const char *filename, Scheduler
> *sch);
>  void ifile_close(InputFile **f);
>
> -/**
> - * Get next input packet from the demuxer.
> - *
> - * @param pkt the packet is written here when this function returns 0
> - * @return
> - * - 0 when a packet has been read successfully
> - * - 1 when stream end was reached, but the stream is looped;
> - *     caller should flush decoders and read from this demuxer again
> - * - a negative error code on failure
> - */
> -int ifile_get_packet(InputFile *f, AVPacket *pkt);
> -
>  int ist_output_add(InputStream *ist, OutputStream *ost);
>  int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
>
> @@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev);
>   * pass NULL to start iteration */
>  OutputStream *ost_iter(OutputStream *prev);
>
> -void close_output_stream(OutputStream *ost);
> -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket
> *pkt);
> -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts);
>  void update_benchmark(const char *fmt, ...);
>
>  #define SPECIFIER_OPT_FMT_str  "%s"
> diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
> index 90ea0d6d93..5dde82a276 100644
> --- a/fftools/ffmpeg_dec.c
> +++ b/fftools/ffmpeg_dec.c
> @@ -54,24 +54,6 @@ struct Decoder {
>
>      Scheduler      *sch;
>      unsigned        sch_idx;
> -
> -    pthread_t       thread;
> -    /**
> -     * Queue for sending coded packets from the main thread to
> -     * the decoder thread.
> -     *
> -     * An empty packet is sent to flush the decoder without terminating
> -     * decoding.
> -     */
> -    ThreadQueue    *queue_in;
> -    /**
> -     * Queue for sending decoded frames from the decoder thread
> -     * to the main thread.
> -     *
> -     * An empty frame is sent to signal that a single packet has been
> fully
> -     * processed.
> -     */
> -    ThreadQueue    *queue_out;
>  };
>
>  // data that is local to the decoder thread and not visible outside of it
> @@ -80,24 +62,6 @@ typedef struct DecThreadContext {
>      AVPacket        *pkt;
>  } DecThreadContext;
>
> -static int dec_thread_stop(Decoder *d)
> -{
> -    void *ret;
> -
> -    if (!d->queue_in)
> -        return 0;
> -
> -    tq_send_finish(d->queue_in, 0);
> -    tq_receive_finish(d->queue_out, 0);
> -
> -    pthread_join(d->thread, &ret);
> -
> -    tq_free(&d->queue_in);
> -    tq_free(&d->queue_out);
> -
> -    return (intptr_t)ret;
> -}
> -
>  void dec_free(Decoder **pdec)
>  {
>      Decoder *dec = *pdec;
> @@ -105,8 +69,6 @@ void dec_free(Decoder **pdec)
>      if (!dec)
>          return;
>
> -    dec_thread_stop(dec);
> -
>      av_frame_free(&dec->frame);
>      av_packet_free(&dec->pkt);
>
> @@ -148,25 +110,6 @@ fail:
>      return AVERROR(ENOMEM);
>  }
>
> -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
> -{
> -    int i, ret = 0;
> -
> -    for (i = 0; i < ist->nb_filters; i++) {
> -        ret = ifilter_send_frame(ist->filters[i], decoded_frame,
> -                                 i < ist->nb_filters - 1 ||
> -                                 ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
> -        if (ret == AVERROR_EOF)
> -            ret = 0; /* ignore */
> -        if (ret < 0) {
> -            av_log(NULL, AV_LOG_ERROR,
> -                   "Failed to inject frame into filter network: %s\n",
> av_err2str(ret));
> -            break;
> -        }
> -    }
> -    return ret;
> -}
> -
>  static AVRational audio_samplerate_update(void *logctx, Decoder *d,
>                                            const AVFrame *frame)
>  {
> @@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist,
> AVFrame *frame)
>      if (!subtitle)
>          return 0;
>
> -    ret = send_frame_to_filters(ist, frame);
> +    ret = sch_dec_send(d->sch, d->sch_idx, frame);
>      if (ret < 0)
> -        return ret;
> +        av_frame_unref(frame);
>
> -    subtitle = (AVSubtitle*)frame->buf[0]->data;
> -    if (!subtitle->num_rects)
> -        return 0;
> -
> -    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> -        OutputStream *ost = ist->outputs[oidx];
> -        if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
> -            continue;
> -
> -        ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    return 0;
> +    return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
>  }
>
> -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
> +static int fix_sub_duration_heartbeat(InputStream *ist, int64_t
> signal_pts)
>  {
>      Decoder *d = ist->decoder;
>      int ret = AVERROR_BUG;
> @@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist,
> int64_t signal_pts)
>  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
>                                 AVFrame *frame)
>  {
> -    Decoder          *d = ist->decoder;
> +    Decoder *d = ist->decoder;
>      AVPacket *flush_pkt = NULL;
>      AVSubtitle subtitle;
>      int got_output;
>      int ret;
>
> +    if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
> +        frame->pts       = pkt->pts;
> +        frame->time_base = pkt->time_base;
> +        frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
> +
> +        ret = sch_dec_send(d->sch, d->sch_idx, frame);
> +        return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
> +    } else if (pkt && (intptr_t)pkt->opaque ==
> PKT_OPAQUE_FIX_SUB_DURATION) {
> +        return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts,
> pkt->time_base,
> +
> AV_TIME_BASE_Q));
> +    }
> +
>      if (!pkt) {
>          flush_pkt = av_packet_alloc();
>          if (!flush_pkt)
> @@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const
> AVPacket *pkt,
>
>      ist->frames_decoded++;
>
> -    // XXX the queue for transferring data back to the main thread runs
> +    // XXX the queue for transferring data to consumers runs
>      // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
>      // inside the frame
>      // eventually, subtitles should be switched to use AVFrames natively
> @@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist,
> const AVPacket *pkt,
>      frame->width  = ist->dec_ctx->width;
>      frame->height = ist->dec_ctx->height;
>
> -    ret = tq_send(d->queue_out, 0, frame);
> -    if (ret < 0)
> -        av_frame_unref(frame);
> -
> -    return ret;
> -}
> -
> -static int send_filter_eof(InputStream *ist)
> -{
> -    Decoder *d = ist->decoder;
> -    int i, ret;
> -
> -    for (i = 0; i < ist->nb_filters; i++) {
> -        int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ?
> AV_NOPTS_VALUE :
> -                          d->last_frame_pts + d->last_frame_duration_est;
> -        ret = ifilter_send_eof(ist->filters[i], end_pts,
> d->last_frame_tb);
> -        if (ret < 0)
> -            return ret;
> -    }
> -    return 0;
> +    return process_subtitle(ist, frame);
>  }
>
>  static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
> @@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket
> *pkt, AVFrame *frame)
>
>          ist->frames_decoded++;
>
> -        ret = tq_send(d->queue_out, 0, frame);
> -        if (ret < 0)
> -            return ret;
> +        ret = sch_dec_send(d->sch, d->sch_idx, frame);
> +        if (ret < 0) {
> +            av_frame_unref(frame);
> +            return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
> +        }
>      }
>  }
>
> @@ -679,7 +603,6 @@ fail:
>  void *decoder_thread(void *arg)
>  {
>      InputStream *ist = arg;
> -    InputFile *ifile = input_files[ist->file_index];
>      Decoder       *d = ist->decoder;
>      DecThreadContext dt;
>      int ret = 0, input_status = 0;
> @@ -691,19 +614,31 @@ void *decoder_thread(void *arg)
>      dec_thread_set_name(ist);
>
>      while (!input_status) {
> -        int dummy, flush_buffers;
> +        int flush_buffers, have_data;
>
> -        input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
> -        flush_buffers = input_status >= 0 && !dt.pkt->buf;
> -        if (!dt.pkt->buf)
> +        input_status  = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
> +        have_data     = input_status >= 0 &&
> +            (dt.pkt->buf || dt.pkt->side_data_elems ||
> +             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
> +             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
> +        flush_buffers = input_status >= 0 && !have_data;
> +        if (!have_data)
>              av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s
> packet\n",
>                     flush_buffers ? "flush" : "EOF");
>
> -        ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
> +        ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
>
>          av_packet_unref(dt.pkt);
>          av_frame_unref(dt.frame);
>
> +        // AVERROR_EOF  - EOF from the decoder
> +        // AVERROR_EXIT - EOF from the scheduler
> +        // we treat them differently when flushing
> +        if (ret == AVERROR_EXIT) {
> +            ret = AVERROR_EOF;
> +            flush_buffers = 0;
> +        }
> +
>          if (ret == AVERROR_EOF) {
>              av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
>                     flush_buffers ? "resetting" : "finishing");
> @@ -711,11 +646,10 @@ void *decoder_thread(void *arg)
>              if (!flush_buffers)
>                  break;
>
> -            /* report last frame duration to the demuxer thread */
> +            /* report last frame duration to the scheduler */
>              if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
> -                Timestamp ts = { .ts = d->last_frame_pts +
> d->last_frame_duration_est,
> -                                 .tb = d->last_frame_tb };
> -                av_thread_message_queue_send(ifile->audio_ts_queue, &ts,
> 0);
> +                dt.pkt->pts       = d->last_frame_pts +
> d->last_frame_duration_est;
> +                dt.pkt->time_base = d->last_frame_tb;
>              }
>
>              avcodec_flush_buffers(ist->dec_ctx);
> @@ -724,149 +658,47 @@ void *decoder_thread(void *arg)
>                     av_err2str(ret));
>              break;
>          }
> -
> -        // signal to the consumer thread that the entire packet was
> processed
> -        ret = tq_send(d->queue_out, 0, dt.frame);
> -        if (ret < 0) {
> -            if (ret != AVERROR_EOF)
> -                av_log(ist, AV_LOG_ERROR, "Error communicating with the
> main thread\n");
> -            break;
> -        }
>      }
>
>      // EOF is normal thread termination
>      if (ret == AVERROR_EOF)
>          ret = 0;
>
> +    // on success send EOF timestamp to our downstreams
> +    if (ret >= 0) {
> +        float err_rate;
> +
> +        av_frame_unref(dt.frame);
> +
> +        dt.frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_EOF;
> +        dt.frame->pts       = d->last_frame_pts == AV_NOPTS_VALUE ?
> AV_NOPTS_VALUE :
> +                              d->last_frame_pts +
> d->last_frame_duration_est;
> +        dt.frame->time_base = d->last_frame_tb;
> +
> +        ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
> +        if (ret < 0 && ret != AVERROR_EOF) {
> +            av_log(NULL, AV_LOG_FATAL,
> +                   "Error signalling EOF timestamp: %s\n",
> av_err2str(ret));
> +            goto finish;
> +        }
> +        ret = 0;
> +
> +        err_rate = (ist->frames_decoded || ist->decode_errors) ?
> +                   ist->decode_errors / (ist->frames_decoded +
> ist->decode_errors) : 0.f;
> +        if (err_rate > max_error_rate) {
> +            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds
> maximum %g\n",
> +                   err_rate, max_error_rate);
> +            ret = FFMPEG_ERROR_RATE_EXCEEDED;
> +        } else if (err_rate)
> +            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n",
> err_rate);
> +    }
> +
>  finish:
> -    tq_receive_finish(d->queue_in,  0);
> -    tq_send_finish   (d->queue_out, 0);
> -
> -    // make sure the demuxer does not get stuck waiting for audio
> durations
> -    // that will never arrive
> -    if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
> -        av_thread_message_queue_set_err_recv(ifile->audio_ts_queue,
> AVERROR_EOF);
> -
>      dec_thread_uninit(&dt);
>
> -    av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
> -
>      return (void*)(intptr_t)ret;
>  }
>
> -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
> -{
> -    Decoder *d = ist->decoder;
> -    int ret = 0, thread_ret;
> -
> -    // thread already joined
> -    if (!d->queue_in)
> -        return AVERROR_EOF;
> -
> -    // send the packet/flush request/EOF to the decoder thread
> -    if (pkt || no_eof) {
> -        av_packet_unref(d->pkt);
> -
> -        if (pkt) {
> -            ret = av_packet_ref(d->pkt, pkt);
> -            if (ret < 0)
> -                goto finish;
> -        }
> -
> -        ret = tq_send(d->queue_in, 0, d->pkt);
> -        if (ret < 0)
> -            goto finish;
> -    } else
> -        tq_send_finish(d->queue_in, 0);
> -
> -    // retrieve all decoded data for the packet
> -    while (1) {
> -        int dummy;
> -
> -        ret = tq_receive(d->queue_out, &dummy, d->frame);
> -        if (ret < 0)
> -            goto finish;
> -
> -        // packet fully processed
> -        if (!d->frame->buf[0])
> -            return 0;
> -
> -        // process the decoded frame
> -        if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
> -            ret = process_subtitle(ist, d->frame);
> -        } else {
> -            ret = send_frame_to_filters(ist, d->frame);
> -        }
> -        av_frame_unref(d->frame);
> -        if (ret < 0)
> -            goto finish;
> -    }
> -
> -finish:
> -    thread_ret = dec_thread_stop(d);
> -    if (thread_ret < 0) {
> -        av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
> -               av_err2str(thread_ret));
> -        ret = err_merge(ret, thread_ret);
> -    }
> -    // non-EOF errors here are all fatal
> -    if (ret < 0 && ret != AVERROR_EOF)
> -        return ret;
> -
> -    // signal EOF to our downstreams
> -    ret = send_filter_eof(ist);
> -    if (ret < 0) {
> -        av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
> -        return ret;
> -    }
> -
> -    return AVERROR_EOF;
> -}
> -
> -static int dec_thread_start(InputStream *ist)
> -{
> -    Decoder *d = ist->decoder;
> -    ObjPool *op;
> -    int ret = 0;
> -
> -    op = objpool_alloc_packets();
> -    if (!op)
> -        return AVERROR(ENOMEM);
> -
> -    d->queue_in = tq_alloc(1, 1, op, pkt_move);
> -    if (!d->queue_in) {
> -        objpool_free(&op);
> -        return AVERROR(ENOMEM);
> -    }
> -
> -    op = objpool_alloc_frames();
> -    if (!op)
> -        goto fail;
> -
> -    d->queue_out = tq_alloc(1, 4, op, frame_move);
> -    if (!d->queue_out) {
> -        objpool_free(&op);
> -        goto fail;
> -    }
> -
> -    ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
> -    if (ret) {
> -        ret = AVERROR(ret);
> -        av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
> -               av_err2str(ret));
> -        goto fail;
> -    }
> -
> -    return 0;
> -fail:
> -    if (ret >= 0)
> -        ret = AVERROR(ENOMEM);
> -
> -    tq_free(&d->queue_in);
> -    tq_free(&d->queue_out);
> -    return ret;
> -}
> -
>  static enum AVPixelFormat get_format(AVCodecContext *s, const enum
> AVPixelFormat *pix_fmts)
>  {
>      InputStream *ist = s->opaque;
> @@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch,
> unsigned sch_idx)
>      if (ret < 0)
>          return ret;
>
> -    ret = dec_thread_start(ist);
> -    if (ret < 0) {
> -        av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
> -               av_err2str(ret));
> -        return ret;
> -    }
> -
>      return 0;
>  }
> diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
> index 2234dbe076..91cd7a1125 100644
> --- a/fftools/ffmpeg_demux.c
> +++ b/fftools/ffmpeg_demux.c
> @@ -22,8 +22,6 @@
>  #include "ffmpeg.h"
>  #include "ffmpeg_sched.h"
>  #include "ffmpeg_utils.h"
> -#include "objpool.h"
> -#include "thread_queue.h"
>
>  #include "libavutil/avassert.h"
>  #include "libavutil/avstring.h"
> @@ -35,7 +33,6 @@
>  #include "libavutil/pixdesc.h"
>  #include "libavutil/time.h"
>  #include "libavutil/timestamp.h"
> -#include "libavutil/thread.h"
>
>  #include "libavcodec/packet.h"
>
> @@ -66,7 +63,11 @@ typedef struct DemuxStream {
>
>      double ts_scale;
>
> +    // scheduler returned EOF for this stream
> +    int finished;
> +
>      int streamcopy_needed;
> +    int have_sub2video;
>
>      int wrap_correction_done;
>      int saw_first_ts;
> @@ -101,6 +102,7 @@ typedef struct Demuxer {
>
>      /* number of times input stream should be looped */
>      int loop;
> +    int have_audio_dec;
>      /* duration of the looped segment of the input file */
>      Timestamp duration;
>      /* pts with the smallest/largest values ever seen */
> @@ -113,11 +115,12 @@ typedef struct Demuxer {
>      double readrate_initial_burst;
>
>      Scheduler            *sch;
> -    ThreadQueue          *thread_queue;
> -    int                   thread_queue_size;
> -    pthread_t             thread;
> +
> +    AVPacket             *pkt_heartbeat;
>
>      int                   read_started;
> +    int                   nb_streams_used;
> +    int                   nb_streams_finished;
>  } Demuxer;
>
>  static DemuxStream *ds_from_ist(InputStream *ist)
> @@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const
> AVPacket *pkt)
>      d->nb_streams_warn = pkt->stream_index + 1;
>  }
>
> -static int seek_to_start(Demuxer *d)
> +static int seek_to_start(Demuxer *d, Timestamp end_pts)
>  {
>      InputFile    *ifile = &d->f;
>      AVFormatContext *is = ifile->ctx;
> @@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d)
>      if (ret < 0)
>          return ret;
>
> -    if (ifile->audio_ts_queue_size) {
> -        int got_ts = 0;
> -
> -        while (got_ts < ifile->audio_ts_queue_size) {
> -            Timestamp ts;
> -            ret = av_thread_message_queue_recv(ifile->audio_ts_queue,
> &ts, 0);
> -            if (ret < 0)
> -                return ret;
> -            got_ts++;
> -
> -            if (d->max_pts.ts == AV_NOPTS_VALUE ||
> -                av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb)
> < 0)
> -                d->max_pts = ts;
> -        }
> -    }
> +    if (end_pts.ts != AV_NOPTS_VALUE &&
> +        (d->max_pts.ts == AV_NOPTS_VALUE ||
> +         av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts,
> end_pts.tb) < 0))
> +        d->max_pts = end_pts;
>
>      if (d->max_pts.ts != AV_NOPTS_VALUE) {
>          int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 :
> d->min_pts.ts;
> @@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
>      duration = av_rescale_q(d->duration.ts, d->duration.tb,
> pkt->time_base);
>      if (pkt->pts != AV_NOPTS_VALUE) {
>          // audio decoders take precedence for estimating total file
> duration
> -        int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 :
> pkt->duration;
> +        int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration;
>
>          pkt->pts += duration;
>
> @@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
>      return 0;
>  }
>
> -static int input_packet_process(Demuxer *d, AVPacket *pkt)
> +static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned
> *send_flags)
>  {
>      InputFile     *f = &d->f;
>      InputStream *ist = f->streams[pkt->stream_index];
> @@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket
> *pkt)
>      if (ret < 0)
>          return ret;
>
> +    if (f->recording_time != INT64_MAX) {
> +        int64_t start_time = 0;
> +        if (copy_ts) {
> +            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time
> : 0;
> +            start_time += start_at_zero ? 0 : f->start_time_effective;
> +        }
> +        if (ds->dts >= f->recording_time + start_time)
> +            *send_flags |= DEMUX_SEND_STREAMCOPY_EOF;
> +    }
> +
>      ds->data_size += pkt->size;
>      ds->nb_packets++;
>
> @@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket
> *pkt)
>                 av_ts2timestr(input_files[ist->file_index]->ts_offset,
> &AV_TIME_BASE_Q));
>      }
>
> +    pkt->stream_index = ds->sch_idx_stream;
> +
>      return 0;
>  }
>
> @@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d)
>      }
>  }
>
> +static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned
> flags,
> +                   const char *pkt_desc)
> +{
> +    int ret;
> +
> +    ret = sch_demux_send(d->sch, d->f.index, pkt, flags);
> +    if (ret == AVERROR_EOF) {
> +        av_packet_unref(pkt);
> +
> +        av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are
> done\n");
> +        ds->finished = 1;
> +
> +        if (++d->nb_streams_finished == d->nb_streams_used) {
> +            av_log(d, AV_LOG_VERBOSE, "All consumers are done\n");
> +            return AVERROR_EOF;
> +        }
> +    } else if (ret < 0) {
> +        if (ret != AVERROR_EXIT)
> +            av_log(d, AV_LOG_ERROR,
> +                   "Unable to send %s packet to consumers: %s\n",
> +                   pkt_desc, av_err2str(ret));
> +        return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt,
> unsigned flags)
> +{
> +    InputFile  *f = &d->f;
> +    int ret;
> +
> +    // send heartbeat for sub2video streams
> +    if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) {
> +        for (int i = 0; i < f->nb_streams; i++) {
> +            DemuxStream *ds1 = ds_from_ist(f->streams[i]);
> +
> +            if (ds1->finished || !ds1->have_sub2video)
> +                continue;
> +
> +            d->pkt_heartbeat->pts          = pkt->pts;
> +            d->pkt_heartbeat->time_base    = pkt->time_base;
> +            d->pkt_heartbeat->stream_index = ds1->sch_idx_stream;
> +            d->pkt_heartbeat->opaque       =
> (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT;
> +
> +            ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat");
> +            if (ret < 0)
> +                return ret;
> +        }
> +    }
> +
> +    ret = do_send(d, ds, pkt, flags, "demuxed");
> +    if (ret < 0)
> +        return ret;
> +
> +
> +    return 0;
> +}
> +
>  static void discard_unused_programs(InputFile *ifile)
>  {
>      for (int j = 0; j < ifile->ctx->nb_programs; j++) {
> @@ -527,9 +590,13 @@ static void *input_thread(void *arg)
>
>      discard_unused_programs(f);
>
> +    d->read_started    = 1;
>      d->wallclock_start = av_gettime_relative();
>
>      while (1) {
> +        DemuxStream *ds;
> +        unsigned send_flags = 0;
> +
>          ret = av_read_frame(f->ctx, pkt);
>
>          if (ret == AVERROR(EAGAIN)) {
> @@ -538,11 +605,13 @@ static void *input_thread(void *arg)
>          }
>          if (ret < 0) {
>              if (d->loop) {
> -                /* signal looping to the consumer thread */
> +                /* signal looping to our consumers */
>                  pkt->stream_index = -1;
> -                ret = tq_send(d->thread_queue, 0, pkt);
> +
> +                ret = sch_demux_send(d->sch, f->index, pkt, 0);
>                  if (ret >= 0)
> -                    ret = seek_to_start(d);
> +                    ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts,
> +                                                        .tb =
> pkt->time_base });
>                  if (ret >= 0)
>                      continue;
>
> @@ -551,9 +620,11 @@ static void *input_thread(void *arg)
>
>              if (ret == AVERROR_EOF)
>                  av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n");
> -            else
> +            else {
>                  av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n",
>                         av_err2str(ret));
> +                ret = exit_on_error ? ret : 0;
> +            }
>
>              break;
>          }
> @@ -565,8 +636,9 @@ static void *input_thread(void *arg)
>
>          /* the following test is needed in case new streams appear
>             dynamically in stream : we ignore them */
> -        if (pkt->stream_index >= f->nb_streams ||
> -            f->streams[pkt->stream_index]->discard) {
> +        ds = pkt->stream_index < f->nb_streams ?
> +             ds_from_ist(f->streams[pkt->stream_index]) : NULL;
> +        if (!ds || ds->ist.discard || ds->finished) {
>              report_new_stream(d, pkt);
>              av_packet_unref(pkt);
>              continue;
> @@ -583,122 +655,26 @@ static void *input_thread(void *arg)
>              }
>          }
>
> -        ret = input_packet_process(d, pkt);
> +        ret = input_packet_process(d, pkt, &send_flags);
>          if (ret < 0)
>              break;
>
>          if (f->readrate)
>              readrate_sleep(d);
>
> -        ret = tq_send(d->thread_queue, 0, pkt);
> -        if (ret < 0) {
> -            if (ret != AVERROR_EOF)
> -                av_log(f, AV_LOG_ERROR,
> -                       "Unable to send packet to main thread: %s\n",
> -                       av_err2str(ret));
> +        ret = demux_send(d, ds, pkt, send_flags);
> +        if (ret < 0)
>              break;
> -        }
>      }
>
> +    // EOF/EXIT is normal termination
> +    if (ret == AVERROR_EOF || ret == AVERROR_EXIT)
> +        ret = 0;
> +
>  finish:
> -    av_assert0(ret < 0);
> -    tq_send_finish(d->thread_queue, 0);
> -
>      av_packet_free(&pkt);
>
> -    av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n");
> -
> -    return NULL;
> -}
> -
> -static void thread_stop(Demuxer *d)
> -{
> -    InputFile *f = &d->f;
> -
> -    if (!d->thread_queue)
> -        return;
> -
> -    tq_receive_finish(d->thread_queue, 0);
> -
> -    pthread_join(d->thread, NULL);
> -
> -    tq_free(&d->thread_queue);
> -
> -    av_thread_message_queue_free(&f->audio_ts_queue);
> -}
> -
> -static int thread_start(Demuxer *d)
> -{
> -    int ret;
> -    InputFile *f = &d->f;
> -    ObjPool *op;
> -
> -    if (d->thread_queue_size <= 0)
> -        d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
> -
> -    op = objpool_alloc_packets();
> -    if (!op)
> -        return AVERROR(ENOMEM);
> -
> -    d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
> -    if (!d->thread_queue) {
> -        objpool_free(&op);
> -        return AVERROR(ENOMEM);
> -    }
> -
> -    if (d->loop) {
> -        int nb_audio_dec = 0;
> -
> -        for (int i = 0; i < f->nb_streams; i++) {
> -            InputStream *ist = f->streams[i];
> -            nb_audio_dec += !!(ist->decoding_needed &&
> -                               ist->st->codecpar->codec_type ==
> AVMEDIA_TYPE_AUDIO);
> -        }
> -
> -        if (nb_audio_dec) {
> -            ret = av_thread_message_queue_alloc(&f->audio_ts_queue,
> -                                                nb_audio_dec,
> sizeof(Timestamp));
> -            if (ret < 0)
> -                goto fail;
> -            f->audio_ts_queue_size = nb_audio_dec;
> -        }
> -    }
> -
> -    if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
> -        av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to
> increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
> -        ret = AVERROR(ret);
> -        goto fail;
> -    }
> -
> -    d->read_started = 1;
> -
> -    return 0;
> -fail:
> -    tq_free(&d->thread_queue);
> -    return ret;
> -}
> -
> -int ifile_get_packet(InputFile *f, AVPacket *pkt)
> -{
> -    Demuxer *d = demuxer_from_ifile(f);
> -    int ret, dummy;
> -
> -    if (!d->thread_queue) {
> -        ret = thread_start(d);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    ret = tq_receive(d->thread_queue, &dummy, pkt);
> -    if (ret < 0)
> -        return ret;
> -
> -    if (pkt->stream_index == -1) {
> -        av_assert0(!pkt->data && !pkt->side_data_elems);
> -        return 1;
> -    }
> -
> -    return 0;
> +    return (void*)(intptr_t)ret;
>  }
>
>  static void demux_final_stats(Demuxer *d)
> @@ -769,8 +745,6 @@ void ifile_close(InputFile **pf)
>      if (!f)
>          return;
>
> -    thread_stop(d);
> -
>      if (d->read_started)
>          demux_final_stats(d);
>
> @@ -780,6 +754,8 @@ void ifile_close(InputFile **pf)
>
>      avformat_close_input(&f->ctx);
>
> +    av_packet_free(&d->pkt_heartbeat);
> +
>      av_freep(pf);
>  }
>
> @@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int
> decoding_needed)
>          ds->sch_idx_stream = ret;
>      }
>
> -    ist->discard          = 0;
> +    if (ist->discard) {
> +        ist->discard = 0;
> +        d->nb_streams_used++;
> +    }
> +
>      ist->st->discard      = ist->user_set_discard;
>      ist->decoding_needed |= decoding_needed;
>      ds->streamcopy_needed |= !decoding_needed;
> @@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int
> decoding_needed)
>          ret = dec_open(ist, d->sch, ds->sch_idx_dec);
>          if (ret < 0)
>              return ret;
> +
> +        d->have_audio_dec |= is_audio;
>      }
>
>      return 0;
> @@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost)
>
>  int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
>  {
> +    Demuxer      *d = demuxer_from_ifile(input_files[ist->file_index]);
>      DemuxStream *ds = ds_from_ist(ist);
>      int ret;
>
> @@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter
> *ifilter, int is_simple)
>      if (ret < 0)
>          return ret;
>
> +    if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) {
> +        if (!d->pkt_heartbeat) {
> +            d->pkt_heartbeat = av_packet_alloc();
> +            if (!d->pkt_heartbeat)
> +                return AVERROR(ENOMEM);
> +        }
> +        ds->have_sub2video = 1;
> +    }
> +
>      return ds->sch_idx_dec;
>  }
>
> @@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
>                 "since neither -readrate nor -re were given\n");
>      }
>
> -    d->thread_queue_size = o->thread_queue_size;
> -
>      /* Add all the streams from the given input file to the demuxer */
>      for (int i = 0; i < ic->nb_streams; i++) {
>          ret = ist_add(o, d, ic->streams[i]);
> diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
> index 9871381c0e..9383b167f7 100644
> --- a/fftools/ffmpeg_enc.c
> +++ b/fftools/ffmpeg_enc.c
> @@ -41,12 +41,6 @@
>  #include "libavformat/avformat.h"
>
>  struct Encoder {
> -    AVFrame *sq_frame;
> -
> -    // packet for receiving encoded output
> -    AVPacket *pkt;
> -    AVFrame  *sub_frame;
> -
>      // combined size of all the packets received from the encoder
>      uint64_t data_size;
>
> @@ -54,25 +48,9 @@ struct Encoder {
>      uint64_t packets_encoded;
>
>      int opened;
> -    int finished;
>
>      Scheduler      *sch;
>      unsigned        sch_idx;
> -
> -    pthread_t       thread;
> -    /**
> -     * Queue for sending frames from the main thread to
> -     * the encoder thread.
> -     */
> -    ThreadQueue    *queue_in;
> -    /**
> -     * Queue for sending encoded packets from the encoder thread
> -     * to the main thread.
> -     *
> -     * An empty packet is sent to signal that a previously sent
> -     * frame has been fully processed.
> -     */
> -    ThreadQueue    *queue_out;
>  };
>
>  // data that is local to the decoder thread and not visible outside of it
> @@ -81,24 +59,6 @@ typedef struct EncoderThread {
>      AVPacket  *pkt;
>  } EncoderThread;
>
> -static int enc_thread_stop(Encoder *e)
> -{
> -    void *ret;
> -
> -    if (!e->queue_in)
> -        return 0;
> -
> -    tq_send_finish(e->queue_in, 0);
> -    tq_receive_finish(e->queue_out, 0);
> -
> -    pthread_join(e->thread, &ret);
> -
> -    tq_free(&e->queue_in);
> -    tq_free(&e->queue_out);
> -
> -    return (int)(intptr_t)ret;
> -}
> -
>  void enc_free(Encoder **penc)
>  {
>      Encoder *enc = *penc;
> @@ -106,13 +66,6 @@ void enc_free(Encoder **penc)
>      if (!enc)
>          return;
>
> -    enc_thread_stop(enc);
> -
> -    av_frame_free(&enc->sq_frame);
> -    av_frame_free(&enc->sub_frame);
> -
> -    av_packet_free(&enc->pkt);
> -
>      av_freep(penc);
>  }
>
> @@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec,
>      if (!enc)
>          return AVERROR(ENOMEM);
>
> -    if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
> -        enc->sub_frame = av_frame_alloc();
> -        if (!enc->sub_frame)
> -            goto fail;
> -    }
> -
> -    enc->pkt = av_packet_alloc();
> -    if (!enc->pkt)
> -        goto fail;
> -
>      enc->sch     = sch;
>      enc->sch_idx = sch_idx;
>
>      *penc = enc;
>
>      return 0;
> -fail:
> -    enc_free(&enc);
> -    return AVERROR(ENOMEM);
>  }
>
>  static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef
> *frames_ref)
> @@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of,
> OutputStream *ost)
>      return 0;
>  }
>
> -static int enc_thread_start(OutputStream *ost)
> -{
> -    Encoder *e = ost->enc;
> -    ObjPool *op;
> -    int ret = 0;
> -
> -    op = objpool_alloc_frames();
> -    if (!op)
> -        return AVERROR(ENOMEM);
> -
> -    e->queue_in = tq_alloc(1, 1, op, frame_move);
> -    if (!e->queue_in) {
> -        objpool_free(&op);
> -        return AVERROR(ENOMEM);
> -    }
> -
> -    op = objpool_alloc_packets();
> -    if (!op)
> -        goto fail;
> -
> -    e->queue_out = tq_alloc(1, 4, op, pkt_move);
> -    if (!e->queue_out) {
> -        objpool_free(&op);
> -        goto fail;
> -    }
> -
> -    ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
> -    if (ret) {
> -        ret = AVERROR(ret);
> -        av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
> -               av_err2str(ret));
> -        goto fail;
> -    }
> -
> -    return 0;
> -fail:
> -    if (ret >= 0)
> -        ret = AVERROR(ENOMEM);
> -
> -    tq_free(&e->queue_in);
> -    tq_free(&e->queue_out);
> -    return ret;
> -}
> -
> -int enc_open(OutputStream *ost, const AVFrame *frame)
> +int enc_open(void *opaque, const AVFrame *frame)
>  {
> +    OutputStream *ost = opaque;
>      InputStream *ist = ost->ist;
>      Encoder              *e = ost->enc;
>      AVCodecContext *enc_ctx = ost->enc_ctx;
> @@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
>      const AVCodec      *enc = enc_ctx->codec;
>      OutputFile      *of = output_files[ost->file_index];
>      FrameData *fd;
> +    int frame_samples = 0;
>      int ret;
>
>      if (e->opened)
> @@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
>
>      e->opened = 1;
>
> -    if (ost->sq_idx_encode >= 0) {
> -        e->sq_frame = av_frame_alloc();
> -        if (!e->sq_frame)
> -            return AVERROR(ENOMEM);
> -    }
> -
> -    if (ost->enc_ctx->frame_size) {
> -        av_assert0(ost->sq_idx_encode >= 0);
> -        sq_frame_samples(output_files[ost->file_index]->sq_encode,
> -                         ost->sq_idx_encode, ost->enc_ctx->frame_size);
> -    }
> +    if (ost->enc_ctx->frame_size)
> +        frame_samples = ost->enc_ctx->frame_size;
>
>      ret = check_avoptions(ost->encoder_opts);
>      if (ret < 0)
> @@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
>      if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
>          ost->st->time_base = av_add_q(ost->enc_ctx->time_base,
> (AVRational){0, 1});
>
> -    ret = enc_thread_start(ost);
> -    if (ret < 0) {
> -        av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
> -               av_err2str(ret));
> -        return ret;
> -    }
> -
>      ret = of_stream_init(of, ost);
>      if (ret < 0)
>          return ret;
>
> -    return 0;
> +    return frame_samples;
>  }
>
>  static int check_recording_time(OutputStream *ost, int64_t ts, AVRational
> tb)
> @@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of,
> OutputStream *ost, const AVSubtitle *
>          av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n");
>          return exit_on_error ? AVERROR(EINVAL) : 0;
>      }
> -    if (ost->finished ||
> -        (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
> +    if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
>          return 0;
>
>      enc = ost->enc_ctx;
> @@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of,
> OutputStream *ost, const AVSubtitle *
>          }
>          pkt->dts = pkt->pts;
>
> -        ret = tq_send(e->queue_out, 0, pkt);
> +        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
>          if (ret < 0) {
>              av_packet_unref(pkt);
>              return ret;
> @@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost,
> const AVPacket *pkt, int write_
>      int64_t frame_number;
>      double ti1, bitrate, avg_bitrate;
>      double psnr_val = -1;
> +    int quality;
>
> -    ost->quality   = sd ? AV_RL32(sd) : -1;
> +    quality        = sd ? AV_RL32(sd) : -1;
>      pict_type      = sd ? sd[4] : AV_PICTURE_TYPE_NONE;
>
> +    atomic_store(&ost->quality, quality);
> +
>      if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) {
>          // FIXME the scaling assumes 8bit
>          double error = AV_RL64(sd + 8) / (enc->width * enc->height *
> 255.0 * 255.0);
> @@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost,
> const AVPacket *pkt, int write_
>      frame_number = e->packets_encoded;
>      if (vstats_version <= 1) {
>          fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number,
> -                ost->quality / (float)FF_QP2LAMBDA);
> +                quality / (float)FF_QP2LAMBDA);
>      } else  {
>          fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f
> ", ost->file_index, ost->index, frame_number,
> -                ost->quality / (float)FF_QP2LAMBDA);
> +                quality / (float)FF_QP2LAMBDA);
>      }
>
>      if (psnr_val >= 0)
> @@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream
> *ost, AVFrame *frame,
>                     av_ts2str(pkt->duration), av_ts2timestr(pkt->duration,
> &enc->time_base));
>          }
>
> -        if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
> -            av_log(NULL, AV_LOG_ERROR,
> -                   "Subtitle heartbeat logic failed in %s! (%s)\n",
> -                   __func__, av_err2str(ret));
> -            return ret;
> -        }
> -
>          e->data_size += pkt->size;
>
>          e->packets_encoded++;
>
> -        ret = tq_send(e->queue_out, 0, pkt);
> +        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
>          if (ret < 0) {
>              av_packet_unref(pkt);
>              return ret;
> @@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream
> *ost, AVFrame *frame,
>      av_assert0(0);
>  }
>
> -static int submit_encode_frame(OutputFile *of, OutputStream *ost,
> -                               AVFrame *frame, AVPacket *pkt)
> -{
> -    Encoder *e = ost->enc;
> -    int ret;
> -
> -    if (ost->sq_idx_encode < 0)
> -        return encode_frame(of, ost, frame, pkt);
> -
> -    if (frame) {
> -        ret = av_frame_ref(e->sq_frame, frame);
> -        if (ret < 0)
> -            return ret;
> -        frame = e->sq_frame;
> -    }
> -
> -    ret = sq_send(of->sq_encode, ost->sq_idx_encode,
> -                  SQFRAME(frame));
> -    if (ret < 0) {
> -        if (frame)
> -            av_frame_unref(frame);
> -        if (ret != AVERROR_EOF)
> -            return ret;
> -    }
> -
> -    while (1) {
> -        AVFrame *enc_frame = e->sq_frame;
> -
> -        ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
> -                               SQFRAME(enc_frame));
> -        if (ret == AVERROR_EOF) {
> -            enc_frame = NULL;
> -        } else if (ret < 0) {
> -            return (ret == AVERROR(EAGAIN)) ? 0 : ret;
> -        }
> -
> -        ret = encode_frame(of, ost, enc_frame, pkt);
> -        if (enc_frame)
> -            av_frame_unref(enc_frame);
> -        if (ret < 0)
> -            return ret;
> -    }
> -}
> -
>  static int do_audio_out(OutputFile *of, OutputStream *ost,
>                          AVFrame *frame, AVPacket *pkt)
>  {
> @@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream
> *ost,
>      if (!check_recording_time(ost, frame->pts, frame->time_base))
>          return AVERROR_EOF;
>
> -    return submit_encode_frame(of, ost, frame, pkt);
> +    return encode_frame(of, ost, frame, pkt);
>  }
>
>  static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx
> *kf,
> @@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream
> *ost,
>      }
>  #endif
>
> -    return submit_encode_frame(of, ost, in_picture, pkt);
> +    return encode_frame(of, ost, in_picture, pkt);
>  }
>
>  static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
> @@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame
> *frame, AVPacket *pkt)
>      enum AVMediaType type = ost->type;
>
>      if (type == AVMEDIA_TYPE_SUBTITLE) {
> +        const AVSubtitle *subtitle = frame && frame->buf[0] ?
> +                                     (AVSubtitle*)frame->buf[0]->data :
> NULL;
> +
>          // no flushing for subtitles
> -        return frame ?
> -               do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data,
> pkt) : 0;
> +        return subtitle && subtitle->num_rects ?
> +               do_subtitle_out(of, ost, subtitle, pkt) : 0;
>      }
>
>      if (frame) {
> @@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame
> *frame, AVPacket *pkt)
>                                                do_audio_out(of, ost,
> frame, pkt);
>      }
>
> -    return submit_encode_frame(of, ost, NULL, pkt);
> +    return  encode_frame(of, ost, NULL, pkt);
>  }
>
>  static void enc_thread_set_name(const OutputStream *ost)
> @@ -1009,24 +845,50 @@ fail:
>  void *encoder_thread(void *arg)
>  {
>      OutputStream *ost = arg;
> -    OutputFile    *of = output_files[ost->file_index];
>      Encoder        *e = ost->enc;
>      EncoderThread et;
>      int ret = 0, input_status = 0;
> +    int name_set = 0;
>
>      ret = enc_thread_init(&et);
>      if (ret < 0)
>          goto finish;
>
> -    enc_thread_set_name(ost);
> +    /* Open the subtitle encoders immediately. AVFrame-based encoders
> +     * are opened through a callback from the scheduler once they get
> +     * their first frame
> +     *
> +     * N.B.: because the callback is called from a different thread,
> +     * enc_ctx MUST NOT be accessed before sch_enc_receive() returns
> +     * for the first time for audio/video. */
> +    if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type !=
> AVMEDIA_TYPE_AUDIO) {
> +        ret = enc_open(ost, NULL);
> +        if (ret < 0)
> +            goto finish;
> +    }
>
>      while (!input_status) {
> -        int dummy;
> -
> -        input_status = tq_receive(e->queue_in, &dummy, et.frame);
> -        if (input_status < 0)
> +        input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame);
> +        if (input_status == AVERROR_EOF) {
>              av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
>
> +            if (!e->opened) {
> +                av_log(ost, AV_LOG_ERROR, "Could not open encoder before
> EOF\n");
> +                ret = AVERROR(EINVAL);
> +                goto finish;
> +            }
> +        } else if (input_status < 0) {
> +            ret = input_status;
> +            av_log(ost, AV_LOG_ERROR, "Error receiving a frame for
> encoding: %s\n",
> +                   av_err2str(ret));
> +            goto finish;
> +        }
> +
> +        if (!name_set) {
> +            enc_thread_set_name(ost);
> +            name_set = 1;
> +        }
> +
>          ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL,
> et.pkt);
>
>          av_packet_unref(et.pkt);
> @@ -1040,15 +902,6 @@ void *encoder_thread(void *arg)
>                         av_err2str(ret));
>              break;
>          }
> -
> -        // signal to the consumer thread that the frame was encoded
> -        ret = tq_send(e->queue_out, 0, et.pkt);
> -        if (ret < 0) {
> -            if (ret != AVERROR_EOF)
> -                av_log(ost, AV_LOG_ERROR,
> -                       "Error communicating with the main thread\n");
> -            break;
> -        }
>      }
>
>      // EOF is normal thread termination
> @@ -1056,118 +909,7 @@ void *encoder_thread(void *arg)
>          ret = 0;
>
>  finish:
> -    if (ost->sq_idx_encode >= 0)
> -        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
> -
> -    tq_receive_finish(e->queue_in,  0);
> -    tq_send_finish   (e->queue_out, 0);
> -
>      enc_thread_uninit(&et);
>
> -    av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
> -
>      return (void*)(intptr_t)ret;
>  }
> -
> -int enc_frame(OutputStream *ost, AVFrame *frame)
> -{
> -    OutputFile *of = output_files[ost->file_index];
> -    Encoder     *e = ost->enc;
> -    int ret, thread_ret;
> -
> -    ret = enc_open(ost, frame);
> -    if (ret < 0)
> -        return ret;
> -
> -    if (!e->queue_in)
> -        return AVERROR_EOF;
> -
> -    // send the frame/EOF to the encoder thread
> -    if (frame) {
> -        ret = tq_send(e->queue_in, 0, frame);
> -        if (ret < 0)
> -            goto finish;
> -    } else
> -        tq_send_finish(e->queue_in, 0);
> -
> -    // retrieve all encoded data for the frame
> -    while (1) {
> -        int dummy;
> -
> -        ret = tq_receive(e->queue_out, &dummy, e->pkt);
> -        if (ret < 0)
> -            break;
> -
> -        // frame fully encoded
> -        if (!e->pkt->data && !e->pkt->side_data_elems)
> -            return 0;
> -
> -        // process the encoded packet
> -        ret = of_output_packet(of, ost, e->pkt);
> -        if (ret < 0)
> -            goto finish;
> -    }
> -
> -finish:
> -    thread_ret = enc_thread_stop(e);
> -    if (thread_ret < 0) {
> -        av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
> -               av_err2str(thread_ret));
> -        ret = err_merge(ret, thread_ret);
> -    }
> -
> -    if (ret < 0 && ret != AVERROR_EOF)
> -        return ret;
> -
> -    // signal EOF to the muxer
> -    return of_output_packet(of, ost, NULL);
> -}
> -
> -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
> -{
> -    Encoder *e = ost->enc;
> -    AVFrame *f = e->sub_frame;
> -    int ret;
> -
> -    // XXX the queue for transferring data to the encoder thread runs
> -    // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
> -    // that inside the frame
> -    // eventually, subtitles should be switched to use AVFrames natively
> -    ret = subtitle_wrap_frame(f, sub, 1);
> -    if (ret < 0)
> -        return ret;
> -
> -    ret = enc_frame(ost, f);
> -    av_frame_unref(f);
> -
> -    return ret;
> -}
> -
> -int enc_flush(void)
> -{
> -    int ret = 0;
> -
> -    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> -        OutputFile      *of = output_files[ost->file_index];
> -        if (ost->sq_idx_encode >= 0)
> -            sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
> -    }
> -
> -    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> -        Encoder          *e = ost->enc;
> -        AVCodecContext *enc = ost->enc_ctx;
> -        int err;
> -
> -        if (!enc || !e->opened ||
> -            (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type !=
> AVMEDIA_TYPE_AUDIO))
> -            continue;
> -
> -        err = enc_frame(ost, NULL);
> -        if (err != AVERROR_EOF && ret < 0)
> -            ret = err_merge(ret, err);
> -
> -        av_assert0(!e->queue_in);
> -    }
> -
> -    return ret;
> -}
> diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
> index 635b1b0b6e..ada235b084 100644
> --- a/fftools/ffmpeg_filter.c
> +++ b/fftools/ffmpeg_filter.c
> @@ -21,8 +21,6 @@
>  #include <stdint.h>
>
>  #include "ffmpeg.h"
> -#include "ffmpeg_utils.h"
> -#include "thread_queue.h"
>
>  #include "libavfilter/avfilter.h"
>  #include "libavfilter/buffersink.h"
> @@ -53,10 +51,11 @@ typedef struct FilterGraphPriv {
>      // true when the filtergraph contains only meta filters
>      // that do not modify the frame data
>      int is_meta;
> +    // source filters are present in the graph
> +    int have_sources;
>      int disable_conversions;
>
> -    int nb_inputs_bound;
> -    int nb_outputs_bound;
> +    unsigned nb_outputs_done;
>
>      const char *graph_desc;
>
> @@ -67,41 +66,6 @@ typedef struct FilterGraphPriv {
>
>      Scheduler       *sch;
>      unsigned         sch_idx;
> -
> -    pthread_t        thread;
> -    /**
> -     * Queue for sending frames from the main thread to the filtergraph.
> Has
> -     * nb_inputs+1 streams - the first nb_inputs stream correspond to
> -     * filtergraph inputs. Frames on those streams may have their opaque
> set to
> -     * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
> -     *   EOF event for the correspondint stream. Will be immediately
> followed by
> -     *   this stream being send-closed.
> -     * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but
> pts+timebase of
> -     *   a subtitle heartbeat event. Will only be sent for sub2video
> streams.
> -     *
> -     * The last stream is "control" - the main thread sends empty
> AVFrames with
> -     * opaque set to
> -     * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame
> available
> -     *   from filtergraph outputs. These frames are sent to corresponding
> -     *   streams in queue_out. Finally an empty frame is sent to the
> control
> -     *   stream in queue_out.
> -     * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames
> are
> -     *   available the terminating empty frame's opaque will contain the
> index+1
> -     *   of the filtergraph input to which more input frames should be
> supplied.
> -     */
> -    ThreadQueue     *queue_in;
> -    /**
> -     * Queue for sending frames from the filtergraph back to the main
> thread.
> -     * Has nb_outputs+1 streams - the first nb_outputs stream correspond
> to
> -     * filtergraph outputs.
> -     *
> -     * The last stream is "control" - see documentation for queue_in for
> more
> -     * details.
> -     */
> -    ThreadQueue     *queue_out;
> -    // submitting frames to filter thread returned EOF
> -    // this only happens on thread exit, so is not per-input
> -    int              eof_in;
>  } FilterGraphPriv;
>
>  static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
> @@ -123,6 +87,9 @@ typedef struct FilterGraphThread {
>      // The output index is stored in frame opaque.
>      AVFifo  *frame_queue_out;
>
> +    // index of the next input to request from the scheduler
> +    unsigned next_in;
> +    // set to 1 after at least one frame passed through this output
>      int      got_frame;
>
>      // EOF status of each input/output, as received by the thread
> @@ -253,9 +220,6 @@ typedef struct OutputFilterPriv {
>      int64_t ts_offset;
>      int64_t next_pts;
>      FPSConvContext fps;
> -
> -    // set to 1 after at least one frame passed through this output
> -    int got_frame;
>  } OutputFilterPriv;
>
>  static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
> @@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph
> *fg)
>
>  static void *filter_thread(void *arg);
>
> -// start the filtering thread once all inputs and outputs are bound
> -static int fg_thread_try_start(FilterGraphPriv *fgp)
> -{
> -    FilterGraph *fg = &fgp->fg;
> -    ObjPool *op;
> -    int ret = 0;
> -
> -    if (fgp->nb_inputs_bound  < fg->nb_inputs ||
> -        fgp->nb_outputs_bound < fg->nb_outputs)
> -        return 0;
> -
> -    op = objpool_alloc_frames();
> -    if (!op)
> -        return AVERROR(ENOMEM);
> -
> -    fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
> -    if (!fgp->queue_in) {
> -        objpool_free(&op);
> -        return AVERROR(ENOMEM);
> -    }
> -
> -    // at least one output is mandatory
> -    op = objpool_alloc_frames();
> -    if (!op)
> -        goto fail;
> -
> -    fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
> -    if (!fgp->queue_out) {
> -        objpool_free(&op);
> -        goto fail;
> -    }
> -
> -    ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
> -    if (ret) {
> -        ret = AVERROR(ret);
> -        av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d
> failed: %s\n",
> -               fg->index, av_err2str(ret));
> -        goto fail;
> -    }
> -
> -    return 0;
> -fail:
> -    if (ret >= 0)
> -        ret = AVERROR(ENOMEM);
> -
> -    tq_free(&fgp->queue_in);
> -    tq_free(&fgp->queue_out);
> -
> -    return ret;
> -}
> -
>  static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout,
> int in)
>  {
>      AVFilterContext *ctx = inout->filter_ctx;
> @@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
>      ofilter->graph    = fg;
>      ofp->format       = -1;
>      ofp->index        = fg->nb_outputs - 1;
> -    ofilter->last_pts = AV_NOPTS_VALUE;
>
>      return ofilter;
>  }
> @@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter,
> InputStream *ist)
>              return AVERROR(ENOMEM);
>      }
>
> -    fgp->nb_inputs_bound++;
> -    av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
> -
> -    return fg_thread_try_start(fgp);
> +    return 0;
>  }
>
>  static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
> @@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter,
> OutputStream *ost,
>      if (ret < 0)
>          return ret;
>
> -    fgp->nb_outputs_bound++;
> -    av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
> -
> -    return fg_thread_try_start(fgp);
> +    return 0;
>  }
>
>  static InputFilter *ifilter_alloc(FilterGraph *fg)
> @@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
>      return ifilter;
>  }
>
> -static int fg_thread_stop(FilterGraphPriv *fgp)
> -{
> -    void *ret;
> -
> -    if (!fgp->queue_in)
> -        return 0;
> -
> -    for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
> -        InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
> -                               ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
> -
> -        if (ifp)
> -            ifp->eof = 1;
> -
> -        tq_send_finish(fgp->queue_in, i);
> -    }
> -
> -    for (int i = 0; i <= fgp->fg.nb_outputs; i++)
> -        tq_receive_finish(fgp->queue_out, i);
> -
> -    pthread_join(fgp->thread, &ret);
> -
> -    tq_free(&fgp->queue_in);
> -    tq_free(&fgp->queue_out);
> -
> -    return (int)(intptr_t)ret;
> -}
> -
>  void fg_free(FilterGraph **pfg)
>  {
>      FilterGraph *fg = *pfg;
> @@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg)
>          return;
>      fgp = fgp_from_fg(fg);
>
> -    fg_thread_stop(fgp);
> -
>      avfilter_graph_free(&fg->graph);
>      for (int j = 0; j < fg->nb_inputs; j++) {
>          InputFilter *ifilter = fg->inputs[j];
> @@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc,
> Scheduler *sch)
>      if (ret < 0)
>          goto fail;
>
> +    for (unsigned i = 0; i < graph->nb_filters; i++) {
> +        const AVFilter *f = graph->filters[i]->filter;
> +        if (!avfilter_filter_pad_count(f, 0) &&
> +            !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
> +            fgp->have_sources = 1;
> +            break;
> +        }
> +    }
> +
>      for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
>          InputFilter *const ifilter = ifilter_alloc(fg);
>          InputFilterPriv       *ifp;
> @@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg,
> const FilterGraphThread *fgt)
>      AVBufferRef *hw_device;
>      AVFilterInOut *inputs, *outputs, *cur;
>      int ret, i, simple = filtergraph_is_simple(fg);
> +    int have_input_eof = 0;
>      const char *graph_desc = fgp->graph_desc;
>
>      cleanup_filtergraph(fg);
> @@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg,
> const FilterGraphThread *fgt)
>              ret = av_buffersrc_add_frame(ifp->filter, NULL);
>              if (ret < 0)
>                  goto fail;
> +            have_input_eof = 1;
>          }
>      }
>
> -    return 0;
> +    if (have_input_eof) {
> +        // make sure the EOF propagates to the end of the graph
> +        ret = avfilter_graph_request_oldest(fg->graph);
> +        if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
> +            goto fail;
> +    }
>
> +    return 0;
>  fail:
>      cleanup_filtergraph(fg);
>      return ret;
> @@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv
> *ofp, AVFrame *frame,
>                                                  fps->frames_prev_hist[2]);
>
>          if (!*nb_frames && fps->last_dropped) {
> -            ofilter->nb_frames_drop++;
> +            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
>              fps->last_dropped++;
>          }
>
> @@ -2260,21 +2153,23 @@ finish:
>      fps->frames_prev_hist[0] = *nb_frames_prev;
>
>      if (*nb_frames_prev == 0 && fps->last_dropped) {
> -        ofilter->nb_frames_drop++;
> +        atomic_fetch_add(&ofilter->nb_frames_drop, 1);
>          av_log(ost, AV_LOG_VERBOSE,
>                 "*** dropping frame %"PRId64" at ts %"PRId64"\n",
>                 fps->frame_number, fps->last_frame->pts);
>      }
>      if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames
> > *nb_frames_prev)) {
> +        uint64_t nb_frames_dup;
>          if (*nb_frames > dts_error_threshold * 30) {
>              av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too
> large, skipping\n", *nb_frames - 1);
> -            ofilter->nb_frames_drop++;
> +            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
>              *nb_frames = 0;
>              return;
>          }
> -        ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev &&
> fps->last_dropped) - (*nb_frames > *nb_frames_prev);
> +        nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
> +                                         *nb_frames - (*nb_frames_prev &&
> fps->last_dropped) - (*nb_frames > *nb_frames_prev));
>          av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames -
> 1);
> -        if (ofilter->nb_frames_dup > fps->dup_warning) {
> +        if (nb_frames_dup > fps->dup_warning) {
>              av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames
> duplicated\n", fps->dup_warning);
>              fps->dup_warning *= 10;
>          }
> @@ -2284,8 +2179,57 @@ finish:
>      fps->dropped_keyframe |= fps->last_dropped && (frame->flags &
> AV_FRAME_FLAG_KEY);
>  }
>
> +static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
> +{
> +    FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
> +    int ret;
> +
> +    // we are finished and no frames were ever seen at this output,
> +    // at least initialize the encoder with a dummy frame
> +    if (!fgt->got_frame) {
> +        AVFrame *frame = fgt->frame;
> +        FrameData *fd;
> +
> +        frame->time_base   = ofp->tb_out;
> +        frame->format      = ofp->format;
> +
> +        frame->width               = ofp->width;
> +        frame->height              = ofp->height;
> +        frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
> +
> +        frame->sample_rate = ofp->sample_rate;
> +        if (ofp->ch_layout.nb_channels) {
> +            ret = av_channel_layout_copy(&frame->ch_layout,
> &ofp->ch_layout);
> +            if (ret < 0)
> +                return ret;
> +        }
> +
> +        fd = frame_data(frame);
> +        if (!fd)
> +            return AVERROR(ENOMEM);
> +
> +        fd->frame_rate_filter = ofp->fps.framerate;
> +
> +        av_assert0(!frame->buf[0]);
> +
> +        av_log(ofp->ofilter.ost, AV_LOG_WARNING,
> +               "No filtered frames for output stream, trying to "
> +               "initialize anyway.\n");
> +
> +        ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
> +        if (ret < 0) {
> +            av_frame_unref(frame);
> +            return ret;
> +        }
> +    }
> +
> +    fgt->eof_out[ofp->index] = 1;
> +
> +    return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
> +}
> +
>  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
> -                           AVFrame *frame, int buffer)
> +                           AVFrame *frame)
>  {
>      FilterGraphPriv  *fgp = fgp_from_fg(ofp->ofilter.graph);
>      AVFrame   *frame_prev = ofp->fps.last_frame;
> @@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
>              frame_out = frame;
>          }
>
> -        if (buffer) {
> -            AVFrame *f = av_frame_alloc();
> -
> -            if (!f) {
> -                av_frame_unref(frame_out);
> -                return AVERROR(ENOMEM);
> -            }
> -
> -            av_frame_move_ref(f, frame_out);
> -            f->opaque = (void*)(intptr_t)ofp->index;
> -
> -            ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
> -            if (ret < 0) {
> -                av_frame_free(&f);
> -                return AVERROR(ENOMEM);
> -            }
> -        } else {
> -            // return the frame to the main thread
> -            ret = tq_send(fgp->queue_out, ofp->index, frame_out);
> +        {
> +            // send the frame to consumers
> +            ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index,
> frame_out);
>              if (ret < 0) {
>                  av_frame_unref(frame_out);
> -                fgt->eof_out[ofp->index] = 1;
> +
> +                if (!fgt->eof_out[ofp->index]) {
> +                    fgt->eof_out[ofp->index] = 1;
> +                    fgp->nb_outputs_done++;
> +                }
> +
>                  return ret == AVERROR_EOF ? 0 : ret;
>              }
>          }
> @@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
>          av_frame_move_ref(frame_prev, frame);
>      }
>
> -    if (!frame) {
> -        tq_send_finish(fgp->queue_out, ofp->index);
> -        fgt->eof_out[ofp->index] = 1;
> -    }
> +    if (!frame)
> +        return close_output(ofp, fgt);
>
>      return 0;
>  }
>
>  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
> -                          AVFrame *frame,  int buffer)
> +                          AVFrame *frame)
>  {
>      FilterGraphPriv    *fgp = fgp_from_fg(ofp->ofilter.graph);
>      OutputStream       *ost = ofp->ofilter.ost;
> @@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
>
>      ret = av_buffersink_get_frame_flags(filter, frame,
>                                          AV_BUFFERSINK_FLAG_NO_REQUEST);
> -    if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
> -        ret = fg_output_frame(ofp, fgt, NULL, buffer);
> +    if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
> +        ret = fg_output_frame(ofp, fgt, NULL);
>          return (ret < 0) ? ret : 1;
>      } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
>          return 1;
> @@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
>          fd->frame_rate_filter = ofp->fps.framerate;
>      }
>
> -    ret = fg_output_frame(ofp, fgt, frame, buffer);
> +    ret = fg_output_frame(ofp, fgt, frame);
>      av_frame_unref(frame);
>      if (ret < 0)
>          return ret;
> @@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
>      return 0;
>  }
>
> -/* retrieve all frames available at filtergraph outputs and either send
> them to
> - * the main thread (buffer=0) or buffer them for later (buffer=1) */
> +/* retrieve all frames available at filtergraph outputs
> + * and send them to consumers */
>  static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
> -                       AVFrame *frame, int buffer)
> +                       AVFrame *frame)
>  {
>      FilterGraphPriv *fgp = fgp_from_fg(fg);
> -    int ret = 0;
> +    int did_step = 0;
>
> -    if (!fg->graph)
> -        return 0;
> -
> -    // process buffered frames
> -    if (!buffer) {
> -        AVFrame *f;
> -
> -        while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
> -            int out_idx = (intptr_t)f->opaque;
> -            f->opaque = NULL;
> -            ret = tq_send(fgp->queue_out, out_idx, f);
> -            av_frame_free(&f);
> -            if (ret < 0 && ret != AVERROR_EOF)
> -                return ret;
> +    // graph not configured, just select the input to request
> +    if (!fg->graph) {
> +        for (int i = 0; i < fg->nb_inputs; i++) {
> +            InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
> +            if (ifp->format < 0 && !fgt->eof_in[i]) {
> +                fgt->next_in = i;
> +                return 0;
> +            }
>          }
> +
> +        // This state - graph is not configured, but all inputs are either
> +        // initialized or EOF - should be unreachable because sending EOF
> to a
> +        // filter without even a fallback format should fail
> +        av_assert0(0);
> +        return AVERROR_BUG;
>      }
>
> -    /* Reap all buffers present in the buffer sinks */
> -    for (int i = 0; i < fg->nb_outputs; i++) {
> -        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
> -        int ret = 0;
> +    while (fgp->nb_outputs_done < fg->nb_outputs) {
> +        int ret;
>
> -        while (!ret) {
> -            ret = fg_output_step(ofp, fgt, frame, buffer);
> -            if (ret < 0)
> -                return ret;
> +        ret = avfilter_graph_request_oldest(fg->graph);
> +        if (ret == AVERROR(EAGAIN)) {
> +            fgt->next_in = choose_input(fg, fgt);
> +            break;
> +        } else if (ret < 0) {
> +            if (ret == AVERROR_EOF)
> +                av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF,
> finishing\n");
> +            else
> +                av_log(fg, AV_LOG_ERROR,
> +                       "Error requesting a frame from the filtergraph:
> %s\n",
> +                       av_err2str(ret));
> +            return ret;
>          }
> -    }
> +        fgt->next_in = fg->nb_inputs;
>
> -    return 0;
> +        // return after one iteration, so that scheduler can rate-control
> us
> +        if (did_step && fgp->have_sources)
> +            return 0;
> +
> +        /* Reap all buffers present in the buffer sinks */
> +        for (int i = 0; i < fg->nb_outputs; i++) {
> +            OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
> +
> +            ret = 0;
> +            while (!ret) {
> +                ret = fg_output_step(ofp, fgt, frame);
> +                if (ret < 0)
> +                    return ret;
> +            }
> +        }
> +        did_step = 1;
> +    };
> +
> +    return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
>  }
>
>  static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts,
> AVRational tb)
> @@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt,
> InputFilter *ifilter,
>      InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
>      int ret;
>
> +    if (fgt->eof_in[ifp->index])
> +       return 0;
> +
>      fgt->eof_in[ifp->index] = 1;
>
>      if (ifp->filter) {
> @@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg,
> FilterGraphThread *fgt,
>              return ret;
>          }
>
> -        ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
> +        ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
>          av_frame_free(&tmp);
>          if (ret < 0)
>              return ret;
> @@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg,
> FilterGraphThread *fgt,
>      return 0;
>  }
>
> -static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
> -                       AVFrame *frame)
> -{
> -    const enum FrameOpaque msg = (intptr_t)frame->opaque;
> -    FilterGraph            *fg = &fgp->fg;
> -    int              graph_eof = 0;
> -    int ret;
> -
> -    frame->opaque = NULL;
> -    av_assert0(msg > 0);
> -    av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
> -
> -    if (!fg->graph) {
> -        // graph not configured yet, ignore all messages other than
> choosing
> -        // the input to read from
> -        if (msg != FRAME_OPAQUE_CHOOSE_INPUT) {
> -            av_frame_unref(frame);
> -            goto done;
> -        }
> -
> -        for (int i = 0; i < fg->nb_inputs; i++) {
> -            InputFilter *ifilter = fg->inputs[i];
> -            InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
> -            if (ifp->format < 0 && !fgt->eof_in[i]) {
> -                frame->opaque = (void*)(intptr_t)(i + 1);
> -                goto done;
> -            }
> -        }
> -
> -        // This state - graph is not configured, but all inputs are either
> -        // initialized or EOF - should be unreachable because sending EOF
> to a
> -        // filter without even a fallback format should fail
> -        av_assert0(0);
> -        return AVERROR_BUG;
> -    }
> -
> -    if (msg == FRAME_OPAQUE_SEND_COMMAND) {
> -        FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
> -        send_command(fg, fc->time, fc->target, fc->command, fc->arg,
> fc->all_filters);
> -        av_frame_unref(frame);
> -        goto done;
> -    }
> -
> -    if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
> -        ret = avfilter_graph_request_oldest(fg->graph);
> -
> -        graph_eof = ret == AVERROR_EOF;
> -
> -        if (ret == AVERROR(EAGAIN)) {
> -            frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
> -            goto done;
> -        } else if (ret < 0 && !graph_eof)
> -            return ret;
> -    }
> -
> -    ret = read_frames(fg, fgt, frame, 0);
> -    if (ret < 0) {
> -        av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for
> encoding\n");
> -        return ret;
> -    }
> -
> -    if (graph_eof)
> -        return AVERROR_EOF;
> -
> -    // signal to the main thread that we are done processing the message
> -done:
> -    ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
> -    if (ret < 0) {
> -        if (ret != AVERROR_EOF)
> -            av_log(fg, AV_LOG_ERROR, "Error communicating with the main
> thread\n");
> -        return ret;
> -    }
> -
> -    return 0;
> -}
> -
>  static void fg_thread_set_name(const FilterGraph *fg)
>  {
>      char name[16];
> @@ -2867,294 +2749,94 @@ static void *filter_thread(void *arg)
>          InputFilter *ifilter;
>          InputFilterPriv *ifp;
>          enum FrameOpaque o;
> -        int input_idx, eof_frame;
> +        unsigned input_idx = fgt.next_in;
>
> -        input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
> -        if (input_idx < 0 ||
> -            (input_idx == fg->nb_inputs && input_status < 0)) {
> +        input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
> +                                          &input_idx, fgt.frame);
> +        if (input_status == AVERROR_EOF) {
>              av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
>              break;
> +        } else if (input_status == AVERROR(EAGAIN)) {
> +            // should only happen when we didn't request any input
> +            av_assert0(input_idx == fg->nb_inputs);
> +            goto read_frames;
>          }
> +        av_assert0(input_status >= 0);
> +
> +        o = (intptr_t)fgt.frame->opaque;
>
>          o = (intptr_t)fgt.frame->opaque;
>
>          // message on the control stream
>          if (input_idx == fg->nb_inputs) {
> -            ret = msg_process(fgp, &fgt, fgt.frame);
> -            if (ret < 0)
> -                goto finish;
> +            FilterCommand *fc;
>
> +            av_assert0(o == FRAME_OPAQUE_SEND_COMMAND &&
> fgt.frame->buf[0]);
> +
> +            fc = (FilterCommand*)fgt.frame->buf[0]->data;
> +            send_command(fg, fc->time, fc->target, fc->command, fc->arg,
> +                         fc->all_filters);
> +            av_frame_unref(fgt.frame);
>              continue;
>          }
>
>          // we received an input frame or EOF
>          ifilter   = fg->inputs[input_idx];
>          ifp       = ifp_from_ifilter(ifilter);
> -        eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
> +
>          if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
>              int hb_frame = input_status >= 0 && o ==
> FRAME_OPAQUE_SUB_HEARTBEAT;
>              ret = sub2video_frame(ifilter, (fgt.frame->buf[0] ||
> hb_frame) ? fgt.frame : NULL);
> -        } else if (input_status >= 0 && fgt.frame->buf[0]) {
> +        } else if (fgt.frame->buf[0]) {
>              ret = send_frame(fg, &fgt, ifilter, fgt.frame);
>          } else {
> -            int64_t   pts = input_status >= 0 ? fgt.frame->pts :
> AV_NOPTS_VALUE;
> -            AVRational tb = input_status >= 0 ? fgt.frame->time_base :
> (AVRational){ 1, 1 };
> -            ret = send_eof(&fgt, ifilter, pts, tb);
> +            av_assert1(o == FRAME_OPAQUE_EOF);
> +            ret = send_eof(&fgt, ifilter, fgt.frame->pts,
> fgt.frame->time_base);
>          }
>          av_frame_unref(fgt.frame);
>          if (ret < 0)
> +            goto finish;
> +
> +read_frames:
> +        // retrieve all newly avalable frames
> +        ret = read_frames(fg, &fgt, fgt.frame);
> +        if (ret == AVERROR_EOF) {
> +            av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
>              break;
> -
> -        if (eof_frame) {
> -            // an EOF frame is immediately followed by sender closing
> -            // the corresponding stream, so retrieve that event
> -            input_status = tq_receive(fgp->queue_in, &input_idx,
> fgt.frame);
> -            av_assert0(input_status == AVERROR_EOF && input_idx ==
> ifp->index);
> -        }
> -
> -        // signal to the main thread that we are done
> -        ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
> -        if (ret < 0) {
> -            if (ret == AVERROR_EOF)
> -                break;
> -
> -            av_log(fg, AV_LOG_ERROR, "Error communicating with the main
> thread\n");
> +        } else if (ret < 0) {
> +            av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers:
> %s\n",
> +                   av_err2str(ret));
>              goto finish;
>          }
>      }
>
> +    for (unsigned i = 0; i < fg->nb_outputs; i++) {
> +        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
> +
> +        if (fgt.eof_out[i])
> +            continue;
> +
> +        ret = fg_output_frame(ofp, &fgt, NULL);
> +        if (ret < 0)
> +            goto finish;
> +    }
> +
>  finish:
>      // EOF is normal termination
>      if (ret == AVERROR_EOF)
>          ret = 0;
>
> -    for (int i = 0; i <= fg->nb_inputs; i++)
> -        tq_receive_finish(fgp->queue_in, i);
> -    for (int i = 0; i <= fg->nb_outputs; i++)
> -        tq_send_finish(fgp->queue_out, i);
> -
>      fg_thread_uninit(&fgt);
>
> -    av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
> -
>      return (void*)(intptr_t)ret;
>  }
>
> -static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
> -                             AVFrame *frame, enum FrameOpaque type)
> -{
> -    InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
> -    int output_idx, ret;
> -
> -    if (ifp->eof) {
> -        av_frame_unref(frame);
> -        return AVERROR_EOF;
> -    }
> -
> -    frame->opaque = (void*)(intptr_t)type;
> -
> -    ret = tq_send(fgp->queue_in, ifp->index, frame);
> -    if (ret < 0) {
> -        ifp->eof = 1;
> -        av_frame_unref(frame);
> -        return ret;
> -    }
> -
> -    if (type == FRAME_OPAQUE_EOF)
> -        tq_send_finish(fgp->queue_in, ifp->index);
> -
> -    // wait for the frame to be processed
> -    ret = tq_receive(fgp->queue_out, &output_idx, frame);
> -    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
> -
> -    return ret;
> -}
> -
> -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int
> keep_reference)
> -{
> -    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
> -    int ret;
> -
> -    if (keep_reference) {
> -        ret = av_frame_ref(fgp->frame, frame);
> -        if (ret < 0)
> -            return ret;
> -    } else
> -        av_frame_move_ref(fgp->frame, frame);
> -
> -    return thread_send_frame(fgp, ifilter, fgp->frame, 0);
> -}
> -
> -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
> -{
> -    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
> -    int ret;
> -
> -    fgp->frame->pts       = pts;
> -    fgp->frame->time_base = tb;
> -
> -    ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
> -
> -    return ret == AVERROR_EOF ? 0 : ret;
> -}
> -
> -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts,
> AVRational tb)
> -{
> -    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
> -
> -    fgp->frame->pts       = pts;
> -    fgp->frame->time_base = tb;
> -
> -    thread_send_frame(fgp, ifilter, fgp->frame,
> FRAME_OPAQUE_SUB_HEARTBEAT);
> -}
> -
> -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
> -{
> -    FilterGraphPriv *fgp = fgp_from_fg(graph);
> -    int ret, got_frames = 0;
> -
> -    if (fgp->eof_in)
> -        return AVERROR_EOF;
> -
> -    // signal to the filtering thread to return all frames it can
> -    av_assert0(!fgp->frame->buf[0]);
> -    fgp->frame->opaque = (void*)(intptr_t)(best_ist             ?
> -                                           FRAME_OPAQUE_CHOOSE_INPUT :
> -                                           FRAME_OPAQUE_REAP_FILTERS);
> -
> -    ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
> -    if (ret < 0) {
> -        fgp->eof_in = 1;
> -        goto finish;
> -    }
> -
> -    while (1) {
> -        OutputFilter *ofilter;
> -        OutputFilterPriv *ofp;
> -        OutputStream *ost;
> -        int output_idx;
> -
> -        ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
> -
> -        // EOF on the whole queue or the control stream
> -        if (output_idx < 0 ||
> -            (ret < 0 && output_idx == graph->nb_outputs))
> -            goto finish;
> -
> -        // EOF for a specific stream
> -        if (ret < 0) {
> -            ofilter = graph->outputs[output_idx];
> -            ofp     = ofp_from_ofilter(ofilter);
> -
> -            // we are finished and no frames were ever seen at this
> output,
> -            // at least initialize the encoder with a dummy frame
> -            if (!ofp->got_frame) {
> -                AVFrame *frame = fgp->frame;
> -                FrameData *fd;
> -
> -                frame->time_base   = ofp->tb_out;
> -                frame->format      = ofp->format;
> -
> -                frame->width               = ofp->width;
> -                frame->height              = ofp->height;
> -                frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
> -
> -                frame->sample_rate = ofp->sample_rate;
> -                if (ofp->ch_layout.nb_channels) {
> -                    ret = av_channel_layout_copy(&frame->ch_layout,
> &ofp->ch_layout);
> -                    if (ret < 0)
> -                        return ret;
> -                }
> -
> -                fd = frame_data(frame);
> -                if (!fd)
> -                    return AVERROR(ENOMEM);
> -
> -                fd->frame_rate_filter = ofp->fps.framerate;
> -
> -                av_assert0(!frame->buf[0]);
> -
> -                av_log(ofilter->ost, AV_LOG_WARNING,
> -                       "No filtered frames for output stream, trying to "
> -                       "initialize anyway.\n");
> -
> -                enc_open(ofilter->ost, frame);
> -                av_frame_unref(frame);
> -            }
> -
> -            close_output_stream(graph->outputs[output_idx]->ost);
> -            continue;
> -        }
> -
> -        // request was fully processed by the filtering thread,
> -        // return the input stream to read from, if needed
> -        if (output_idx == graph->nb_outputs) {
> -            int input_idx = (intptr_t)fgp->frame->opaque - 1;
> -            av_assert0(input_idx <= graph->nb_inputs);
> -
> -            if (best_ist) {
> -                *best_ist = (input_idx >= 0 && input_idx <
> graph->nb_inputs) ?
> -
> ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
> -
> -                if (input_idx < 0 && !got_frames) {
> -                    for (int i = 0; i < graph->nb_outputs; i++)
> -                        graph->outputs[i]->ost->unavailable = 1;
> -                }
> -            }
> -            break;
> -        }
> -
> -        // got a frame from the filtering thread, send it for encoding
> -        ofilter = graph->outputs[output_idx];
> -        ost     = ofilter->ost;
> -        ofp     = ofp_from_ofilter(ofilter);
> -
> -        if (ost->finished) {
> -            av_frame_unref(fgp->frame);
> -            tq_receive_finish(fgp->queue_out, output_idx);
> -            continue;
> -        }
> -
> -        if (fgp->frame->pts != AV_NOPTS_VALUE) {
> -            ofilter->last_pts = av_rescale_q(fgp->frame->pts,
> -                                             fgp->frame->time_base,
> -                                             AV_TIME_BASE_Q);
> -        }
> -
> -        ret = enc_frame(ost, fgp->frame);
> -        av_frame_unref(fgp->frame);
> -        if (ret < 0)
> -            goto finish;
> -
> -        ofp->got_frame = 1;
> -        got_frames     = 1;
> -    }
> -
> -finish:
> -    if (ret < 0) {
> -        fgp->eof_in = 1;
> -        for (int i = 0; i < graph->nb_outputs; i++)
> -            close_output_stream(graph->outputs[i]->ost);
> -    }
> -
> -    return ret;
> -}
> -
> -int reap_filters(FilterGraph *fg, int flush)
> -{
> -    return fg_transcode_step(fg, NULL);
> -}
> -
>  void fg_send_command(FilterGraph *fg, double time, const char *target,
>                       const char *command, const char *arg, int
> all_filters)
>  {
>      FilterGraphPriv *fgp = fgp_from_fg(fg);
>      AVBufferRef *buf;
>      FilterCommand *fc;
> -    int output_idx, ret;
> -
> -    if (!fgp->queue_in)
> -        return;
>
>      fc = av_mallocz(sizeof(*fc));
>      if (!fc)
> @@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time,
> const char *target,
>      fgp->frame->buf[0] = buf;
>      fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
>
> -    ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame);
> -    if (ret < 0) {
> -        av_frame_unref(fgp->frame);
> -        return;
> -    }
> -
> -    // wait for the frame to be processed
> -    ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
> -    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
> +    sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
>  }
> diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
> index ef5c2f60e0..067dc65d4e 100644
> --- a/fftools/ffmpeg_mux.c
> +++ b/fftools/ffmpeg_mux.c
> @@ -23,16 +23,13 @@
>  #include "ffmpeg.h"
>  #include "ffmpeg_mux.h"
>  #include "ffmpeg_utils.h"
> -#include "objpool.h"
>  #include "sync_queue.h"
> -#include "thread_queue.h"
>
>  #include "libavutil/fifo.h"
>  #include "libavutil/intreadwrite.h"
>  #include "libavutil/log.h"
>  #include "libavutil/mem.h"
>  #include "libavutil/timestamp.h"
> -#include "libavutil/thread.h"
>
>  #include "libavcodec/packet.h"
>
> @@ -41,10 +38,9 @@
>
>  typedef struct MuxThreadContext {
>      AVPacket *pkt;
> +    AVPacket *fix_sub_duration_pkt;
>  } MuxThreadContext;
>
> -int want_sdp = 1;
> -
>  static Muxer *mux_from_of(OutputFile *of)
>  {
>      return (Muxer*)of;
> @@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux,
> OutputStream *ost, AVPacket *pkt, int
>      return 0;
>  }
>
> +static int of_streamcopy(OutputStream *ost, AVPacket *pkt);
> +
>  /* apply the output bitstream filters */
> -static int mux_packet_filter(Muxer *mux, OutputStream *ost,
> -                             AVPacket *pkt, int *stream_eof)
> +static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt,
> +                             OutputStream *ost, AVPacket *pkt, int
> *stream_eof)
>  {
>      MuxStream *ms = ms_from_ost(ost);
>      const char *err_msg;
>      int ret = 0;
>
> +    if (pkt && !ost->enc) {
> +        ret = of_streamcopy(ost, pkt);
> +        if (ret == AVERROR(EAGAIN))
> +            return 0;
> +        else if (ret == AVERROR_EOF) {
> +            av_packet_unref(pkt);
> +            pkt = NULL;
> +            ret = 0;
> +        } else if (ret < 0)
> +            goto fail;
> +    }
> +
> +    // emit heartbeat for -fix_sub_duration;
> +    // we are only interested in heartbeats on on random access points.
> +    if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) {
> +        mt->fix_sub_duration_pkt->opaque    =
> (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION;
> +        mt->fix_sub_duration_pkt->pts       = pkt->pts;
> +        mt->fix_sub_duration_pkt->time_base = pkt->time_base;
> +
> +        ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx,
> +                                    mt->fix_sub_duration_pkt);
> +        if (ret < 0)
> +            goto fail;
> +    }
> +
>      if (ms->bsf_ctx) {
>          int bsf_eof = 0;
>
> @@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of)
>  static void mux_thread_uninit(MuxThreadContext *mt)
>  {
>      av_packet_free(&mt->pkt);
> +    av_packet_free(&mt->fix_sub_duration_pkt);
>
>      memset(mt, 0, sizeof(*mt));
>  }
> @@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt)
>      if (!mt->pkt)
>          goto fail;
>
> +    mt->fix_sub_duration_pkt = av_packet_alloc();
> +    if (!mt->fix_sub_duration_pkt)
> +        goto fail;
> +
>      return 0;
>
>  fail:
> @@ -316,19 +344,22 @@ void *muxer_thread(void *arg)
>          OutputStream *ost;
>          int stream_idx, stream_eof = 0;
>
> -        ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
> +        ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
> +        stream_idx = mt.pkt->stream_index;
>          if (stream_idx < 0) {
>              av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
>              ret = 0;
>              break;
>          }
>
> -        ost = of->streams[stream_idx];
> -        ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt,
> &stream_eof);
> +        ost = of->streams[mux->sch_stream_idx[stream_idx]];
> +        mt.pkt->stream_index = ost->index;
> +
> +        ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt,
> &stream_eof);
>          av_packet_unref(mt.pkt);
>          if (ret == AVERROR_EOF) {
>              if (stream_eof) {
> -                tq_receive_finish(mux->tq, stream_idx);
> +                sch_mux_receive_finish(mux->sch, of->index, stream_idx);
>              } else {
>                  av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
>                  ret = 0;
> @@ -343,243 +374,55 @@ void *muxer_thread(void *arg)
>  finish:
>      mux_thread_uninit(&mt);
>
> -    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
> -        tq_receive_finish(mux->tq, i);
> -
> -    av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
> -
>      return (void*)(intptr_t)ret;
>  }
>
> -static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket
> *pkt)
> -{
> -    int ret = 0;
> -
> -    if (!pkt || ost->finished & MUXER_FINISHED)
> -        goto finish;
> -
> -    ret = tq_send(mux->tq, ost->index, pkt);
> -    if (ret < 0)
> -        goto finish;
> -
> -    return 0;
> -
> -finish:
> -    if (pkt)
> -        av_packet_unref(pkt);
> -
> -    ost->finished |= MUXER_FINISHED;
> -    tq_send_finish(mux->tq, ost->index);
> -    return ret == AVERROR_EOF ? 0 : ret;
> -}
> -
> -static int queue_packet(OutputStream *ost, AVPacket *pkt)
> -{
> -    MuxStream *ms = ms_from_ost(ost);
> -    AVPacket *tmp_pkt = NULL;
> -    int ret;
> -
> -    if (!av_fifo_can_write(ms->muxing_queue)) {
> -        size_t cur_size = av_fifo_can_read(ms->muxing_queue);
> -        size_t pkt_size = pkt ? pkt->size : 0;
> -        unsigned int are_we_over_size =
> -            (ms->muxing_queue_data_size + pkt_size) >
> ms->muxing_queue_data_threshold;
> -        size_t limit    = are_we_over_size ? ms->max_muxing_queue_size :
> SIZE_MAX;
> -        size_t new_size = FFMIN(2 * cur_size, limit);
> -
> -        if (new_size <= cur_size) {
> -            av_log(ost, AV_LOG_ERROR,
> -                   "Too many packets buffered for output stream %d:%d.\n",
> -                   ost->file_index, ost->st->index);
> -            return AVERROR(ENOSPC);
> -        }
> -        ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    if (pkt) {
> -        ret = av_packet_make_refcounted(pkt);
> -        if (ret < 0)
> -            return ret;
> -
> -        tmp_pkt = av_packet_alloc();
> -        if (!tmp_pkt)
> -            return AVERROR(ENOMEM);
> -
> -        av_packet_move_ref(tmp_pkt, pkt);
> -        ms->muxing_queue_data_size += tmp_pkt->size;
> -    }
> -    av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
> -
> -    return 0;
> -}
> -
> -static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
> -{
> -    int ret;
> -
> -    if (mux->tq) {
> -        return thread_submit_packet(mux, ost, pkt);
> -    } else {
> -        /* the muxer is not initialized yet, buffer the packet */
> -        ret = queue_packet(ost, pkt);
> -        if (ret < 0) {
> -            if (pkt)
> -                av_packet_unref(pkt);
> -            return ret;
> -        }
> -    }
> -
> -    return 0;
> -}
> -
> -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
> -{
> -    Muxer *mux = mux_from_of(of);
> -    int ret = 0;
> -
> -    if (pkt && pkt->dts != AV_NOPTS_VALUE)
> -        ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base,
> AV_TIME_BASE_Q);
> -
> -    ret = submit_packet(mux, pkt, ost);
> -    if (ret < 0) {
> -        av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the
> muxer: %s",
> -               av_err2str(ret));
> -        return ret;
> -    }
> -
> -    return 0;
> -}
> -
> -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
> +static int of_streamcopy(OutputStream *ost, AVPacket *pkt)
>  {
>      OutputFile *of = output_files[ost->file_index];
>      MuxStream  *ms = ms_from_ost(ost);
> +    DemuxPktData *pd = pkt->opaque_ref ?
> (DemuxPktData*)pkt->opaque_ref->data : NULL;
> +    int64_t      dts = pd ? pd->dts_est : AV_NOPTS_VALUE;
>      int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 :
> of->start_time;
>      int64_t ts_offset;
> -    AVPacket *opkt = ms->pkt;
> -    int ret;
> -
> -    av_packet_unref(opkt);
>
>      if (of->recording_time != INT64_MAX &&
>          dts >= of->recording_time + start_time)
> -        pkt = NULL;
> -
> -    // EOF: flush output bitstream filters.
> -    if (!pkt)
> -        return of_output_packet(of, ost, NULL);
> +        return AVERROR_EOF;
>
>      if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
>          !ms->copy_initial_nonkeyframes)
> -        return 0;
> +        return AVERROR(EAGAIN);
>
>      if (!ms->streamcopy_started) {
>          if (!ms->copy_prior_start &&
>              (pkt->pts == AV_NOPTS_VALUE ?
>               dts < ms->ts_copy_start :
>               pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q,
> pkt->time_base)))
> -            return 0;
> +            return AVERROR(EAGAIN);
>
>          if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
> -            return 0;
> +            return AVERROR(EAGAIN);
>      }
>
> -    ret = av_packet_ref(opkt, pkt);
> -    if (ret < 0)
> -        return ret;
> -
> -    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
> +    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
>
>      if (pkt->pts != AV_NOPTS_VALUE)
> -        opkt->pts -= ts_offset;
> +        pkt->pts -= ts_offset;
>
>      if (pkt->dts == AV_NOPTS_VALUE) {
> -        opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
> +        pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
>      } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
> -        opkt->pts = opkt->dts - ts_offset;
> -    }
> -    opkt->dts -= ts_offset;
> -
> -    {
> -        int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
> -        if (ret < 0) {
> -            av_log(NULL, AV_LOG_ERROR,
> -                   "Subtitle heartbeat logic failed in %s! (%s)\n",
> -                   __func__, av_err2str(ret));
> -            return ret;
> -        }
> +        pkt->pts = pkt->dts - ts_offset;
>      }
>
> -    ret = of_output_packet(of, ost, opkt);
> -    if (ret < 0)
> -        return ret;
> +    pkt->dts -= ts_offset;
>
>      ms->streamcopy_started = 1;
>
>      return 0;
>  }
>
> -static int thread_stop(Muxer *mux)
> -{
> -    void *ret;
> -
> -    if (!mux || !mux->tq)
> -        return 0;
> -
> -    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
> -        tq_send_finish(mux->tq, i);
> -
> -    pthread_join(mux->thread, &ret);
> -
> -    tq_free(&mux->tq);
> -
> -    return (int)(intptr_t)ret;
> -}
> -
> -static int thread_start(Muxer *mux)
> -{
> -    AVFormatContext *fc = mux->fc;
> -    ObjPool *op;
> -    int ret;
> -
> -    op = objpool_alloc_packets();
> -    if (!op)
> -        return AVERROR(ENOMEM);
> -
> -    mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op,
> pkt_move);
> -    if (!mux->tq) {
> -        objpool_free(&op);
> -        return AVERROR(ENOMEM);
> -    }
> -
> -    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
> -    if (ret) {
> -        tq_free(&mux->tq);
> -        return AVERROR(ret);
> -    }
> -
> -    /* flush the muxing queues */
> -    for (int i = 0; i < fc->nb_streams; i++) {
> -        OutputStream *ost = mux->of.streams[i];
> -        MuxStream     *ms = ms_from_ost(ost);
> -        AVPacket *pkt;
> -
> -        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
> -            ret = thread_submit_packet(mux, ost, pkt);
> -            if (pkt) {
> -                ms->muxing_queue_data_size -= pkt->size;
> -                av_packet_free(&pkt);
> -            }
> -            if (ret < 0)
> -                return ret;
> -        }
> -    }
> -
> -    return 0;
> -}
> -
>  int print_sdp(const char *filename);
>
>  int print_sdp(const char *filename)
> @@ -590,11 +433,6 @@ int print_sdp(const char *filename)
>      AVIOContext *sdp_pb;
>      AVFormatContext **avc;
>
> -    for (i = 0; i < nb_output_files; i++) {
> -        if (!mux_from_of(output_files[i])->header_written)
> -            return 0;
> -    }
> -
>      avc = av_malloc_array(nb_output_files, sizeof(*avc));
>      if (!avc)
>          return AVERROR(ENOMEM);
> @@ -629,25 +467,17 @@ int print_sdp(const char *filename)
>          avio_closep(&sdp_pb);
>      }
>
> -    // SDP successfully written, allow muxer threads to start
> -    ret = 1;
> -
>  fail:
>      av_freep(&avc);
>      return ret;
>  }
>
> -int mux_check_init(Muxer *mux)
> +int mux_check_init(void *arg)
>  {
> +    Muxer     *mux = arg;
>      OutputFile *of = &mux->of;
>      AVFormatContext *fc = mux->fc;
> -    int ret, i;
> -
> -    for (i = 0; i < fc->nb_streams; i++) {
> -        OutputStream *ost = of->streams[i];
> -        if (!ost->initialized)
> -            return 0;
> -    }
> +    int ret;
>
>      ret = avformat_write_header(fc, &mux->opts);
>      if (ret < 0) {
> @@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux)
>      mux->header_written = 1;
>
>      av_dump_format(fc, of->index, fc->url, 1);
> -    nb_output_dumped++;
> -
> -    if (sdp_filename || want_sdp) {
> -        ret = print_sdp(sdp_filename);
> -        if (ret < 0) {
> -            av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
> -            return ret;
> -        } else if (ret == 1) {
> -            /* SDP is written only after all the muxers are ready, so now
> we
> -             * start ALL the threads */
> -            for (i = 0; i < nb_output_files; i++) {
> -                ret = thread_start(mux_from_of(output_files[i]));
> -                if (ret < 0)
> -                    return ret;
> -            }
> -        }
> -    } else {
> -        ret = thread_start(mux_from_of(of));
> -        if (ret < 0)
> -            return ret;
> -    }
> +    atomic_fetch_add(&nb_output_dumped, 1);
>
>      return 0;
>  }
> @@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost)
>                                           ost->st->time_base);
>      }
>
> -    ost->initialized = 1;
> +    if (ms->sch_idx >= 0)
> +        return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx);
>
> -    return mux_check_init(mux);
> +    return 0;
>  }
>
>  static int check_written(OutputFile *of)
> @@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of)
>      AVFormatContext *fc = mux->fc;
>      int ret, mux_result = 0;
>
> -    if (!mux->tq) {
> +    if (!mux->header_written) {
>          av_log(mux, AV_LOG_ERROR,
>                 "Nothing was written into output file, because "
>                 "at least one of its streams received no packets.\n");
>          return AVERROR(EINVAL);
>      }
>
> -    mux_result = thread_stop(mux);
> -
>      ret = av_write_trailer(fc);
>      if (ret < 0) {
>          av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n",
> av_err2str(ret));
> @@ -905,13 +714,6 @@ static void ost_free(OutputStream **post)
>          ost->logfile = NULL;
>      }
>
> -    if (ms->muxing_queue) {
> -        AVPacket *pkt;
> -        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0)
> -            av_packet_free(&pkt);
> -        av_fifo_freep2(&ms->muxing_queue);
> -    }
> -
>      avcodec_parameters_free(&ost->par_in);
>
>      av_bsf_free(&ms->bsf_ctx);
> @@ -976,8 +778,6 @@ void of_free(OutputFile **pof)
>          return;
>      mux = mux_from_of(of);
>
> -    thread_stop(mux);
> -
>      sq_free(&of->sq_encode);
>      sq_free(&mux->sq_mux);
>
> diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
> index eee2b2cb07..5d7cf3fa76 100644
> --- a/fftools/ffmpeg_mux.h
> +++ b/fftools/ffmpeg_mux.h
> @@ -25,7 +25,6 @@
>  #include <stdint.h>
>
>  #include "ffmpeg_sched.h"
> -#include "thread_queue.h"
>
>  #include "libavformat/avformat.h"
>
> @@ -33,7 +32,6 @@
>
>  #include "libavutil/dict.h"
>  #include "libavutil/fifo.h"
> -#include "libavutil/thread.h"
>
>  typedef struct MuxStream {
>      OutputStream ost;
> @@ -41,9 +39,6 @@ typedef struct MuxStream {
>      // name used for logging
>      char log_name[32];
>
> -    /* the packets are buffered here until the muxer is ready to be
> initialized */
> -    AVFifo *muxing_queue;
> -
>      AVBSFContext *bsf_ctx;
>      AVPacket     *bsf_pkt;
>
> @@ -57,17 +52,6 @@ typedef struct MuxStream {
>
>      int64_t max_frames;
>
> -    /*
> -     * The size of the AVPackets' buffers in queue.
> -     * Updated when a packet is either pushed or pulled from the queue.
> -     */
> -    size_t muxing_queue_data_size;
> -
> -    int max_muxing_queue_size;
> -
> -    /* Threshold after which max_muxing_queue_size will be in effect */
> -    size_t muxing_queue_data_threshold;
> -
>      // timestamp from which the streamcopied streams should start,
>      // in AV_TIME_BASE_Q;
>      // everything before it should be discarded
> @@ -106,9 +90,6 @@ typedef struct Muxer {
>      int         *sch_stream_idx;
>      int       nb_sch_stream_idx;
>
> -    pthread_t    thread;
> -    ThreadQueue *tq;
> -
>      AVDictionary *opts;
>
>      int thread_queue_size;
> @@ -122,10 +103,7 @@ typedef struct Muxer {
>      AVPacket *sq_pkt;
>  } Muxer;
>
> -/* whether we want to print an SDP, set in of_open() */
> -extern int want_sdp;
> -
> -int mux_check_init(Muxer *mux);
> +int mux_check_init(void *arg);
>
>  static MuxStream *ms_from_ost(OutputStream *ost)
>  {
> diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
> index 534b4379c7..6459296ab0 100644
> --- a/fftools/ffmpeg_mux_init.c
> +++ b/fftools/ffmpeg_mux_init.c
> @@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const
> OptionsContext *o,
>      return 0;
>  }
>
> -static int new_stream_attachment(Muxer *mux, const OptionsContext *o,
> -                                 OutputStream *ost)
> -{
> -    ost->finished    = 1;
> -    return 0;
> -}
> -
>  static int new_stream_subtitle(Muxer *mux, const OptionsContext *o,
>                                 OutputStream *ost)
>  {
> @@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
>      if (!ost->par_in)
>          return AVERROR(ENOMEM);
>
> -    ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
> -    if (!ms->muxing_queue)
> -        return AVERROR(ENOMEM);
>      ms->last_mux_dts = AV_NOPTS_VALUE;
>
>      ost->st         = st;
> @@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
>          if (!ost->enc_ctx)
>              return AVERROR(ENOMEM);
>
> -        ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
> +        ret = sch_add_enc(mux->sch, encoder_thread, ost,
> +                          ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL :
> enc_open);
>          if (ret < 0)
>              return ret;
>          ms->sch_idx_enc = ret;
> @@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
>
>          sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
>                                   max_muxing_queue_size,
> muxing_queue_data_threshold);
> -
> -        ms->max_muxing_queue_size       = max_muxing_queue_size;
> -        ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
>      }
>
>      MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
> @@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
>      if (ost->enc_ctx &&
> av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24)
>          av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0);
>
> -    ost->last_mux_dts = AV_NOPTS_VALUE;
> -
>      MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i,
>                           ms->copy_initial_nonkeyframes, oc, st);
>
> @@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
>      case AVMEDIA_TYPE_VIDEO:      ret = new_stream_video     (mux, o,
> ost); break;
>      case AVMEDIA_TYPE_AUDIO:      ret = new_stream_audio     (mux, o,
> ost); break;
>      case AVMEDIA_TYPE_SUBTITLE:   ret = new_stream_subtitle  (mux, o,
> ost); break;
> -    case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o,
> ost); break;
>      }
>      if (ret < 0)
>          return ret;
> @@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux,
> AVFormatContext *oc, int64_t buf_size_u
>          MuxStream     *ms = ms_from_ost(ost);
>          enum AVMediaType type = ost->type;
>
> -        ost->sq_idx_encode = -1;
>          ost->sq_idx_mux    = -1;
>
>          nb_interleaved += IS_INTERLEAVED(type);
> @@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux,
> AVFormatContext *oc, int64_t buf_size_u
>       * - at least one encoded audio/video stream is frame-limited, since
>       *   that has similar semantics to 'shortest'
>       * - at least one audio encoder requires constant frame sizes
> +     *
> +     * Note that encoding sync queues are handled in the scheduler,
> because
> +     * different encoders run in different threads and need external
> +     * synchronization, while muxer sync queues can be handled inside the
> muxer
>       */
>      if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc ||
> nb_audio_fs) {
> -        of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
> -        if (!of->sq_encode)
> -            return AVERROR(ENOMEM);
> +        int sq_idx, ret;
> +
> +        sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
> +        if (sq_idx < 0)
> +            return sq_idx;
>
>          for (int i = 0; i < oc->nb_streams; i++) {
>              OutputStream *ost = of->streams[i];
> @@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux,
> AVFormatContext *oc, int64_t buf_size_u
>              if (!IS_AV_ENC(ost, type))
>                  continue;
>
> -            ost->sq_idx_encode = sq_add_stream(of->sq_encode,
> -                                               of->shortest ||
> ms->max_frames < INT64_MAX);
> -            if (ost->sq_idx_encode < 0)
> -                return ost->sq_idx_encode;
> -
> -            if (ms->max_frames != INT64_MAX)
> -                sq_limit_frames(of->sq_encode, ost->sq_idx_encode,
> ms->max_frames);
> +            ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc,
> +                                 of->shortest || ms->max_frames <
> INT64_MAX,
> +                                 ms->max_frames);
> +            if (ret < 0)
> +                return ret;
>          }
>      }
>
> @@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const
> AVDictionary *codec_avopt)
>      return 0;
>  }
>
> -static int init_output_stream_nofilter(OutputStream *ost)
> -{
> -    int ret = 0;
> -
> -    if (ost->enc_ctx) {
> -        ret = enc_open(ost, NULL);
> -        if (ret < 0)
> -            return ret;
> -    } else {
> -        ret = of_stream_init(output_files[ost->file_index], ost);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    return ret;
> -}
> -
>  static const char *output_file_item_name(void *obj)
>  {
>      const Muxer *mux = obj;
> @@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
>      av_strlcat(mux->log_name, "/",               sizeof(mux->log_name));
>      av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
>
> -    if (strcmp(oc->oformat->name, "rtp"))
> -        want_sdp = 0;
>
>      of->format = oc->oformat;
>      if (recording_time != INT64_MAX)
> @@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
>                                             AVFMT_FLAG_BITEXACT);
>      }
>
> -    err = sch_add_mux(sch, muxer_thread, NULL, mux,
> +    err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
>                        !strcmp(oc->oformat->name, "rtp"));
>      if (err < 0)
>          return err;
> @@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
>
>      of->url        = filename;
>
> -    /* initialize stream copy and subtitle/data streams.
> -     * Encoded AVFrame based streams will get initialized when the first
> AVFrame
> -     * is received in do_video_out
> -     */
> +    /* initialize streamcopy streams. */
>      for (int i = 0; i < of->nb_streams; i++) {
>          OutputStream *ost = of->streams[i];
>
> -        if (ost->filter)
> -            continue;
> -
> -        err = init_output_stream_nofilter(ost);
> -        if (err < 0)
> -            return err;
> -    }
> -
> -    /* write the header for files with no streams */
> -    if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) {
> -        int ret = mux_check_init(mux);
> -        if (ret < 0)
> -            return ret;
> +        if (!ost->enc) {
> +            err = of_stream_init(of, ost);
> +            if (err < 0)
> +                return err;
> +        }
>      }
>
>      return 0;
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index d463306546..6177a96a4e 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[]
>       = {"top", NULL};
>  HWDevice *filter_hw_device;
>
>  char *vstats_filename;
> -char *sdp_filename;
>
>  float audio_drift_threshold = 0.1;
>  float dts_delta_threshold   = 10;
> @@ -580,9 +579,8 @@ fail:
>
>  static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
>  {
> -    av_free(sdp_filename);
> -    sdp_filename = av_strdup(arg);
> -    return 0;
> +    Scheduler *sch = optctx;
> +    return sch_sdp_filename(sch, arg);
>  }
>
>  #if CONFIG_VAAPI
> diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> index 957a410921..bc9b833799 100644
> --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> @@ -1,48 +1,40 @@
>  1
> -00:00:00,968 --> 00:00:01,001
> +00:00:00,968 --> 00:00:01,168
>  <font face="Monospace">{\an7}(</font>
>
>  2
> -00:00:01,001 --> 00:00:01,168
> -<font face="Monospace">{\an7}(</font>
> -
> -3
>  00:00:01,168 --> 00:00:01,368
>  <font face="Monospace">{\an7}(<i> inaudibl</i></font>
>
> -4
> +3
>  00:00:01,368 --> 00:00:01,568
>  <font face="Monospace">{\an7}(<i> inaudible radio chat</i></font>
>
> -5
> +4
>  00:00:01,568 --> 00:00:02,002
>  <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
>
> +5
> +00:00:02,002 --> 00:00:03,103
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
> +
>  6
> -00:00:02,002 --> 00:00:03,003
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
> -
> -7
> -00:00:03,003 --> 00:00:03,103
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
> -
> -8
>  00:00:03,103 --> 00:00:03,303
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>  >></font>
>
> -9
> +7
>  00:00:03,303 --> 00:00:03,503
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>  >> Safety rema</font>
>
> -10
> +8
>  00:00:03,504 --> 00:00:03,704
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>  >> Safety remains our numb</font>
>
> -11
> +9
>  00:00:03,704 --> 00:00:04,004
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>  >> Safety remains our number one</font>
>
> --
> 2.42.0
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
>


More information about the ffmpeg-devel mailing list