[FFmpeg-devel] [PATCH 10/10] fftools/ffmpeg: convert to a threaded architecture
Paul B Mahol
onemda at gmail.com
Wed Dec 6 13:22:48 EET 2023
On Wed, Dec 6, 2023 at 11:32 AM Anton Khirnov <anton at khirnov.net> wrote:
> Change the main loop and every component (demuxers, decoders, filters,
> encoders, muxers) to use the previously added transcode scheduler. Every
> instance of every such component was already running in a separate
> thread, but now they can actually run in parallel.
>
> Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
> JEEB to be more correct and deterministic.
>
Un-reviewable, please split it.
> ---
> Changelog | 2 +
> fftools/ffmpeg.c | 374 +--------
> fftools/ffmpeg.h | 97 +--
> fftools/ffmpeg_dec.c | 321 ++------
> fftools/ffmpeg_demux.c | 268 ++++---
> fftools/ffmpeg_enc.c | 368 ++-------
> fftools/ffmpeg_filter.c | 722 +++++-------------
> fftools/ffmpeg_mux.c | 324 ++------
> fftools/ffmpeg_mux.h | 24 +-
> fftools/ffmpeg_mux_init.c | 88 +--
> fftools/ffmpeg_opt.c | 6 +-
> .../fate/ffmpeg-fix_sub_duration_heartbeat | 36 +-
> 12 files changed, 600 insertions(+), 2030 deletions(-)
>
> diff --git a/Changelog b/Changelog
> index f00bc27ca4..67ef92eb02 100644
> --- a/Changelog
> +++ b/Changelog
> @@ -7,6 +7,8 @@ version <next>:
> - EVC encoding using external library libxeve
> - QOA decoder and demuxer
> - aap filter
> +- demuxing, decoding, filtering, encoding, and muxing in the
> + ffmpeg CLI now all run in parallel
>
> version 6.1:
> - libaribcaption decoder
> diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
> index b8a97258a0..30b594fd97 100644
> --- a/fftools/ffmpeg.c
> +++ b/fftools/ffmpeg.c
> @@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps {
> static BenchmarkTimeStamps get_benchmark_time_stamps(void);
> static int64_t getmaxrss(void);
>
> -unsigned nb_output_dumped = 0;
> +atomic_uint nb_output_dumped = 0;
>
> static BenchmarkTimeStamps current_time;
> AVIOContext *progress_avio = NULL;
> @@ -138,30 +138,6 @@ static struct termios oldtty;
> static int restore_tty;
> #endif
>
> -/* sub2video hack:
> - Convert subtitles to video with alpha to insert them in filter graphs.
> - This is a temporary solution until libavfilter gets real subtitles support.
> - */
> -
> -static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb)
> -{
> - /* When a frame is read from a file, examine all sub2video streams in
> - the same file and send the sub2video frame again. Otherwise, decoded
> - video frames could be accumulating in the filter graph while a filter
> - (possibly overlay) is desperately waiting for a subtitle frame. */
> - for (int i = 0; i < infile->nb_streams; i++) {
> - InputStream *ist = infile->streams[i];
> -
> - if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
> - continue;
> -
> - for (int j = 0; j < ist->nb_filters; j++)
> - ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
> - }
> -}
> -
> -/* end of sub2video hack */
> -
> static void term_exit_sigsafe(void)
> {
> #if HAVE_TERMIOS_H
> @@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...)
> }
> }
>
> -void close_output_stream(OutputStream *ost)
> -{
> - OutputFile *of = output_files[ost->file_index];
> - ost->finished |= ENCODER_FINISHED;
> -
> - if (ost->sq_idx_encode >= 0)
> - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
> -}
> -
> -static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
> +static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts)
> {
> AVBPrint buf, buf_script;
> int64_t total_size = of_filesize(output_files[0]);
> int vid;
> double bitrate;
> double speed;
> - int64_t pts = AV_NOPTS_VALUE;
> static int64_t last_time = -1;
> static int first_report = 1;
> uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
> @@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
> last_time = cur_time;
> }
> if (((cur_time - last_time) < stats_period && !first_report) ||
> - (first_report && nb_output_dumped < nb_output_files))
> + (first_report && atomic_load(&nb_output_dumped) < nb_output_files))
> return;
> last_time = cur_time;
> }
> @@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
> av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
> av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
> for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> - const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1;
> + const float q = ost->enc ? atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1;
>
> if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
> av_bprintf(&buf, "q=%2.1f ", q);
> @@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
> if (is_last_report)
> av_bprintf(&buf, "L");
>
> - nb_frames_dup = ost->filter->nb_frames_dup;
> - nb_frames_drop = ost->filter->nb_frames_drop;
> + nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup);
> + nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
>
> vid = 1;
> }
> - /* compute min output value */
> - if (ost->last_mux_dts != AV_NOPTS_VALUE) {
> - if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
> - pts = ost->last_mux_dts;
> - if (copy_ts) {
> - if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
> - copy_ts_first_pts = pts;
> - if (copy_ts_first_pts != AV_NOPTS_VALUE)
> - pts -= copy_ts_first_pts;
> - }
> - }
> + }
> +
> + if (copy_ts) {
> + if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
> + copy_ts_first_pts = pts;
> + if (copy_ts_first_pts != AV_NOPTS_VALUE)
> + pts -= copy_ts_first_pts;
> }
>
> us = FFABS64U(pts) % AV_TIME_BASE;
> @@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle
> *subtitle, int copy)
> return 0;
> }
>
> -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt)
> -{
> - OutputFile *of = output_files[ost->file_index];
> - int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base,
> - AV_TIME_BASE_Q);
> -
> - if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & AV_PKT_FLAG_KEY))
> - // we are only interested in heartbeats on streams configured, and
> - // only on random access points.
> - return 0;
> -
> - for (int i = 0; i < of->nb_streams; i++) {
> - OutputStream *iter_ost = of->streams[i];
> - InputStream *ist = iter_ost->ist;
> - int ret = AVERROR_BUG;
> -
> - if (iter_ost == ost || !ist || !ist->decoding_needed ||
> - ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
> - // We wish to skip the stream that causes the heartbeat,
> - // output streams without an input stream, streams not decoded
> - // (as fix_sub_duration is only done for decoded subtitles) as
> - // well as non-subtitle streams.
> - continue;
> -
> - if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0)
> - return ret;
> - }
> -
> - return 0;
> -}
> -
> -/* pkt = NULL means EOF (needed to flush decoder buffers) */
> -static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
> -{
> - InputFile *f = input_files[ist->file_index];
> - int64_t dts_est = AV_NOPTS_VALUE;
> - int ret = 0;
> - int eof_reached = 0;
> -
> - if (ist->decoding_needed) {
> - ret = dec_packet(ist, pkt, no_eof);
> - if (ret < 0 && ret != AVERROR_EOF)
> - return ret;
> - }
> - if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
> - eof_reached = 1;
> -
> - if (pkt && pkt->opaque_ref) {
> - DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
> - dts_est = pd->dts_est;
> - }
> -
> - if (f->recording_time != INT64_MAX) {
> - int64_t start_time = 0;
> - if (copy_ts) {
> - start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
> - start_time += start_at_zero ? 0 : f->start_time_effective;
> - }
> - if (dts_est >= f->recording_time + start_time)
> - pkt = NULL;
> - }
> -
> - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> - OutputStream *ost = ist->outputs[oidx];
> - if (ost->enc || (!pkt && no_eof))
> - continue;
> -
> - ret = of_streamcopy(ost, pkt, dts_est);
> - if (ret < 0)
> - return ret;
> - }
> -
> - return !eof_reached;
> -}
> -
> static void print_stream_maps(void)
> {
> av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
> @@ -934,43 +821,6 @@ static void print_stream_maps(void)
> }
> }
>
> -/**
> - * Select the output stream to process.
> - *
> - * @retval 0 an output stream was selected
> - * @retval AVERROR(EAGAIN) need to wait until more input is available
> - * @retval AVERROR_EOF no more streams need output
> - */
> -static int choose_output(OutputStream **post)
> -{
> - int64_t opts_min = INT64_MAX;
> - OutputStream *ost_min = NULL;
> -
> - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> - int64_t opts;
> -
> - if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
> - opts = ost->filter->last_pts;
> - } else {
> - opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
> - INT64_MIN : ost->last_mux_dts;
> - }
> -
> - if (!ost->initialized && !ost->finished) {
> - ost_min = ost;
> - break;
> - }
> - if (!ost->finished && opts < opts_min) {
> - opts_min = opts;
> - ost_min = ost;
> - }
> - }
> - if (!ost_min)
> - return AVERROR_EOF;
> - *post = ost_min;
> - return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
> -}
> -
> static void set_tty_echo(int on)
> {
> #if HAVE_TERMIOS_H
> @@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t
> cur_time)
> return 0;
> }
>
> -static void reset_eagain(void)
> -{
> - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
> - ost->unavailable = 0;
> -}
> -
> -static void decode_flush(InputFile *ifile)
> -{
> - for (int i = 0; i < ifile->nb_streams; i++) {
> - InputStream *ist = ifile->streams[i];
> -
> - if (ist->discard || !ist->decoding_needed)
> - continue;
> -
> - dec_packet(ist, NULL, 1);
> - }
> -}
> -
> -/*
> - * Return
> - * - 0 -- one packet was read and processed
> - * - AVERROR(EAGAIN) -- no packets were available for selected file,
> - * this function should be called again
> - * - AVERROR_EOF -- this function should not be called again
> - */
> -static int process_input(int file_index, AVPacket *pkt)
> -{
> - InputFile *ifile = input_files[file_index];
> - InputStream *ist;
> - int ret, i;
> -
> - ret = ifile_get_packet(ifile, pkt);
> -
> - if (ret == 1) {
> - /* the input file is looped: flush the decoders */
> - decode_flush(ifile);
> - return AVERROR(EAGAIN);
> - }
> - if (ret < 0) {
> - if (ret != AVERROR_EOF) {
> - av_log(ifile, AV_LOG_ERROR,
> - "Error retrieving a packet from demuxer: %s\n", av_err2str(ret));
> - if (exit_on_error)
> - return ret;
> - }
> -
> - for (i = 0; i < ifile->nb_streams; i++) {
> - ist = ifile->streams[i];
> - if (!ist->discard) {
> - ret = process_input_packet(ist, NULL, 0);
> - if (ret>0)
> - return 0;
> - else if (ret < 0)
> - return ret;
> - }
> -
> - /* mark all outputs that don't go through lavfi as finished */
> - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> - OutputStream *ost = ist->outputs[oidx];
> - OutputFile *of = output_files[ost->file_index];
> -
> - ret = of_output_packet(of, ost, NULL);
> - if (ret < 0)
> - return ret;
> - }
> - }
> -
> - ifile->eof_reached = 1;
> - return AVERROR(EAGAIN);
> - }
> -
> - reset_eagain();
> -
> - ist = ifile->streams[pkt->stream_index];
> -
> - sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
> -
> - ret = process_input_packet(ist, pkt, 0);
> -
> - av_packet_unref(pkt);
> -
> - return ret < 0 ? ret : 0;
> -}
> -
> -/**
> - * Run a single step of transcoding.
> - *
> - * @return 0 for success, <0 for error
> - */
> -static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
> -{
> - InputStream *ist = NULL;
> - int ret;
> -
> - if (ost->filter) {
> - if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
> - return ret;
> - if (!ist)
> - return 0;
> - } else {
> - ist = ost->ist;
> - av_assert0(ist);
> - }
> -
> - ret = process_input(ist->file_index, demux_pkt);
> - if (ret == AVERROR(EAGAIN)) {
> - return 0;
> - }
> -
> - if (ret < 0)
> - return ret == AVERROR_EOF ? 0 : ret;
> -
> - // process_input() above might have caused output to become available
> - // in multiple filtergraphs, so we process all of them
> - for (int i = 0; i < nb_filtergraphs; i++) {
> - ret = reap_filters(filtergraphs[i], 0);
> - if (ret < 0)
> - return ret;
> - }
> -
> - return 0;
> -}
> -
> /*
> * The following code is the main loop of the file converter
> */
> -static int transcode(Scheduler *sch, int *err_rate_exceeded)
> +static int transcode(Scheduler *sch)
> {
> int ret = 0, i;
> - InputStream *ist;
> - int64_t timer_start;
> - AVPacket *demux_pkt = NULL;
> + int64_t timer_start, transcode_ts = 0;
>
> print_stream_maps();
>
> - *err_rate_exceeded = 0;
> atomic_store(&transcode_init_done, 1);
>
> - demux_pkt = av_packet_alloc();
> - if (!demux_pkt) {
> - ret = AVERROR(ENOMEM);
> - goto fail;
> - }
> + ret = sch_start(sch);
> + if (ret < 0)
> + return ret;
>
> if (stdin_interaction) {
> av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
> @@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
>
> timer_start = av_gettime_relative();
>
> - while (!received_sigterm) {
> - OutputStream *ost;
> + while (!sch_wait(sch, stats_period, &transcode_ts)) {
> int64_t cur_time= av_gettime_relative();
>
> /* if 'q' pressed, exits */
> @@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
> if (check_keyboard_interaction(cur_time) < 0)
> break;
>
> - ret = choose_output(&ost);
> - if (ret == AVERROR(EAGAIN)) {
> - reset_eagain();
> - av_usleep(10000);
> - ret = 0;
> - continue;
> - } else if (ret < 0) {
> - av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n");
> - ret = 0;
> - break;
> - }
> -
> - ret = transcode_step(ost, demux_pkt);
> - if (ret < 0 && ret != AVERROR_EOF) {
> - av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
> - break;
> - }
> -
> /* dump report by using the output first video and audio streams */
> - print_report(0, timer_start, cur_time);
> + print_report(0, timer_start, cur_time, transcode_ts);
> }
>
> - /* at the end of stream, we must flush the decoder buffers */
> - for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
> - float err_rate;
> -
> - if (!input_files[ist->file_index]->eof_reached) {
> - int err = process_input_packet(ist, NULL, 0);
> - ret = err_merge(ret, err);
> - }
> -
> - err_rate = (ist->frames_decoded || ist->decode_errors) ?
> - ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
> - if (err_rate > max_error_rate) {
> - av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
> - err_rate, max_error_rate);
> - *err_rate_exceeded = 1;
> - } else if (err_rate)
> - av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
> - }
> - ret = err_merge(ret, enc_flush());
> -
> - term_exit();
> + ret = sch_stop(sch);
>
> /* write the trailer if needed */
> for (i = 0; i < nb_output_files; i++) {
> @@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
> ret = err_merge(ret, err);
> }
>
> - /* dump report by using the first video and audio streams */
> - print_report(1, timer_start, av_gettime_relative());
> + term_exit();
>
> -fail:
> - av_packet_free(&demux_pkt);
> + /* dump report by using the first video and audio streams */
> + print_report(1, timer_start, av_gettime_relative(), transcode_ts);
>
> return ret;
> }
> @@ -1308,7 +990,7 @@ int main(int argc, char **argv)
> {
> Scheduler *sch = NULL;
>
> - int ret, err_rate_exceeded;
> + int ret;
> BenchmarkTimeStamps ti;
>
> init_dynload();
> @@ -1350,7 +1032,7 @@ int main(int argc, char **argv)
> }
>
> current_time = ti = get_benchmark_time_stamps();
> - ret = transcode(sch, &err_rate_exceeded);
> + ret = transcode(sch);
> if (ret >= 0 && do_benchmark) {
> int64_t utime, stime, rtime;
> current_time = get_benchmark_time_stamps();
> @@ -1362,8 +1044,8 @@ int main(int argc, char **argv)
> utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
> }
>
> - ret = received_nb_signals ? 255 :
> - err_rate_exceeded ? 69 : ret;
> + ret = received_nb_signals ? 255 :
> + (ret == FFMPEG_ERROR_RATE_EXCEEDED) ? 69 : ret;
>
> finish:
> if (ret == AVERROR_EXIT)
> diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
> index a89038b765..ba82b7490d 100644
> --- a/fftools/ffmpeg.h
> +++ b/fftools/ffmpeg.h
> @@ -61,6 +61,8 @@
> #define FFMPEG_OPT_TOP 1
> #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
>
> +#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
> +
> enum VideoSyncMethod {
> VSYNC_AUTO = -1,
> VSYNC_PASSTHROUGH,
> @@ -82,13 +84,16 @@ enum HWAccelID {
> };
>
> enum FrameOpaque {
> - FRAME_OPAQUE_REAP_FILTERS = 1,
> - FRAME_OPAQUE_CHOOSE_INPUT,
> - FRAME_OPAQUE_SUB_HEARTBEAT,
> + FRAME_OPAQUE_SUB_HEARTBEAT = 1,
> FRAME_OPAQUE_EOF,
> FRAME_OPAQUE_SEND_COMMAND,
> };
>
> +enum PacketOpaque {
> + PKT_OPAQUE_SUB_HEARTBEAT = 1,
> + PKT_OPAQUE_FIX_SUB_DURATION,
> +};
> +
> typedef struct HWDevice {
> const char *name;
> enum AVHWDeviceType type;
> @@ -309,11 +314,8 @@ typedef struct OutputFilter {
>
> enum AVMediaType type;
>
> - /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
> - int64_t last_pts;
> -
> - uint64_t nb_frames_dup;
> - uint64_t nb_frames_drop;
> + atomic_uint_least64_t nb_frames_dup;
> + atomic_uint_least64_t nb_frames_drop;
> } OutputFilter;
>
> typedef struct FilterGraph {
> @@ -426,11 +428,6 @@ typedef struct InputFile {
>
> float readrate;
> int accurate_seek;
> -
> - /* when looping the input file, this queue is used by decoders to report
> - * the last frame timestamp back to the demuxer thread */
> - AVThreadMessageQueue *audio_ts_queue;
> - int audio_ts_queue_size;
> } InputFile;
>
> enum forced_keyframes_const {
> @@ -532,8 +529,6 @@ typedef struct OutputStream {
> InputStream *ist;
>
> AVStream *st; /* stream in the output file */
> - /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */
> - int64_t last_mux_dts;
>
> AVRational enc_timebase;
>
> @@ -578,13 +573,6 @@ typedef struct OutputStream {
> AVDictionary *sws_dict;
> AVDictionary *swr_opts;
> char *apad;
> - OSTFinished finished; /* no more packets should be written for this stream */
> - int unavailable; /* true if the steram is unavailable (possibly temporarily) */
> -
> - // init_output_stream() has been called for this stream
> - // The encoder and the bitstream filters have been initialized and the stream
> - // parameters are set in the AVStream.
> - int initialized;
>
> const char *attachment_filename;
>
> @@ -598,9 +586,8 @@ typedef struct OutputStream {
> uint64_t samples_encoded;
>
> /* packet quality factor */
> - int quality;
> + atomic_int quality;
>
> - int sq_idx_encode;
> int sq_idx_mux;
>
> EncStats enc_stats_pre;
> @@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs;
> extern int nb_filtergraphs;
>
> extern char *vstats_filename;
> -extern char *sdp_filename;
>
> extern float dts_delta_threshold;
> extern float dts_error_threshold;
> @@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb;
> extern const OptionDef options[];
> extern HWDevice *filter_hw_device;
>
> -extern unsigned nb_output_dumped;
> +extern atomic_uint nb_output_dumped;
>
> extern int ignore_unknown_streams;
> extern int copy_unknown_streams;
> @@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame);
>
> const FrameData *frame_data_c(AVFrame *frame);
>
> -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
> -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
> -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
> -
> /**
> * Set up fallback filtering parameters from a decoder context. They will only
> * be used if no frames are ever sent on this input, otherwise the actual
> @@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc,
> Scheduler *sch);
>
> void fg_free(FilterGraph **pfg);
>
> -/**
> - * Perform a step of transcoding for the specified filter graph.
> - *
> - * @param[in] graph filter graph to consider
> - * @param[out] best_ist input stream where a frame would allow to continue
> - * @return 0 for success, <0 for error
> - */
> -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
> -
> void fg_send_command(FilterGraph *fg, double time, const char *target,
> const char *command, const char *arg, int
> all_filters);
>
> -/**
> - * Get and encode new output from specified filtergraph, without causing
> - * activity.
> - *
> - * @return 0 for success, <0 for severe errors
> - */
> -int reap_filters(FilterGraph *fg, int flush);
> -
> int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
>
> void enc_stats_write(OutputStream *ost, EncStats *es,
> @@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx,
> AVFrame *input);
> int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
> void dec_free(Decoder **pdec);
>
> -/**
> - * Submit a packet for decoding
> - *
> - * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
> - * mark all downstreams as finished.
> - *
> - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
> - * decoders and await further input.
> - */
> -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
> -
> int enc_alloc(Encoder **penc, const AVCodec *codec,
> Scheduler *sch, unsigned sch_idx);
> void enc_free(Encoder **penc);
>
> -int enc_open(OutputStream *ost, const AVFrame *frame);
> -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle
> *sub);
> -int enc_frame(OutputStream *ost, AVFrame *frame);
> -int enc_flush(void);
> +int enc_open(void *opaque, const AVFrame *frame);
>
> /*
> * Initialize muxing state for the given stream, should be called
> @@ -840,30 +791,11 @@ void of_free(OutputFile **pof);
>
> void of_enc_stats_close(void);
>
> -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
> -
> -/**
> - * @param dts predicted packet dts in AV_TIME_BASE_Q
> - */
> -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
> -
> int64_t of_filesize(OutputFile *of);
>
> int ifile_open(const OptionsContext *o, const char *filename, Scheduler
> *sch);
> void ifile_close(InputFile **f);
>
> -/**
> - * Get next input packet from the demuxer.
> - *
> - * @param pkt the packet is written here when this function returns 0
> - * @return
> - * - 0 when a packet has been read successfully
> - * - 1 when stream end was reached, but the stream is looped;
> - * caller should flush decoders and read from this demuxer again
> - * - a negative error code on failure
> - */
> -int ifile_get_packet(InputFile *f, AVPacket *pkt);
> -
> int ist_output_add(InputStream *ist, OutputStream *ost);
> int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
>
> @@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev);
> * pass NULL to start iteration */
> OutputStream *ost_iter(OutputStream *prev);
>
> -void close_output_stream(OutputStream *ost);
> -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket
> *pkt);
> -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts);
> void update_benchmark(const char *fmt, ...);
>
> #define SPECIFIER_OPT_FMT_str "%s"
> diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
> index 90ea0d6d93..5dde82a276 100644
> --- a/fftools/ffmpeg_dec.c
> +++ b/fftools/ffmpeg_dec.c
> @@ -54,24 +54,6 @@ struct Decoder {
>
> Scheduler *sch;
> unsigned sch_idx;
> -
> - pthread_t thread;
> - /**
> - * Queue for sending coded packets from the main thread to
> - * the decoder thread.
> - *
> - * An empty packet is sent to flush the decoder without terminating
> - * decoding.
> - */
> - ThreadQueue *queue_in;
> - /**
> - * Queue for sending decoded frames from the decoder thread
> - * to the main thread.
> - *
> - * An empty frame is sent to signal that a single packet has been fully
> - * processed.
> - */
> - ThreadQueue *queue_out;
> };
>
> // data that is local to the decoder thread and not visible outside of it
> @@ -80,24 +62,6 @@ typedef struct DecThreadContext {
> AVPacket *pkt;
> } DecThreadContext;
>
> -static int dec_thread_stop(Decoder *d)
> -{
> - void *ret;
> -
> - if (!d->queue_in)
> - return 0;
> -
> - tq_send_finish(d->queue_in, 0);
> - tq_receive_finish(d->queue_out, 0);
> -
> - pthread_join(d->thread, &ret);
> -
> - tq_free(&d->queue_in);
> - tq_free(&d->queue_out);
> -
> - return (intptr_t)ret;
> -}
> -
> void dec_free(Decoder **pdec)
> {
> Decoder *dec = *pdec;
> @@ -105,8 +69,6 @@ void dec_free(Decoder **pdec)
> if (!dec)
> return;
>
> - dec_thread_stop(dec);
> -
> av_frame_free(&dec->frame);
> av_packet_free(&dec->pkt);
>
> @@ -148,25 +110,6 @@ fail:
> return AVERROR(ENOMEM);
> }
>
> -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
> -{
> - int i, ret = 0;
> -
> - for (i = 0; i < ist->nb_filters; i++) {
> - ret = ifilter_send_frame(ist->filters[i], decoded_frame,
> - i < ist->nb_filters - 1 ||
> - ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
> - if (ret == AVERROR_EOF)
> - ret = 0; /* ignore */
> - if (ret < 0) {
> - av_log(NULL, AV_LOG_ERROR,
> - "Failed to inject frame into filter network: %s\n",
> av_err2str(ret));
> - break;
> - }
> - }
> - return ret;
> -}
> -
> static AVRational audio_samplerate_update(void *logctx, Decoder *d,
> const AVFrame *frame)
> {
> @@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist,
> AVFrame *frame)
> if (!subtitle)
> return 0;
>
> - ret = send_frame_to_filters(ist, frame);
> + ret = sch_dec_send(d->sch, d->sch_idx, frame);
> if (ret < 0)
> - return ret;
> + av_frame_unref(frame);
>
> - subtitle = (AVSubtitle*)frame->buf[0]->data;
> - if (!subtitle->num_rects)
> - return 0;
> -
> - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> - OutputStream *ost = ist->outputs[oidx];
> - if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
> - continue;
> -
> - ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
> - if (ret < 0)
> - return ret;
> - }
> -
> - return 0;
> + return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
> }
>
> -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
> +static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
> {
> Decoder *d = ist->decoder;
> int ret = AVERROR_BUG;
> @@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist,
> int64_t signal_pts)
> static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
> AVFrame *frame)
> {
> - Decoder *d = ist->decoder;
> + Decoder *d = ist->decoder;
> AVPacket *flush_pkt = NULL;
> AVSubtitle subtitle;
> int got_output;
> int ret;
>
> + if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
> + frame->pts = pkt->pts;
> + frame->time_base = pkt->time_base;
> + frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
> +
> + ret = sch_dec_send(d->sch, d->sch_idx, frame);
> + return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
> + } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) {
> + return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base,
> + AV_TIME_BASE_Q));
> + }
> +
> if (!pkt) {
> flush_pkt = av_packet_alloc();
> if (!flush_pkt)
> @@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const
> AVPacket *pkt,
>
> ist->frames_decoded++;
>
> - // XXX the queue for transferring data back to the main thread runs
> + // XXX the queue for transferring data to consumers runs
> // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
> // inside the frame
> // eventually, subtitles should be switched to use AVFrames natively
> @@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist,
> const AVPacket *pkt,
> frame->width = ist->dec_ctx->width;
> frame->height = ist->dec_ctx->height;
>
> - ret = tq_send(d->queue_out, 0, frame);
> - if (ret < 0)
> - av_frame_unref(frame);
> -
> - return ret;
> -}
> -
> -static int send_filter_eof(InputStream *ist)
> -{
> - Decoder *d = ist->decoder;
> - int i, ret;
> -
> - for (i = 0; i < ist->nb_filters; i++) {
> - int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
> - d->last_frame_pts + d->last_frame_duration_est;
> - ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
> - if (ret < 0)
> - return ret;
> - }
> - return 0;
> + return process_subtitle(ist, frame);
> }
>
> static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
> @@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket
> *pkt, AVFrame *frame)
>
> ist->frames_decoded++;
>
> - ret = tq_send(d->queue_out, 0, frame);
> - if (ret < 0)
> - return ret;
> + ret = sch_dec_send(d->sch, d->sch_idx, frame);
> + if (ret < 0) {
> + av_frame_unref(frame);
> + return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
> + }
> }
> }
>
> @@ -679,7 +603,6 @@ fail:
> void *decoder_thread(void *arg)
> {
> InputStream *ist = arg;
> - InputFile *ifile = input_files[ist->file_index];
> Decoder *d = ist->decoder;
> DecThreadContext dt;
> int ret = 0, input_status = 0;
> @@ -691,19 +614,31 @@ void *decoder_thread(void *arg)
> dec_thread_set_name(ist);
>
> while (!input_status) {
> - int dummy, flush_buffers;
> + int flush_buffers, have_data;
>
> - input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
> - flush_buffers = input_status >= 0 && !dt.pkt->buf;
> - if (!dt.pkt->buf)
> + input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
> + have_data = input_status >= 0 &&
> + (dt.pkt->buf || dt.pkt->side_data_elems ||
> + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
> + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
> + flush_buffers = input_status >= 0 && !have_data;
> + if (!have_data)
> av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
> flush_buffers ? "flush" : "EOF");
>
> - ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
> + ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
>
> av_packet_unref(dt.pkt);
> av_frame_unref(dt.frame);
>
> + // AVERROR_EOF - EOF from the decoder
> + // AVERROR_EXIT - EOF from the scheduler
> + // we treat them differently when flushing
> + if (ret == AVERROR_EXIT) {
> + ret = AVERROR_EOF;
> + flush_buffers = 0;
> + }
> +
> if (ret == AVERROR_EOF) {
> av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
> flush_buffers ? "resetting" : "finishing");
> @@ -711,11 +646,10 @@ void *decoder_thread(void *arg)
> if (!flush_buffers)
> break;
>
> - /* report last frame duration to the demuxer thread */
> + /* report last frame duration to the scheduler */
> if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
> - Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
> - .tb = d->last_frame_tb };
> - av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
> + dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est;
> + dt.pkt->time_base = d->last_frame_tb;
> }
>
> avcodec_flush_buffers(ist->dec_ctx);
> @@ -724,149 +658,47 @@ void *decoder_thread(void *arg)
> av_err2str(ret));
> break;
> }
> -
> - // signal to the consumer thread that the entire packet was processed
> - ret = tq_send(d->queue_out, 0, dt.frame);
> - if (ret < 0) {
> - if (ret != AVERROR_EOF)
> - av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
> - break;
> - }
> }
>
> // EOF is normal thread termination
> if (ret == AVERROR_EOF)
> ret = 0;
>
> + // on success send EOF timestamp to our downstreams
> + if (ret >= 0) {
> + float err_rate;
> +
> + av_frame_unref(dt.frame);
> +
> + dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF;
> + dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
> + d->last_frame_pts + d->last_frame_duration_est;
> + dt.frame->time_base = d->last_frame_tb;
> +
> + ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
> + if (ret < 0 && ret != AVERROR_EOF) {
> + av_log(NULL, AV_LOG_FATAL,
> + "Error signalling EOF timestamp: %s\n", av_err2str(ret));
> + goto finish;
> + }
> + ret = 0;
> +
> + err_rate = (ist->frames_decoded || ist->decode_errors) ?
> + ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
> + if (err_rate > max_error_rate) {
> + av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
> + err_rate, max_error_rate);
> + ret = FFMPEG_ERROR_RATE_EXCEEDED;
> + } else if (err_rate)
> + av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
> + }
> +
> finish:
> - tq_receive_finish(d->queue_in, 0);
> - tq_send_finish (d->queue_out, 0);
> -
> - // make sure the demuxer does not get stuck waiting for audio durations
> - // that will never arrive
> - if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
> - av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
> -
> dec_thread_uninit(&dt);
>
> - av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
> -
> return (void*)(intptr_t)ret;
> }
>
> -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
> -{
> - Decoder *d = ist->decoder;
> - int ret = 0, thread_ret;
> -
> - // thread already joined
> - if (!d->queue_in)
> - return AVERROR_EOF;
> -
> - // send the packet/flush request/EOF to the decoder thread
> - if (pkt || no_eof) {
> - av_packet_unref(d->pkt);
> -
> - if (pkt) {
> - ret = av_packet_ref(d->pkt, pkt);
> - if (ret < 0)
> - goto finish;
> - }
> -
> - ret = tq_send(d->queue_in, 0, d->pkt);
> - if (ret < 0)
> - goto finish;
> - } else
> - tq_send_finish(d->queue_in, 0);
> -
> - // retrieve all decoded data for the packet
> - while (1) {
> - int dummy;
> -
> - ret = tq_receive(d->queue_out, &dummy, d->frame);
> - if (ret < 0)
> - goto finish;
> -
> - // packet fully processed
> - if (!d->frame->buf[0])
> - return 0;
> -
> - // process the decoded frame
> - if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
> - ret = process_subtitle(ist, d->frame);
> - } else {
> - ret = send_frame_to_filters(ist, d->frame);
> - }
> - av_frame_unref(d->frame);
> - if (ret < 0)
> - goto finish;
> - }
> -
> -finish:
> - thread_ret = dec_thread_stop(d);
> - if (thread_ret < 0) {
> - av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
> - av_err2str(thread_ret));
> - ret = err_merge(ret, thread_ret);
> - }
> - // non-EOF errors here are all fatal
> - if (ret < 0 && ret != AVERROR_EOF)
> - return ret;
> -
> - // signal EOF to our downstreams
> - ret = send_filter_eof(ist);
> - if (ret < 0) {
> - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
> - return ret;
> - }
> -
> - return AVERROR_EOF;
> -}
> -
> -static int dec_thread_start(InputStream *ist)
> -{
> - Decoder *d = ist->decoder;
> - ObjPool *op;
> - int ret = 0;
> -
> - op = objpool_alloc_packets();
> - if (!op)
> - return AVERROR(ENOMEM);
> -
> - d->queue_in = tq_alloc(1, 1, op, pkt_move);
> - if (!d->queue_in) {
> - objpool_free(&op);
> - return AVERROR(ENOMEM);
> - }
> -
> - op = objpool_alloc_frames();
> - if (!op)
> - goto fail;
> -
> - d->queue_out = tq_alloc(1, 4, op, frame_move);
> - if (!d->queue_out) {
> - objpool_free(&op);
> - goto fail;
> - }
> -
> - ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
> - if (ret) {
> - ret = AVERROR(ret);
> - av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
> - av_err2str(ret));
> - goto fail;
> - }
> -
> - return 0;
> -fail:
> - if (ret >= 0)
> - ret = AVERROR(ENOMEM);
> -
> - tq_free(&d->queue_in);
> - tq_free(&d->queue_out);
> - return ret;
> -}
> -
> static enum AVPixelFormat get_format(AVCodecContext *s, const enum
> AVPixelFormat *pix_fmts)
> {
> InputStream *ist = s->opaque;
> @@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch,
> unsigned sch_idx)
> if (ret < 0)
> return ret;
>
> - ret = dec_thread_start(ist);
> - if (ret < 0) {
> - av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
> - av_err2str(ret));
> - return ret;
> - }
> -
> return 0;
> }
> diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
> index 2234dbe076..91cd7a1125 100644
> --- a/fftools/ffmpeg_demux.c
> +++ b/fftools/ffmpeg_demux.c
> @@ -22,8 +22,6 @@
> #include "ffmpeg.h"
> #include "ffmpeg_sched.h"
> #include "ffmpeg_utils.h"
> -#include "objpool.h"
> -#include "thread_queue.h"
>
> #include "libavutil/avassert.h"
> #include "libavutil/avstring.h"
> @@ -35,7 +33,6 @@
> #include "libavutil/pixdesc.h"
> #include "libavutil/time.h"
> #include "libavutil/timestamp.h"
> -#include "libavutil/thread.h"
>
> #include "libavcodec/packet.h"
>
> @@ -66,7 +63,11 @@ typedef struct DemuxStream {
>
> double ts_scale;
>
> + // scheduler returned EOF for this stream
> + int finished;
> +
> int streamcopy_needed;
> + int have_sub2video;
>
> int wrap_correction_done;
> int saw_first_ts;
> @@ -101,6 +102,7 @@ typedef struct Demuxer {
>
> /* number of times input stream should be looped */
> int loop;
> + int have_audio_dec;
> /* duration of the looped segment of the input file */
> Timestamp duration;
> /* pts with the smallest/largest values ever seen */
> @@ -113,11 +115,12 @@ typedef struct Demuxer {
> double readrate_initial_burst;
>
> Scheduler *sch;
> - ThreadQueue *thread_queue;
> - int thread_queue_size;
> - pthread_t thread;
> +
> + AVPacket *pkt_heartbeat;
>
> int read_started;
> + int nb_streams_used;
> + int nb_streams_finished;
> } Demuxer;
>
> static DemuxStream *ds_from_ist(InputStream *ist)
> @@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const
> AVPacket *pkt)
> d->nb_streams_warn = pkt->stream_index + 1;
> }
>
> -static int seek_to_start(Demuxer *d)
> +static int seek_to_start(Demuxer *d, Timestamp end_pts)
> {
> InputFile *ifile = &d->f;
> AVFormatContext *is = ifile->ctx;
> @@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d)
> if (ret < 0)
> return ret;
>
> - if (ifile->audio_ts_queue_size) {
> - int got_ts = 0;
> -
> - while (got_ts < ifile->audio_ts_queue_size) {
> - Timestamp ts;
> - ret = av_thread_message_queue_recv(ifile->audio_ts_queue,
> &ts, 0);
> - if (ret < 0)
> - return ret;
> - got_ts++;
> -
> - if (d->max_pts.ts == AV_NOPTS_VALUE ||
> - av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb)
> < 0)
> - d->max_pts = ts;
> - }
> - }
> + if (end_pts.ts != AV_NOPTS_VALUE &&
> + (d->max_pts.ts == AV_NOPTS_VALUE ||
> + av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts,
> end_pts.tb) < 0))
> + d->max_pts = end_pts;
>
> if (d->max_pts.ts != AV_NOPTS_VALUE) {
> int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 :
> d->min_pts.ts;
> @@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
> duration = av_rescale_q(d->duration.ts, d->duration.tb,
> pkt->time_base);
> if (pkt->pts != AV_NOPTS_VALUE) {
> // audio decoders take precedence for estimating total file
> duration
> - int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 :
> pkt->duration;
> + int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration;
>
> pkt->pts += duration;
>
> @@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
> return 0;
> }
>
> -static int input_packet_process(Demuxer *d, AVPacket *pkt)
> +static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned
> *send_flags)
> {
> InputFile *f = &d->f;
> InputStream *ist = f->streams[pkt->stream_index];
> @@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket
> *pkt)
> if (ret < 0)
> return ret;
>
> + if (f->recording_time != INT64_MAX) {
> + int64_t start_time = 0;
> + if (copy_ts) {
> + start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time
> : 0;
> + start_time += start_at_zero ? 0 : f->start_time_effective;
> + }
> + if (ds->dts >= f->recording_time + start_time)
> + *send_flags |= DEMUX_SEND_STREAMCOPY_EOF;
> + }
> +
> ds->data_size += pkt->size;
> ds->nb_packets++;
>
> @@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket
> *pkt)
> av_ts2timestr(input_files[ist->file_index]->ts_offset,
> &AV_TIME_BASE_Q));
> }
>
> + pkt->stream_index = ds->sch_idx_stream;
> +
> return 0;
> }
>
> @@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d)
> }
> }
>
> +static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned
> flags,
> + const char *pkt_desc)
> +{
> + int ret;
> +
> + ret = sch_demux_send(d->sch, d->f.index, pkt, flags);
> + if (ret == AVERROR_EOF) {
> + av_packet_unref(pkt);
> +
> + av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are done\n");
> + ds->finished = 1;
> +
> + if (++d->nb_streams_finished == d->nb_streams_used) {
> + av_log(d, AV_LOG_VERBOSE, "All consumers are done\n");
> + return AVERROR_EOF;
> + }
> + } else if (ret < 0) {
> + if (ret != AVERROR_EXIT)
> + av_log(d, AV_LOG_ERROR,
> + "Unable to send %s packet to consumers: %s\n",
> + pkt_desc, av_err2str(ret));
> + return ret;
> + }
> +
> + return 0;
> +}
> +
> +static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt,
> unsigned flags)
> +{
> + InputFile *f = &d->f;
> + int ret;
> +
> + // send heartbeat for sub2video streams
> + if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) {
> + for (int i = 0; i < f->nb_streams; i++) {
> + DemuxStream *ds1 = ds_from_ist(f->streams[i]);
> +
> + if (ds1->finished || !ds1->have_sub2video)
> + continue;
> +
> + d->pkt_heartbeat->pts = pkt->pts;
> + d->pkt_heartbeat->time_base = pkt->time_base;
> + d->pkt_heartbeat->stream_index = ds1->sch_idx_stream;
> + d->pkt_heartbeat->opaque =
> (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT;
> +
> + ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat");
> + if (ret < 0)
> + return ret;
> + }
> + }
> +
> + ret = do_send(d, ds, pkt, flags, "demuxed");
> + if (ret < 0)
> + return ret;
> +
> +
> + return 0;
> +}
> +
> static void discard_unused_programs(InputFile *ifile)
> {
> for (int j = 0; j < ifile->ctx->nb_programs; j++) {
> @@ -527,9 +590,13 @@ static void *input_thread(void *arg)
>
> discard_unused_programs(f);
>
> + d->read_started = 1;
> d->wallclock_start = av_gettime_relative();
>
> while (1) {
> + DemuxStream *ds;
> + unsigned send_flags = 0;
> +
> ret = av_read_frame(f->ctx, pkt);
>
> if (ret == AVERROR(EAGAIN)) {
> @@ -538,11 +605,13 @@ static void *input_thread(void *arg)
> }
> if (ret < 0) {
> if (d->loop) {
> - /* signal looping to the consumer thread */
> + /* signal looping to our consumers */
> pkt->stream_index = -1;
> - ret = tq_send(d->thread_queue, 0, pkt);
> +
> + ret = sch_demux_send(d->sch, f->index, pkt, 0);
> if (ret >= 0)
> - ret = seek_to_start(d);
> + ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts,
> + .tb =
> pkt->time_base });
> if (ret >= 0)
> continue;
>
> @@ -551,9 +620,11 @@ static void *input_thread(void *arg)
>
> if (ret == AVERROR_EOF)
> av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n");
> - else
> + else {
> av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n",
> av_err2str(ret));
> + ret = exit_on_error ? ret : 0;
> + }
>
> break;
> }
> @@ -565,8 +636,9 @@ static void *input_thread(void *arg)
>
> /* the following test is needed in case new streams appear
> dynamically in stream : we ignore them */
> - if (pkt->stream_index >= f->nb_streams ||
> - f->streams[pkt->stream_index]->discard) {
> + ds = pkt->stream_index < f->nb_streams ?
> + ds_from_ist(f->streams[pkt->stream_index]) : NULL;
> + if (!ds || ds->ist.discard || ds->finished) {
> report_new_stream(d, pkt);
> av_packet_unref(pkt);
> continue;
> @@ -583,122 +655,26 @@ static void *input_thread(void *arg)
> }
> }
>
> - ret = input_packet_process(d, pkt);
> + ret = input_packet_process(d, pkt, &send_flags);
> if (ret < 0)
> break;
>
> if (f->readrate)
> readrate_sleep(d);
>
> - ret = tq_send(d->thread_queue, 0, pkt);
> - if (ret < 0) {
> - if (ret != AVERROR_EOF)
> - av_log(f, AV_LOG_ERROR,
> - "Unable to send packet to main thread: %s\n",
> - av_err2str(ret));
> + ret = demux_send(d, ds, pkt, send_flags);
> + if (ret < 0)
> break;
> - }
> }
>
> + // EOF/EXIT is normal termination
> + if (ret == AVERROR_EOF || ret == AVERROR_EXIT)
> + ret = 0;
> +
> finish:
> - av_assert0(ret < 0);
> - tq_send_finish(d->thread_queue, 0);
> -
> av_packet_free(&pkt);
>
> - av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n");
> -
> - return NULL;
> -}
> -
> -static void thread_stop(Demuxer *d)
> -{
> - InputFile *f = &d->f;
> -
> - if (!d->thread_queue)
> - return;
> -
> - tq_receive_finish(d->thread_queue, 0);
> -
> - pthread_join(d->thread, NULL);
> -
> - tq_free(&d->thread_queue);
> -
> - av_thread_message_queue_free(&f->audio_ts_queue);
> -}
> -
> -static int thread_start(Demuxer *d)
> -{
> - int ret;
> - InputFile *f = &d->f;
> - ObjPool *op;
> -
> - if (d->thread_queue_size <= 0)
> - d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
> -
> - op = objpool_alloc_packets();
> - if (!op)
> - return AVERROR(ENOMEM);
> -
> - d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
> - if (!d->thread_queue) {
> - objpool_free(&op);
> - return AVERROR(ENOMEM);
> - }
> -
> - if (d->loop) {
> - int nb_audio_dec = 0;
> -
> - for (int i = 0; i < f->nb_streams; i++) {
> - InputStream *ist = f->streams[i];
> - nb_audio_dec += !!(ist->decoding_needed &&
> - ist->st->codecpar->codec_type ==
> AVMEDIA_TYPE_AUDIO);
> - }
> -
> - if (nb_audio_dec) {
> - ret = av_thread_message_queue_alloc(&f->audio_ts_queue,
> - nb_audio_dec,
> sizeof(Timestamp));
> - if (ret < 0)
> - goto fail;
> - f->audio_ts_queue_size = nb_audio_dec;
> - }
> - }
> -
> - if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
> - av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
> - ret = AVERROR(ret);
> - goto fail;
> - }
> -
> - d->read_started = 1;
> -
> - return 0;
> -fail:
> - tq_free(&d->thread_queue);
> - return ret;
> -}
> -
> -int ifile_get_packet(InputFile *f, AVPacket *pkt)
> -{
> - Demuxer *d = demuxer_from_ifile(f);
> - int ret, dummy;
> -
> - if (!d->thread_queue) {
> - ret = thread_start(d);
> - if (ret < 0)
> - return ret;
> - }
> -
> - ret = tq_receive(d->thread_queue, &dummy, pkt);
> - if (ret < 0)
> - return ret;
> -
> - if (pkt->stream_index == -1) {
> - av_assert0(!pkt->data && !pkt->side_data_elems);
> - return 1;
> - }
> -
> - return 0;
> + return (void*)(intptr_t)ret;
> }
>
> static void demux_final_stats(Demuxer *d)
> @@ -769,8 +745,6 @@ void ifile_close(InputFile **pf)
> if (!f)
> return;
>
> - thread_stop(d);
> -
> if (d->read_started)
> demux_final_stats(d);
>
> @@ -780,6 +754,8 @@ void ifile_close(InputFile **pf)
>
> avformat_close_input(&f->ctx);
>
> + av_packet_free(&d->pkt_heartbeat);
> +
> av_freep(pf);
> }
>
> @@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int
> decoding_needed)
> ds->sch_idx_stream = ret;
> }
>
> - ist->discard = 0;
> + if (ist->discard) {
> + ist->discard = 0;
> + d->nb_streams_used++;
> + }
> +
> ist->st->discard = ist->user_set_discard;
> ist->decoding_needed |= decoding_needed;
> ds->streamcopy_needed |= !decoding_needed;
> @@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int
> decoding_needed)
> ret = dec_open(ist, d->sch, ds->sch_idx_dec);
> if (ret < 0)
> return ret;
> +
> + d->have_audio_dec |= is_audio;
> }
>
> return 0;
> @@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost)
>
> int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
> {
> + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]);
> DemuxStream *ds = ds_from_ist(ist);
> int ret;
>
> @@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter
> *ifilter, int is_simple)
> if (ret < 0)
> return ret;
>
> + if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) {
> + if (!d->pkt_heartbeat) {
> + d->pkt_heartbeat = av_packet_alloc();
> + if (!d->pkt_heartbeat)
> + return AVERROR(ENOMEM);
> + }
> + ds->have_sub2video = 1;
> + }
> +
> return ds->sch_idx_dec;
> }
>
> @@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
> "since neither -readrate nor -re were given\n");
> }
>
> - d->thread_queue_size = o->thread_queue_size;
> -
> /* Add all the streams from the given input file to the demuxer */
> for (int i = 0; i < ic->nb_streams; i++) {
> ret = ist_add(o, d, ic->streams[i]);
> diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
> index 9871381c0e..9383b167f7 100644
> --- a/fftools/ffmpeg_enc.c
> +++ b/fftools/ffmpeg_enc.c
> @@ -41,12 +41,6 @@
> #include "libavformat/avformat.h"
>
> struct Encoder {
> - AVFrame *sq_frame;
> -
> - // packet for receiving encoded output
> - AVPacket *pkt;
> - AVFrame *sub_frame;
> -
> // combined size of all the packets received from the encoder
> uint64_t data_size;
>
> @@ -54,25 +48,9 @@ struct Encoder {
> uint64_t packets_encoded;
>
> int opened;
> - int finished;
>
> Scheduler *sch;
> unsigned sch_idx;
> -
> - pthread_t thread;
> - /**
> - * Queue for sending frames from the main thread to
> - * the encoder thread.
> - */
> - ThreadQueue *queue_in;
> - /**
> - * Queue for sending encoded packets from the encoder thread
> - * to the main thread.
> - *
> - * An empty packet is sent to signal that a previously sent
> - * frame has been fully processed.
> - */
> - ThreadQueue *queue_out;
> };
>
> // data that is local to the decoder thread and not visible outside of it
> @@ -81,24 +59,6 @@ typedef struct EncoderThread {
> AVPacket *pkt;
> } EncoderThread;
>
> -static int enc_thread_stop(Encoder *e)
> -{
> - void *ret;
> -
> - if (!e->queue_in)
> - return 0;
> -
> - tq_send_finish(e->queue_in, 0);
> - tq_receive_finish(e->queue_out, 0);
> -
> - pthread_join(e->thread, &ret);
> -
> - tq_free(&e->queue_in);
> - tq_free(&e->queue_out);
> -
> - return (int)(intptr_t)ret;
> -}
> -
> void enc_free(Encoder **penc)
> {
> Encoder *enc = *penc;
> @@ -106,13 +66,6 @@ void enc_free(Encoder **penc)
> if (!enc)
> return;
>
> - enc_thread_stop(enc);
> -
> - av_frame_free(&enc->sq_frame);
> - av_frame_free(&enc->sub_frame);
> -
> - av_packet_free(&enc->pkt);
> -
> av_freep(penc);
> }
>
> @@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec,
> if (!enc)
> return AVERROR(ENOMEM);
>
> - if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
> - enc->sub_frame = av_frame_alloc();
> - if (!enc->sub_frame)
> - goto fail;
> - }
> -
> - enc->pkt = av_packet_alloc();
> - if (!enc->pkt)
> - goto fail;
> -
> enc->sch = sch;
> enc->sch_idx = sch_idx;
>
> *penc = enc;
>
> return 0;
> -fail:
> - enc_free(&enc);
> - return AVERROR(ENOMEM);
> }
>
> static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef
> *frames_ref)
> @@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of,
> OutputStream *ost)
> return 0;
> }
>
> -static int enc_thread_start(OutputStream *ost)
> -{
> - Encoder *e = ost->enc;
> - ObjPool *op;
> - int ret = 0;
> -
> - op = objpool_alloc_frames();
> - if (!op)
> - return AVERROR(ENOMEM);
> -
> - e->queue_in = tq_alloc(1, 1, op, frame_move);
> - if (!e->queue_in) {
> - objpool_free(&op);
> - return AVERROR(ENOMEM);
> - }
> -
> - op = objpool_alloc_packets();
> - if (!op)
> - goto fail;
> -
> - e->queue_out = tq_alloc(1, 4, op, pkt_move);
> - if (!e->queue_out) {
> - objpool_free(&op);
> - goto fail;
> - }
> -
> - ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
> - if (ret) {
> - ret = AVERROR(ret);
> - av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
> - av_err2str(ret));
> - goto fail;
> - }
> -
> - return 0;
> -fail:
> - if (ret >= 0)
> - ret = AVERROR(ENOMEM);
> -
> - tq_free(&e->queue_in);
> - tq_free(&e->queue_out);
> - return ret;
> -}
> -
> -int enc_open(OutputStream *ost, const AVFrame *frame)
> +int enc_open(void *opaque, const AVFrame *frame)
> {
> + OutputStream *ost = opaque;
> InputStream *ist = ost->ist;
> Encoder *e = ost->enc;
> AVCodecContext *enc_ctx = ost->enc_ctx;
> @@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
> const AVCodec *enc = enc_ctx->codec;
> OutputFile *of = output_files[ost->file_index];
> FrameData *fd;
> + int frame_samples = 0;
> int ret;
>
> if (e->opened)
> @@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
>
> e->opened = 1;
>
> - if (ost->sq_idx_encode >= 0) {
> - e->sq_frame = av_frame_alloc();
> - if (!e->sq_frame)
> - return AVERROR(ENOMEM);
> - }
> -
> - if (ost->enc_ctx->frame_size) {
> - av_assert0(ost->sq_idx_encode >= 0);
> - sq_frame_samples(output_files[ost->file_index]->sq_encode,
> - ost->sq_idx_encode, ost->enc_ctx->frame_size);
> - }
> + if (ost->enc_ctx->frame_size)
> + frame_samples = ost->enc_ctx->frame_size;
>
> ret = check_avoptions(ost->encoder_opts);
> if (ret < 0)
> @@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
> if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
> ost->st->time_base = av_add_q(ost->enc_ctx->time_base,
> (AVRational){0, 1});
>
> - ret = enc_thread_start(ost);
> - if (ret < 0) {
> - av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
> - av_err2str(ret));
> - return ret;
> - }
> -
> ret = of_stream_init(of, ost);
> if (ret < 0)
> return ret;
>
> - return 0;
> + return frame_samples;
> }
>
> static int check_recording_time(OutputStream *ost, int64_t ts, AVRational
> tb)
> @@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of,
> OutputStream *ost, const AVSubtitle *
> av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n");
> return exit_on_error ? AVERROR(EINVAL) : 0;
> }
> - if (ost->finished ||
> - (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
> + if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
> return 0;
>
> enc = ost->enc_ctx;
> @@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of,
> OutputStream *ost, const AVSubtitle *
> }
> pkt->dts = pkt->pts;
>
> - ret = tq_send(e->queue_out, 0, pkt);
> + ret = sch_enc_send(e->sch, e->sch_idx, pkt);
> if (ret < 0) {
> av_packet_unref(pkt);
> return ret;
> @@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost,
> const AVPacket *pkt, int write_
> int64_t frame_number;
> double ti1, bitrate, avg_bitrate;
> double psnr_val = -1;
> + int quality;
>
> - ost->quality = sd ? AV_RL32(sd) : -1;
> + quality = sd ? AV_RL32(sd) : -1;
> pict_type = sd ? sd[4] : AV_PICTURE_TYPE_NONE;
>
> + atomic_store(&ost->quality, quality);
> +
> if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) {
> // FIXME the scaling assumes 8bit
> double error = AV_RL64(sd + 8) / (enc->width * enc->height *
> 255.0 * 255.0);
> @@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost,
> const AVPacket *pkt, int write_
> frame_number = e->packets_encoded;
> if (vstats_version <= 1) {
> fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number,
> - ost->quality / (float)FF_QP2LAMBDA);
> + quality / (float)FF_QP2LAMBDA);
> } else {
> fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number,
> - ost->quality / (float)FF_QP2LAMBDA);
> + quality / (float)FF_QP2LAMBDA);
> }
>
> if (psnr_val >= 0)
> @@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream
> *ost, AVFrame *frame,
> av_ts2str(pkt->duration), av_ts2timestr(pkt->duration,
> &enc->time_base));
> }
>
> - if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
> - av_log(NULL, AV_LOG_ERROR,
> - "Subtitle heartbeat logic failed in %s! (%s)\n",
> - __func__, av_err2str(ret));
> - return ret;
> - }
> -
> e->data_size += pkt->size;
>
> e->packets_encoded++;
>
> - ret = tq_send(e->queue_out, 0, pkt);
> + ret = sch_enc_send(e->sch, e->sch_idx, pkt);
> if (ret < 0) {
> av_packet_unref(pkt);
> return ret;
> @@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream
> *ost, AVFrame *frame,
> av_assert0(0);
> }
>
> -static int submit_encode_frame(OutputFile *of, OutputStream *ost,
> - AVFrame *frame, AVPacket *pkt)
> -{
> - Encoder *e = ost->enc;
> - int ret;
> -
> - if (ost->sq_idx_encode < 0)
> - return encode_frame(of, ost, frame, pkt);
> -
> - if (frame) {
> - ret = av_frame_ref(e->sq_frame, frame);
> - if (ret < 0)
> - return ret;
> - frame = e->sq_frame;
> - }
> -
> - ret = sq_send(of->sq_encode, ost->sq_idx_encode,
> - SQFRAME(frame));
> - if (ret < 0) {
> - if (frame)
> - av_frame_unref(frame);
> - if (ret != AVERROR_EOF)
> - return ret;
> - }
> -
> - while (1) {
> - AVFrame *enc_frame = e->sq_frame;
> -
> - ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
> - SQFRAME(enc_frame));
> - if (ret == AVERROR_EOF) {
> - enc_frame = NULL;
> - } else if (ret < 0) {
> - return (ret == AVERROR(EAGAIN)) ? 0 : ret;
> - }
> -
> - ret = encode_frame(of, ost, enc_frame, pkt);
> - if (enc_frame)
> - av_frame_unref(enc_frame);
> - if (ret < 0)
> - return ret;
> - }
> -}
> -
> static int do_audio_out(OutputFile *of, OutputStream *ost,
> AVFrame *frame, AVPacket *pkt)
> {
> @@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream
> *ost,
> if (!check_recording_time(ost, frame->pts, frame->time_base))
> return AVERROR_EOF;
>
> - return submit_encode_frame(of, ost, frame, pkt);
> + return encode_frame(of, ost, frame, pkt);
> }
>
> static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx
> *kf,
> @@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream
> *ost,
> }
> #endif
>
> - return submit_encode_frame(of, ost, in_picture, pkt);
> + return encode_frame(of, ost, in_picture, pkt);
> }
>
> static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
> @@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame
> *frame, AVPacket *pkt)
> enum AVMediaType type = ost->type;
>
> if (type == AVMEDIA_TYPE_SUBTITLE) {
> + const AVSubtitle *subtitle = frame && frame->buf[0] ?
> + (AVSubtitle*)frame->buf[0]->data :
> NULL;
> +
> // no flushing for subtitles
> - return frame ?
> - do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data,
> pkt) : 0;
> + return subtitle && subtitle->num_rects ?
> + do_subtitle_out(of, ost, subtitle, pkt) : 0;
> }
>
> if (frame) {
> @@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame
> *frame, AVPacket *pkt)
> do_audio_out(of, ost,
> frame, pkt);
> }
>
> - return submit_encode_frame(of, ost, NULL, pkt);
> + return encode_frame(of, ost, NULL, pkt);
> }
>
> static void enc_thread_set_name(const OutputStream *ost)
> @@ -1009,24 +845,50 @@ fail:
> void *encoder_thread(void *arg)
> {
> OutputStream *ost = arg;
> - OutputFile *of = output_files[ost->file_index];
> Encoder *e = ost->enc;
> EncoderThread et;
> int ret = 0, input_status = 0;
> + int name_set = 0;
>
> ret = enc_thread_init(&et);
> if (ret < 0)
> goto finish;
>
> - enc_thread_set_name(ost);
> + /* Open the subtitle encoders immediately. AVFrame-based encoders
> + * are opened through a callback from the scheduler once they get
> + * their first frame
> + *
> + * N.B.: because the callback is called from a different thread,
> + * enc_ctx MUST NOT be accessed before sch_enc_receive() returns
> + * for the first time for audio/video. */
> + if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) {
> + ret = enc_open(ost, NULL);
> + if (ret < 0)
> + goto finish;
> + }
>
> while (!input_status) {
> - int dummy;
> -
> - input_status = tq_receive(e->queue_in, &dummy, et.frame);
> - if (input_status < 0)
> + input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame);
> + if (input_status == AVERROR_EOF) {
> av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
>
> + if (!e->opened) {
> + av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n");
> + ret = AVERROR(EINVAL);
> + goto finish;
> + }
> + } else if (input_status < 0) {
> + ret = input_status;
> + av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n",
> + av_err2str(ret));
> + goto finish;
> + }
> +
> + if (!name_set) {
> + enc_thread_set_name(ost);
> + name_set = 1;
> + }
> +
> ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL,
> et.pkt);
>
> av_packet_unref(et.pkt);
> @@ -1040,15 +902,6 @@ void *encoder_thread(void *arg)
> av_err2str(ret));
> break;
> }
> -
> - // signal to the consumer thread that the frame was encoded
> - ret = tq_send(e->queue_out, 0, et.pkt);
> - if (ret < 0) {
> - if (ret != AVERROR_EOF)
> - av_log(ost, AV_LOG_ERROR,
> - "Error communicating with the main thread\n");
> - break;
> - }
> }
>
> // EOF is normal thread termination
> @@ -1056,118 +909,7 @@ void *encoder_thread(void *arg)
> ret = 0;
>
> finish:
> - if (ost->sq_idx_encode >= 0)
> - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
> -
> - tq_receive_finish(e->queue_in, 0);
> - tq_send_finish (e->queue_out, 0);
> -
> enc_thread_uninit(&et);
>
> - av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
> -
> return (void*)(intptr_t)ret;
> }
> -
> -int enc_frame(OutputStream *ost, AVFrame *frame)
> -{
> - OutputFile *of = output_files[ost->file_index];
> - Encoder *e = ost->enc;
> - int ret, thread_ret;
> -
> - ret = enc_open(ost, frame);
> - if (ret < 0)
> - return ret;
> -
> - if (!e->queue_in)
> - return AVERROR_EOF;
> -
> - // send the frame/EOF to the encoder thread
> - if (frame) {
> - ret = tq_send(e->queue_in, 0, frame);
> - if (ret < 0)
> - goto finish;
> - } else
> - tq_send_finish(e->queue_in, 0);
> -
> - // retrieve all encoded data for the frame
> - while (1) {
> - int dummy;
> -
> - ret = tq_receive(e->queue_out, &dummy, e->pkt);
> - if (ret < 0)
> - break;
> -
> - // frame fully encoded
> - if (!e->pkt->data && !e->pkt->side_data_elems)
> - return 0;
> -
> - // process the encoded packet
> - ret = of_output_packet(of, ost, e->pkt);
> - if (ret < 0)
> - goto finish;
> - }
> -
> -finish:
> - thread_ret = enc_thread_stop(e);
> - if (thread_ret < 0) {
> - av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
> - av_err2str(thread_ret));
> - ret = err_merge(ret, thread_ret);
> - }
> -
> - if (ret < 0 && ret != AVERROR_EOF)
> - return ret;
> -
> - // signal EOF to the muxer
> - return of_output_packet(of, ost, NULL);
> -}
> -
> -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
> -{
> - Encoder *e = ost->enc;
> - AVFrame *f = e->sub_frame;
> - int ret;
> -
> - // XXX the queue for transferring data to the encoder thread runs
> - // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
> - // that inside the frame
> - // eventually, subtitles should be switched to use AVFrames natively
> - ret = subtitle_wrap_frame(f, sub, 1);
> - if (ret < 0)
> - return ret;
> -
> - ret = enc_frame(ost, f);
> - av_frame_unref(f);
> -
> - return ret;
> -}
> -
> -int enc_flush(void)
> -{
> - int ret = 0;
> -
> - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> - OutputFile *of = output_files[ost->file_index];
> - if (ost->sq_idx_encode >= 0)
> - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
> - }
> -
> - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> - Encoder *e = ost->enc;
> - AVCodecContext *enc = ost->enc_ctx;
> - int err;
> -
> - if (!enc || !e->opened ||
> - (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type !=
> AVMEDIA_TYPE_AUDIO))
> - continue;
> -
> - err = enc_frame(ost, NULL);
> - if (err != AVERROR_EOF && ret < 0)
> - ret = err_merge(ret, err);
> -
> - av_assert0(!e->queue_in);
> - }
> -
> - return ret;
> -}
> diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
> index 635b1b0b6e..ada235b084 100644
> --- a/fftools/ffmpeg_filter.c
> +++ b/fftools/ffmpeg_filter.c
> @@ -21,8 +21,6 @@
> #include <stdint.h>
>
> #include "ffmpeg.h"
> -#include "ffmpeg_utils.h"
> -#include "thread_queue.h"
>
> #include "libavfilter/avfilter.h"
> #include "libavfilter/buffersink.h"
> @@ -53,10 +51,11 @@ typedef struct FilterGraphPriv {
> // true when the filtergraph contains only meta filters
> // that do not modify the frame data
> int is_meta;
> + // source filters are present in the graph
> + int have_sources;
> int disable_conversions;
>
> - int nb_inputs_bound;
> - int nb_outputs_bound;
> + unsigned nb_outputs_done;
>
> const char *graph_desc;
>
> @@ -67,41 +66,6 @@ typedef struct FilterGraphPriv {
>
> Scheduler *sch;
> unsigned sch_idx;
> -
> - pthread_t thread;
> - /**
> - * Queue for sending frames from the main thread to the filtergraph.
> Has
> - * nb_inputs+1 streams - the first nb_inputs stream correspond to
> - * filtergraph inputs. Frames on those streams may have their opaque
> set to
> - * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
> - * EOF event for the correspondint stream. Will be immediately
> followed by
> - * this stream being send-closed.
> - * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but
> pts+timebase of
> - * a subtitle heartbeat event. Will only be sent for sub2video
> streams.
> - *
> - * The last stream is "control" - the main thread sends empty
> AVFrames with
> - * opaque set to
> - * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame
> available
> - * from filtergraph outputs. These frames are sent to corresponding
> - * streams in queue_out. Finally an empty frame is sent to the
> control
> - * stream in queue_out.
> - * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames
> are
> - * available the terminating empty frame's opaque will contain the
> index+1
> - * of the filtergraph input to which more input frames should be
> supplied.
> - */
> - ThreadQueue *queue_in;
> - /**
> - * Queue for sending frames from the filtergraph back to the main
> thread.
> - * Has nb_outputs+1 streams - the first nb_outputs stream correspond
> to
> - * filtergraph outputs.
> - *
> - * The last stream is "control" - see documentation for queue_in for
> more
> - * details.
> - */
> - ThreadQueue *queue_out;
> - // submitting frames to filter thread returned EOF
> - // this only happens on thread exit, so is not per-input
> - int eof_in;
> } FilterGraphPriv;
>
> static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
> @@ -123,6 +87,9 @@ typedef struct FilterGraphThread {
> // The output index is stored in frame opaque.
> AVFifo *frame_queue_out;
>
> + // index of the next input to request from the scheduler
> + unsigned next_in;
> + // set to 1 after at least one frame passed through this output
> int got_frame;
>
> // EOF status of each input/output, as received by the thread
> @@ -253,9 +220,6 @@ typedef struct OutputFilterPriv {
> int64_t ts_offset;
> int64_t next_pts;
> FPSConvContext fps;
> -
> - // set to 1 after at least one frame passed through this output
> - int got_frame;
> } OutputFilterPriv;
>
> static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
> @@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph
> *fg)
>
> static void *filter_thread(void *arg);
>
> -// start the filtering thread once all inputs and outputs are bound
> -static int fg_thread_try_start(FilterGraphPriv *fgp)
> -{
> - FilterGraph *fg = &fgp->fg;
> - ObjPool *op;
> - int ret = 0;
> -
> - if (fgp->nb_inputs_bound < fg->nb_inputs ||
> - fgp->nb_outputs_bound < fg->nb_outputs)
> - return 0;
> -
> - op = objpool_alloc_frames();
> - if (!op)
> - return AVERROR(ENOMEM);
> -
> - fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
> - if (!fgp->queue_in) {
> - objpool_free(&op);
> - return AVERROR(ENOMEM);
> - }
> -
> - // at least one output is mandatory
> - op = objpool_alloc_frames();
> - if (!op)
> - goto fail;
> -
> - fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
> - if (!fgp->queue_out) {
> - objpool_free(&op);
> - goto fail;
> - }
> -
> - ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
> - if (ret) {
> - ret = AVERROR(ret);
> - av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d
> failed: %s\n",
> - fg->index, av_err2str(ret));
> - goto fail;
> - }
> -
> - return 0;
> -fail:
> - if (ret >= 0)
> - ret = AVERROR(ENOMEM);
> -
> - tq_free(&fgp->queue_in);
> - tq_free(&fgp->queue_out);
> -
> - return ret;
> -}
> -
> static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout,
> int in)
> {
> AVFilterContext *ctx = inout->filter_ctx;
> @@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
> ofilter->graph = fg;
> ofp->format = -1;
> ofp->index = fg->nb_outputs - 1;
> - ofilter->last_pts = AV_NOPTS_VALUE;
>
> return ofilter;
> }
> @@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter,
> InputStream *ist)
> return AVERROR(ENOMEM);
> }
>
> - fgp->nb_inputs_bound++;
> - av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
> -
> - return fg_thread_try_start(fgp);
> + return 0;
> }
>
> static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
> @@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter,
> OutputStream *ost,
> if (ret < 0)
> return ret;
>
> - fgp->nb_outputs_bound++;
> - av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
> -
> - return fg_thread_try_start(fgp);
> + return 0;
> }
>
> static InputFilter *ifilter_alloc(FilterGraph *fg)
> @@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
> return ifilter;
> }
>
> -static int fg_thread_stop(FilterGraphPriv *fgp)
> -{
> - void *ret;
> -
> - if (!fgp->queue_in)
> - return 0;
> -
> - for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
> - InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
> - ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
> -
> - if (ifp)
> - ifp->eof = 1;
> -
> - tq_send_finish(fgp->queue_in, i);
> - }
> -
> - for (int i = 0; i <= fgp->fg.nb_outputs; i++)
> - tq_receive_finish(fgp->queue_out, i);
> -
> - pthread_join(fgp->thread, &ret);
> -
> - tq_free(&fgp->queue_in);
> - tq_free(&fgp->queue_out);
> -
> - return (int)(intptr_t)ret;
> -}
> -
> void fg_free(FilterGraph **pfg)
> {
> FilterGraph *fg = *pfg;
> @@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg)
> return;
> fgp = fgp_from_fg(fg);
>
> - fg_thread_stop(fgp);
> -
> avfilter_graph_free(&fg->graph);
> for (int j = 0; j < fg->nb_inputs; j++) {
> InputFilter *ifilter = fg->inputs[j];
> @@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc,
> Scheduler *sch)
> if (ret < 0)
> goto fail;
>
> + for (unsigned i = 0; i < graph->nb_filters; i++) {
> + const AVFilter *f = graph->filters[i]->filter;
> + if (!avfilter_filter_pad_count(f, 0) &&
> + !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
> + fgp->have_sources = 1;
> + break;
> + }
> + }
> +
> for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
> InputFilter *const ifilter = ifilter_alloc(fg);
> InputFilterPriv *ifp;
> @@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg,
> const FilterGraphThread *fgt)
> AVBufferRef *hw_device;
> AVFilterInOut *inputs, *outputs, *cur;
> int ret, i, simple = filtergraph_is_simple(fg);
> + int have_input_eof = 0;
> const char *graph_desc = fgp->graph_desc;
>
> cleanup_filtergraph(fg);
> @@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg,
> const FilterGraphThread *fgt)
> ret = av_buffersrc_add_frame(ifp->filter, NULL);
> if (ret < 0)
> goto fail;
> + have_input_eof = 1;
> }
> }
>
> - return 0;
> + if (have_input_eof) {
> + // make sure the EOF propagates to the end of the graph
> + ret = avfilter_graph_request_oldest(fg->graph);
> + if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
> + goto fail;
> + }
>
> + return 0;
> fail:
> cleanup_filtergraph(fg);
> return ret;
> @@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv
> *ofp, AVFrame *frame,
> fps->frames_prev_hist[2]);
>
> if (!*nb_frames && fps->last_dropped) {
> - ofilter->nb_frames_drop++;
> + atomic_fetch_add(&ofilter->nb_frames_drop, 1);
> fps->last_dropped++;
> }
>
> @@ -2260,21 +2153,23 @@ finish:
> fps->frames_prev_hist[0] = *nb_frames_prev;
>
> if (*nb_frames_prev == 0 && fps->last_dropped) {
> - ofilter->nb_frames_drop++;
> + atomic_fetch_add(&ofilter->nb_frames_drop, 1);
> av_log(ost, AV_LOG_VERBOSE,
> "*** dropping frame %"PRId64" at ts %"PRId64"\n",
> fps->frame_number, fps->last_frame->pts);
> }
> if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames
> > *nb_frames_prev)) {
> + uint64_t nb_frames_dup;
> if (*nb_frames > dts_error_threshold * 30) {
> av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too
> large, skipping\n", *nb_frames - 1);
> - ofilter->nb_frames_drop++;
> + atomic_fetch_add(&ofilter->nb_frames_drop, 1);
> *nb_frames = 0;
> return;
> }
> - ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev &&
> fps->last_dropped) - (*nb_frames > *nb_frames_prev);
> + nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
> + *nb_frames - (*nb_frames_prev &&
> fps->last_dropped) - (*nb_frames > *nb_frames_prev));
> av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames -
> 1);
> - if (ofilter->nb_frames_dup > fps->dup_warning) {
> + if (nb_frames_dup > fps->dup_warning) {
> av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames
> duplicated\n", fps->dup_warning);
> fps->dup_warning *= 10;
> }
> @@ -2284,8 +2179,57 @@ finish:
> fps->dropped_keyframe |= fps->last_dropped && (frame->flags &
> AV_FRAME_FLAG_KEY);
> }
>
> +static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
> +{
> + FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
> + int ret;
> +
> + // we are finished and no frames were ever seen at this output,
> + // at least initialize the encoder with a dummy frame
> + if (!fgt->got_frame) {
> + AVFrame *frame = fgt->frame;
> + FrameData *fd;
> +
> + frame->time_base = ofp->tb_out;
> + frame->format = ofp->format;
> +
> + frame->width = ofp->width;
> + frame->height = ofp->height;
> + frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
> +
> + frame->sample_rate = ofp->sample_rate;
> + if (ofp->ch_layout.nb_channels) {
> + ret = av_channel_layout_copy(&frame->ch_layout,
> &ofp->ch_layout);
> + if (ret < 0)
> + return ret;
> + }
> +
> + fd = frame_data(frame);
> + if (!fd)
> + return AVERROR(ENOMEM);
> +
> + fd->frame_rate_filter = ofp->fps.framerate;
> +
> + av_assert0(!frame->buf[0]);
> +
> + av_log(ofp->ofilter.ost, AV_LOG_WARNING,
> + "No filtered frames for output stream, trying to "
> + "initialize anyway.\n");
> +
> + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
> + if (ret < 0) {
> + av_frame_unref(frame);
> + return ret;
> + }
> + }
> +
> + fgt->eof_out[ofp->index] = 1;
> +
> + return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
> +}
> +
> static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
> - AVFrame *frame, int buffer)
> + AVFrame *frame)
> {
> FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
> AVFrame *frame_prev = ofp->fps.last_frame;
> @@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
> frame_out = frame;
> }
>
> - if (buffer) {
> - AVFrame *f = av_frame_alloc();
> -
> - if (!f) {
> - av_frame_unref(frame_out);
> - return AVERROR(ENOMEM);
> - }
> -
> - av_frame_move_ref(f, frame_out);
> - f->opaque = (void*)(intptr_t)ofp->index;
> -
> - ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
> - if (ret < 0) {
> - av_frame_free(&f);
> - return AVERROR(ENOMEM);
> - }
> - } else {
> - // return the frame to the main thread
> - ret = tq_send(fgp->queue_out, ofp->index, frame_out);
> + {
> + // send the frame to consumers
> + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index,
> frame_out);
> if (ret < 0) {
> av_frame_unref(frame_out);
> - fgt->eof_out[ofp->index] = 1;
> +
> + if (!fgt->eof_out[ofp->index]) {
> + fgt->eof_out[ofp->index] = 1;
> + fgp->nb_outputs_done++;
> + }
> +
> return ret == AVERROR_EOF ? 0 : ret;
> }
> }
> @@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
> av_frame_move_ref(frame_prev, frame);
> }
>
> - if (!frame) {
> - tq_send_finish(fgp->queue_out, ofp->index);
> - fgt->eof_out[ofp->index] = 1;
> - }
> + if (!frame)
> + return close_output(ofp, fgt);
>
> return 0;
> }
>
> static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
> - AVFrame *frame, int buffer)
> + AVFrame *frame)
> {
> FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
> OutputStream *ost = ofp->ofilter.ost;
> @@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
>
> ret = av_buffersink_get_frame_flags(filter, frame,
> AV_BUFFERSINK_FLAG_NO_REQUEST);
> - if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
> - ret = fg_output_frame(ofp, fgt, NULL, buffer);
> + if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
> + ret = fg_output_frame(ofp, fgt, NULL);
> return (ret < 0) ? ret : 1;
> } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
> return 1;
> @@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
> fd->frame_rate_filter = ofp->fps.framerate;
> }
>
> - ret = fg_output_frame(ofp, fgt, frame, buffer);
> + ret = fg_output_frame(ofp, fgt, frame);
> av_frame_unref(frame);
> if (ret < 0)
> return ret;
> @@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp,
> FilterGraphThread *fgt,
> return 0;
> }
>
> -/* retrieve all frames available at filtergraph outputs and either send
> them to
> - * the main thread (buffer=0) or buffer them for later (buffer=1) */
> +/* retrieve all frames available at filtergraph outputs
> + * and send them to consumers */
> static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
> - AVFrame *frame, int buffer)
> + AVFrame *frame)
> {
> FilterGraphPriv *fgp = fgp_from_fg(fg);
> - int ret = 0;
> + int did_step = 0;
>
> - if (!fg->graph)
> - return 0;
> -
> - // process buffered frames
> - if (!buffer) {
> - AVFrame *f;
> -
> - while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
> - int out_idx = (intptr_t)f->opaque;
> - f->opaque = NULL;
> - ret = tq_send(fgp->queue_out, out_idx, f);
> - av_frame_free(&f);
> - if (ret < 0 && ret != AVERROR_EOF)
> - return ret;
> + // graph not configured, just select the input to request
> + if (!fg->graph) {
> + for (int i = 0; i < fg->nb_inputs; i++) {
> + InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
> + if (ifp->format < 0 && !fgt->eof_in[i]) {
> + fgt->next_in = i;
> + return 0;
> + }
> }
> +
> + // This state - graph is not configured, but all inputs are either
> + // initialized or EOF - should be unreachable because sending EOF
> to a
> + // filter without even a fallback format should fail
> + av_assert0(0);
> + return AVERROR_BUG;
> }
>
> - /* Reap all buffers present in the buffer sinks */
> - for (int i = 0; i < fg->nb_outputs; i++) {
> - OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
> - int ret = 0;
> + while (fgp->nb_outputs_done < fg->nb_outputs) {
> + int ret;
>
> - while (!ret) {
> - ret = fg_output_step(ofp, fgt, frame, buffer);
> - if (ret < 0)
> - return ret;
> + ret = avfilter_graph_request_oldest(fg->graph);
> + if (ret == AVERROR(EAGAIN)) {
> + fgt->next_in = choose_input(fg, fgt);
> + break;
> + } else if (ret < 0) {
> + if (ret == AVERROR_EOF)
> + av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF,
> finishing\n");
> + else
> + av_log(fg, AV_LOG_ERROR,
> + "Error requesting a frame from the filtergraph:
> %s\n",
> + av_err2str(ret));
> + return ret;
> }
> - }
> + fgt->next_in = fg->nb_inputs;
>
> - return 0;
> + // return after one iteration, so that scheduler can rate-control
> us
> + if (did_step && fgp->have_sources)
> + return 0;
> +
> + /* Reap all buffers present in the buffer sinks */
> + for (int i = 0; i < fg->nb_outputs; i++) {
> + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
> +
> + ret = 0;
> + while (!ret) {
> + ret = fg_output_step(ofp, fgt, frame);
> + if (ret < 0)
> + return ret;
> + }
> + }
> + did_step = 1;
> + };
> +
> + return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
> }
>
> static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts,
> AVRational tb)
> @@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt,
> InputFilter *ifilter,
> InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
> int ret;
>
> + if (fgt->eof_in[ifp->index])
> + return 0;
> +
> fgt->eof_in[ifp->index] = 1;
>
> if (ifp->filter) {
> @@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg,
> FilterGraphThread *fgt,
> return ret;
> }
>
> - ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
> + ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
> av_frame_free(&tmp);
> if (ret < 0)
> return ret;
> @@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg,
> FilterGraphThread *fgt,
> return 0;
> }
>
> -static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
> - AVFrame *frame)
> -{
> - const enum FrameOpaque msg = (intptr_t)frame->opaque;
> - FilterGraph *fg = &fgp->fg;
> - int graph_eof = 0;
> - int ret;
> -
> - frame->opaque = NULL;
> - av_assert0(msg > 0);
> - av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
> -
> - if (!fg->graph) {
> - // graph not configured yet, ignore all messages other than
> choosing
> - // the input to read from
> - if (msg != FRAME_OPAQUE_CHOOSE_INPUT) {
> - av_frame_unref(frame);
> - goto done;
> - }
> -
> - for (int i = 0; i < fg->nb_inputs; i++) {
> - InputFilter *ifilter = fg->inputs[i];
> - InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
> - if (ifp->format < 0 && !fgt->eof_in[i]) {
> - frame->opaque = (void*)(intptr_t)(i + 1);
> - goto done;
> - }
> - }
> -
> - // This state - graph is not configured, but all inputs are either
> - // initialized or EOF - should be unreachable because sending EOF
> to a
> - // filter without even a fallback format should fail
> - av_assert0(0);
> - return AVERROR_BUG;
> - }
> -
> - if (msg == FRAME_OPAQUE_SEND_COMMAND) {
> - FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
> - send_command(fg, fc->time, fc->target, fc->command, fc->arg,
> fc->all_filters);
> - av_frame_unref(frame);
> - goto done;
> - }
> -
> - if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
> - ret = avfilter_graph_request_oldest(fg->graph);
> -
> - graph_eof = ret == AVERROR_EOF;
> -
> - if (ret == AVERROR(EAGAIN)) {
> - frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
> - goto done;
> - } else if (ret < 0 && !graph_eof)
> - return ret;
> - }
> -
> - ret = read_frames(fg, fgt, frame, 0);
> - if (ret < 0) {
> - av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for
> encoding\n");
> - return ret;
> - }
> -
> - if (graph_eof)
> - return AVERROR_EOF;
> -
> - // signal to the main thread that we are done processing the message
> -done:
> - ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
> - if (ret < 0) {
> - if (ret != AVERROR_EOF)
> - av_log(fg, AV_LOG_ERROR, "Error communicating with the main
> thread\n");
> - return ret;
> - }
> -
> - return 0;
> -}
> -
> static void fg_thread_set_name(const FilterGraph *fg)
> {
> char name[16];
> @@ -2867,294 +2749,94 @@ static void *filter_thread(void *arg)
> InputFilter *ifilter;
> InputFilterPriv *ifp;
> enum FrameOpaque o;
> - int input_idx, eof_frame;
> + unsigned input_idx = fgt.next_in;
>
> - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
> - if (input_idx < 0 ||
> - (input_idx == fg->nb_inputs && input_status < 0)) {
> + input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
> + &input_idx, fgt.frame);
> + if (input_status == AVERROR_EOF) {
> av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
> break;
> + } else if (input_status == AVERROR(EAGAIN)) {
> + // should only happen when we didn't request any input
> + av_assert0(input_idx == fg->nb_inputs);
> + goto read_frames;
> }
> + av_assert0(input_status >= 0);
> +
> + o = (intptr_t)fgt.frame->opaque;
>
> o = (intptr_t)fgt.frame->opaque;
>
> // message on the control stream
> if (input_idx == fg->nb_inputs) {
> - ret = msg_process(fgp, &fgt, fgt.frame);
> - if (ret < 0)
> - goto finish;
> + FilterCommand *fc;
>
> + av_assert0(o == FRAME_OPAQUE_SEND_COMMAND &&
> fgt.frame->buf[0]);
> +
> + fc = (FilterCommand*)fgt.frame->buf[0]->data;
> + send_command(fg, fc->time, fc->target, fc->command, fc->arg,
> + fc->all_filters);
> + av_frame_unref(fgt.frame);
> continue;
> }
>
> // we received an input frame or EOF
> ifilter = fg->inputs[input_idx];
> ifp = ifp_from_ifilter(ifilter);
> - eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
> +
> if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
> int hb_frame = input_status >= 0 && o ==
> FRAME_OPAQUE_SUB_HEARTBEAT;
> ret = sub2video_frame(ifilter, (fgt.frame->buf[0] ||
> hb_frame) ? fgt.frame : NULL);
> - } else if (input_status >= 0 && fgt.frame->buf[0]) {
> + } else if (fgt.frame->buf[0]) {
> ret = send_frame(fg, &fgt, ifilter, fgt.frame);
> } else {
> - int64_t pts = input_status >= 0 ? fgt.frame->pts :
> AV_NOPTS_VALUE;
> - AVRational tb = input_status >= 0 ? fgt.frame->time_base :
> (AVRational){ 1, 1 };
> - ret = send_eof(&fgt, ifilter, pts, tb);
> + av_assert1(o == FRAME_OPAQUE_EOF);
> + ret = send_eof(&fgt, ifilter, fgt.frame->pts,
> fgt.frame->time_base);
> }
> av_frame_unref(fgt.frame);
> if (ret < 0)
> + goto finish;
> +
> +read_frames:
> + // retrieve all newly available frames
> + ret = read_frames(fg, &fgt, fgt.frame);
> + if (ret == AVERROR_EOF) {
> + av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
> break;
> -
> - if (eof_frame) {
> - // an EOF frame is immediately followed by sender closing
> - // the corresponding stream, so retrieve that event
> - input_status = tq_receive(fgp->queue_in, &input_idx,
> fgt.frame);
> - av_assert0(input_status == AVERROR_EOF && input_idx ==
> ifp->index);
> - }
> -
> - // signal to the main thread that we are done
> - ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
> - if (ret < 0) {
> - if (ret == AVERROR_EOF)
> - break;
> -
> - av_log(fg, AV_LOG_ERROR, "Error communicating with the main
> thread\n");
> + } else if (ret < 0) {
> + av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers:
> %s\n",
> + av_err2str(ret));
> goto finish;
> }
> }
>
> + for (unsigned i = 0; i < fg->nb_outputs; i++) {
> + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
> +
> + if (fgt.eof_out[i])
> + continue;
> +
> + ret = fg_output_frame(ofp, &fgt, NULL);
> + if (ret < 0)
> + goto finish;
> + }
> +
> finish:
> // EOF is normal termination
> if (ret == AVERROR_EOF)
> ret = 0;
>
> - for (int i = 0; i <= fg->nb_inputs; i++)
> - tq_receive_finish(fgp->queue_in, i);
> - for (int i = 0; i <= fg->nb_outputs; i++)
> - tq_send_finish(fgp->queue_out, i);
> -
> fg_thread_uninit(&fgt);
>
> - av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
> -
> return (void*)(intptr_t)ret;
> }
>
> -static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
> - AVFrame *frame, enum FrameOpaque type)
> -{
> - InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
> - int output_idx, ret;
> -
> - if (ifp->eof) {
> - av_frame_unref(frame);
> - return AVERROR_EOF;
> - }
> -
> - frame->opaque = (void*)(intptr_t)type;
> -
> - ret = tq_send(fgp->queue_in, ifp->index, frame);
> - if (ret < 0) {
> - ifp->eof = 1;
> - av_frame_unref(frame);
> - return ret;
> - }
> -
> - if (type == FRAME_OPAQUE_EOF)
> - tq_send_finish(fgp->queue_in, ifp->index);
> -
> - // wait for the frame to be processed
> - ret = tq_receive(fgp->queue_out, &output_idx, frame);
> - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
> -
> - return ret;
> -}
> -
> -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int
> keep_reference)
> -{
> - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
> - int ret;
> -
> - if (keep_reference) {
> - ret = av_frame_ref(fgp->frame, frame);
> - if (ret < 0)
> - return ret;
> - } else
> - av_frame_move_ref(fgp->frame, frame);
> -
> - return thread_send_frame(fgp, ifilter, fgp->frame, 0);
> -}
> -
> -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
> -{
> - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
> - int ret;
> -
> - fgp->frame->pts = pts;
> - fgp->frame->time_base = tb;
> -
> - ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
> -
> - return ret == AVERROR_EOF ? 0 : ret;
> -}
> -
> -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts,
> AVRational tb)
> -{
> - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
> -
> - fgp->frame->pts = pts;
> - fgp->frame->time_base = tb;
> -
> - thread_send_frame(fgp, ifilter, fgp->frame,
> FRAME_OPAQUE_SUB_HEARTBEAT);
> -}
> -
> -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
> -{
> - FilterGraphPriv *fgp = fgp_from_fg(graph);
> - int ret, got_frames = 0;
> -
> - if (fgp->eof_in)
> - return AVERROR_EOF;
> -
> - // signal to the filtering thread to return all frames it can
> - av_assert0(!fgp->frame->buf[0]);
> - fgp->frame->opaque = (void*)(intptr_t)(best_ist ?
> - FRAME_OPAQUE_CHOOSE_INPUT :
> - FRAME_OPAQUE_REAP_FILTERS);
> -
> - ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
> - if (ret < 0) {
> - fgp->eof_in = 1;
> - goto finish;
> - }
> -
> - while (1) {
> - OutputFilter *ofilter;
> - OutputFilterPriv *ofp;
> - OutputStream *ost;
> - int output_idx;
> -
> - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
> -
> - // EOF on the whole queue or the control stream
> - if (output_idx < 0 ||
> - (ret < 0 && output_idx == graph->nb_outputs))
> - goto finish;
> -
> - // EOF for a specific stream
> - if (ret < 0) {
> - ofilter = graph->outputs[output_idx];
> - ofp = ofp_from_ofilter(ofilter);
> -
> - // we are finished and no frames were ever seen at this
> output,
> - // at least initialize the encoder with a dummy frame
> - if (!ofp->got_frame) {
> - AVFrame *frame = fgp->frame;
> - FrameData *fd;
> -
> - frame->time_base = ofp->tb_out;
> - frame->format = ofp->format;
> -
> - frame->width = ofp->width;
> - frame->height = ofp->height;
> - frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
> -
> - frame->sample_rate = ofp->sample_rate;
> - if (ofp->ch_layout.nb_channels) {
> - ret = av_channel_layout_copy(&frame->ch_layout,
> &ofp->ch_layout);
> - if (ret < 0)
> - return ret;
> - }
> -
> - fd = frame_data(frame);
> - if (!fd)
> - return AVERROR(ENOMEM);
> -
> - fd->frame_rate_filter = ofp->fps.framerate;
> -
> - av_assert0(!frame->buf[0]);
> -
> - av_log(ofilter->ost, AV_LOG_WARNING,
> - "No filtered frames for output stream, trying to "
> - "initialize anyway.\n");
> -
> - enc_open(ofilter->ost, frame);
> - av_frame_unref(frame);
> - }
> -
> - close_output_stream(graph->outputs[output_idx]->ost);
> - continue;
> - }
> -
> - // request was fully processed by the filtering thread,
> - // return the input stream to read from, if needed
> - if (output_idx == graph->nb_outputs) {
> - int input_idx = (intptr_t)fgp->frame->opaque - 1;
> - av_assert0(input_idx <= graph->nb_inputs);
> -
> - if (best_ist) {
> - *best_ist = (input_idx >= 0 && input_idx <
> graph->nb_inputs) ?
> -
> ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
> -
> - if (input_idx < 0 && !got_frames) {
> - for (int i = 0; i < graph->nb_outputs; i++)
> - graph->outputs[i]->ost->unavailable = 1;
> - }
> - }
> - break;
> - }
> -
> - // got a frame from the filtering thread, send it for encoding
> - ofilter = graph->outputs[output_idx];
> - ost = ofilter->ost;
> - ofp = ofp_from_ofilter(ofilter);
> -
> - if (ost->finished) {
> - av_frame_unref(fgp->frame);
> - tq_receive_finish(fgp->queue_out, output_idx);
> - continue;
> - }
> -
> - if (fgp->frame->pts != AV_NOPTS_VALUE) {
> - ofilter->last_pts = av_rescale_q(fgp->frame->pts,
> - fgp->frame->time_base,
> - AV_TIME_BASE_Q);
> - }
> -
> - ret = enc_frame(ost, fgp->frame);
> - av_frame_unref(fgp->frame);
> - if (ret < 0)
> - goto finish;
> -
> - ofp->got_frame = 1;
> - got_frames = 1;
> - }
> -
> -finish:
> - if (ret < 0) {
> - fgp->eof_in = 1;
> - for (int i = 0; i < graph->nb_outputs; i++)
> - close_output_stream(graph->outputs[i]->ost);
> - }
> -
> - return ret;
> -}
> -
> -int reap_filters(FilterGraph *fg, int flush)
> -{
> - return fg_transcode_step(fg, NULL);
> -}
> -
> void fg_send_command(FilterGraph *fg, double time, const char *target,
> const char *command, const char *arg, int
> all_filters)
> {
> FilterGraphPriv *fgp = fgp_from_fg(fg);
> AVBufferRef *buf;
> FilterCommand *fc;
> - int output_idx, ret;
> -
> - if (!fgp->queue_in)
> - return;
>
> fc = av_mallocz(sizeof(*fc));
> if (!fc)
> @@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time,
> const char *target,
> fgp->frame->buf[0] = buf;
> fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
>
> - ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame);
> - if (ret < 0) {
> - av_frame_unref(fgp->frame);
> - return;
> - }
> -
> - // wait for the frame to be processed
> - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
> - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
> + sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
> }
> diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
> index ef5c2f60e0..067dc65d4e 100644
> --- a/fftools/ffmpeg_mux.c
> +++ b/fftools/ffmpeg_mux.c
> @@ -23,16 +23,13 @@
> #include "ffmpeg.h"
> #include "ffmpeg_mux.h"
> #include "ffmpeg_utils.h"
> -#include "objpool.h"
> #include "sync_queue.h"
> -#include "thread_queue.h"
>
> #include "libavutil/fifo.h"
> #include "libavutil/intreadwrite.h"
> #include "libavutil/log.h"
> #include "libavutil/mem.h"
> #include "libavutil/timestamp.h"
> -#include "libavutil/thread.h"
>
> #include "libavcodec/packet.h"
>
> @@ -41,10 +38,9 @@
>
> typedef struct MuxThreadContext {
> AVPacket *pkt;
> + AVPacket *fix_sub_duration_pkt;
> } MuxThreadContext;
>
> -int want_sdp = 1;
> -
> static Muxer *mux_from_of(OutputFile *of)
> {
> return (Muxer*)of;
> @@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux,
> OutputStream *ost, AVPacket *pkt, int
> return 0;
> }
>
> +static int of_streamcopy(OutputStream *ost, AVPacket *pkt);
> +
> /* apply the output bitstream filters */
> -static int mux_packet_filter(Muxer *mux, OutputStream *ost,
> - AVPacket *pkt, int *stream_eof)
> +static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt,
> + OutputStream *ost, AVPacket *pkt, int
> *stream_eof)
> {
> MuxStream *ms = ms_from_ost(ost);
> const char *err_msg;
> int ret = 0;
>
> + if (pkt && !ost->enc) {
> + ret = of_streamcopy(ost, pkt);
> + if (ret == AVERROR(EAGAIN))
> + return 0;
> + else if (ret == AVERROR_EOF) {
> + av_packet_unref(pkt);
> + pkt = NULL;
> + ret = 0;
> + } else if (ret < 0)
> + goto fail;
> + }
> +
> + // emit heartbeat for -fix_sub_duration;
> + // we are only interested in heartbeats on random access points.
> + if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) {
> + mt->fix_sub_duration_pkt->opaque =
> (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION;
> + mt->fix_sub_duration_pkt->pts = pkt->pts;
> + mt->fix_sub_duration_pkt->time_base = pkt->time_base;
> +
> + ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx,
> + mt->fix_sub_duration_pkt);
> + if (ret < 0)
> + goto fail;
> + }
> +
> if (ms->bsf_ctx) {
> int bsf_eof = 0;
>
> @@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of)
> static void mux_thread_uninit(MuxThreadContext *mt)
> {
> av_packet_free(&mt->pkt);
> + av_packet_free(&mt->fix_sub_duration_pkt);
>
> memset(mt, 0, sizeof(*mt));
> }
> @@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt)
> if (!mt->pkt)
> goto fail;
>
> + mt->fix_sub_duration_pkt = av_packet_alloc();
> + if (!mt->fix_sub_duration_pkt)
> + goto fail;
> +
> return 0;
>
> fail:
> @@ -316,19 +344,22 @@ void *muxer_thread(void *arg)
> OutputStream *ost;
> int stream_idx, stream_eof = 0;
>
> - ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
> + ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
> + stream_idx = mt.pkt->stream_index;
> if (stream_idx < 0) {
> av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
> ret = 0;
> break;
> }
>
> - ost = of->streams[stream_idx];
> - ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt,
> &stream_eof);
> + ost = of->streams[mux->sch_stream_idx[stream_idx]];
> + mt.pkt->stream_index = ost->index;
> +
> + ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt,
> &stream_eof);
> av_packet_unref(mt.pkt);
> if (ret == AVERROR_EOF) {
> if (stream_eof) {
> - tq_receive_finish(mux->tq, stream_idx);
> + sch_mux_receive_finish(mux->sch, of->index, stream_idx);
> } else {
> av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
> ret = 0;
> @@ -343,243 +374,55 @@ void *muxer_thread(void *arg)
> finish:
> mux_thread_uninit(&mt);
>
> - for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
> - tq_receive_finish(mux->tq, i);
> -
> - av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
> -
> return (void*)(intptr_t)ret;
> }
>
> -static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket
> *pkt)
> -{
> - int ret = 0;
> -
> - if (!pkt || ost->finished & MUXER_FINISHED)
> - goto finish;
> -
> - ret = tq_send(mux->tq, ost->index, pkt);
> - if (ret < 0)
> - goto finish;
> -
> - return 0;
> -
> -finish:
> - if (pkt)
> - av_packet_unref(pkt);
> -
> - ost->finished |= MUXER_FINISHED;
> - tq_send_finish(mux->tq, ost->index);
> - return ret == AVERROR_EOF ? 0 : ret;
> -}
> -
> -static int queue_packet(OutputStream *ost, AVPacket *pkt)
> -{
> - MuxStream *ms = ms_from_ost(ost);
> - AVPacket *tmp_pkt = NULL;
> - int ret;
> -
> - if (!av_fifo_can_write(ms->muxing_queue)) {
> - size_t cur_size = av_fifo_can_read(ms->muxing_queue);
> - size_t pkt_size = pkt ? pkt->size : 0;
> - unsigned int are_we_over_size =
> - (ms->muxing_queue_data_size + pkt_size) >
> ms->muxing_queue_data_threshold;
> - size_t limit = are_we_over_size ? ms->max_muxing_queue_size :
> SIZE_MAX;
> - size_t new_size = FFMIN(2 * cur_size, limit);
> -
> - if (new_size <= cur_size) {
> - av_log(ost, AV_LOG_ERROR,
> - "Too many packets buffered for output stream %d:%d.\n",
> - ost->file_index, ost->st->index);
> - return AVERROR(ENOSPC);
> - }
> - ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size);
> - if (ret < 0)
> - return ret;
> - }
> -
> - if (pkt) {
> - ret = av_packet_make_refcounted(pkt);
> - if (ret < 0)
> - return ret;
> -
> - tmp_pkt = av_packet_alloc();
> - if (!tmp_pkt)
> - return AVERROR(ENOMEM);
> -
> - av_packet_move_ref(tmp_pkt, pkt);
> - ms->muxing_queue_data_size += tmp_pkt->size;
> - }
> - av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
> -
> - return 0;
> -}
> -
> -static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
> -{
> - int ret;
> -
> - if (mux->tq) {
> - return thread_submit_packet(mux, ost, pkt);
> - } else {
> - /* the muxer is not initialized yet, buffer the packet */
> - ret = queue_packet(ost, pkt);
> - if (ret < 0) {
> - if (pkt)
> - av_packet_unref(pkt);
> - return ret;
> - }
> - }
> -
> - return 0;
> -}
> -
> -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
> -{
> - Muxer *mux = mux_from_of(of);
> - int ret = 0;
> -
> - if (pkt && pkt->dts != AV_NOPTS_VALUE)
> - ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base,
> AV_TIME_BASE_Q);
> -
> - ret = submit_packet(mux, pkt, ost);
> - if (ret < 0) {
> - av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the
> muxer: %s",
> - av_err2str(ret));
> - return ret;
> - }
> -
> - return 0;
> -}
> -
> -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
> +static int of_streamcopy(OutputStream *ost, AVPacket *pkt)
> {
> OutputFile *of = output_files[ost->file_index];
> MuxStream *ms = ms_from_ost(ost);
> + DemuxPktData *pd = pkt->opaque_ref ?
> (DemuxPktData*)pkt->opaque_ref->data : NULL;
> + int64_t dts = pd ? pd->dts_est : AV_NOPTS_VALUE;
> int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 :
> of->start_time;
> int64_t ts_offset;
> - AVPacket *opkt = ms->pkt;
> - int ret;
> -
> - av_packet_unref(opkt);
>
> if (of->recording_time != INT64_MAX &&
> dts >= of->recording_time + start_time)
> - pkt = NULL;
> -
> - // EOF: flush output bitstream filters.
> - if (!pkt)
> - return of_output_packet(of, ost, NULL);
> + return AVERROR_EOF;
>
> if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
> !ms->copy_initial_nonkeyframes)
> - return 0;
> + return AVERROR(EAGAIN);
>
> if (!ms->streamcopy_started) {
> if (!ms->copy_prior_start &&
> (pkt->pts == AV_NOPTS_VALUE ?
> dts < ms->ts_copy_start :
> pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q,
> pkt->time_base)))
> - return 0;
> + return AVERROR(EAGAIN);
>
> if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
> - return 0;
> + return AVERROR(EAGAIN);
> }
>
> - ret = av_packet_ref(opkt, pkt);
> - if (ret < 0)
> - return ret;
> -
> - ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
> + ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
>
> if (pkt->pts != AV_NOPTS_VALUE)
> - opkt->pts -= ts_offset;
> + pkt->pts -= ts_offset;
>
> if (pkt->dts == AV_NOPTS_VALUE) {
> - opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
> + pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
> } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
> - opkt->pts = opkt->dts - ts_offset;
> - }
> - opkt->dts -= ts_offset;
> -
> - {
> - int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
> - if (ret < 0) {
> - av_log(NULL, AV_LOG_ERROR,
> - "Subtitle heartbeat logic failed in %s! (%s)\n",
> - __func__, av_err2str(ret));
> - return ret;
> - }
> + pkt->pts = pkt->dts - ts_offset;
> }
>
> - ret = of_output_packet(of, ost, opkt);
> - if (ret < 0)
> - return ret;
> + pkt->dts -= ts_offset;
>
> ms->streamcopy_started = 1;
>
> return 0;
> }
>
> -static int thread_stop(Muxer *mux)
> -{
> - void *ret;
> -
> - if (!mux || !mux->tq)
> - return 0;
> -
> - for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
> - tq_send_finish(mux->tq, i);
> -
> - pthread_join(mux->thread, &ret);
> -
> - tq_free(&mux->tq);
> -
> - return (int)(intptr_t)ret;
> -}
> -
> -static int thread_start(Muxer *mux)
> -{
> - AVFormatContext *fc = mux->fc;
> - ObjPool *op;
> - int ret;
> -
> - op = objpool_alloc_packets();
> - if (!op)
> - return AVERROR(ENOMEM);
> -
> - mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op,
> pkt_move);
> - if (!mux->tq) {
> - objpool_free(&op);
> - return AVERROR(ENOMEM);
> - }
> -
> - ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
> - if (ret) {
> - tq_free(&mux->tq);
> - return AVERROR(ret);
> - }
> -
> - /* flush the muxing queues */
> - for (int i = 0; i < fc->nb_streams; i++) {
> - OutputStream *ost = mux->of.streams[i];
> - MuxStream *ms = ms_from_ost(ost);
> - AVPacket *pkt;
> -
> - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
> - ret = thread_submit_packet(mux, ost, pkt);
> - if (pkt) {
> - ms->muxing_queue_data_size -= pkt->size;
> - av_packet_free(&pkt);
> - }
> - if (ret < 0)
> - return ret;
> - }
> - }
> -
> - return 0;
> -}
> -
> int print_sdp(const char *filename);
>
> int print_sdp(const char *filename)
> @@ -590,11 +433,6 @@ int print_sdp(const char *filename)
> AVIOContext *sdp_pb;
> AVFormatContext **avc;
>
> - for (i = 0; i < nb_output_files; i++) {
> - if (!mux_from_of(output_files[i])->header_written)
> - return 0;
> - }
> -
> avc = av_malloc_array(nb_output_files, sizeof(*avc));
> if (!avc)
> return AVERROR(ENOMEM);
> @@ -629,25 +467,17 @@ int print_sdp(const char *filename)
> avio_closep(&sdp_pb);
> }
>
> - // SDP successfully written, allow muxer threads to start
> - ret = 1;
> -
> fail:
> av_freep(&avc);
> return ret;
> }
>
> -int mux_check_init(Muxer *mux)
> +int mux_check_init(void *arg)
> {
> + Muxer *mux = arg;
> OutputFile *of = &mux->of;
> AVFormatContext *fc = mux->fc;
> - int ret, i;
> -
> - for (i = 0; i < fc->nb_streams; i++) {
> - OutputStream *ost = of->streams[i];
> - if (!ost->initialized)
> - return 0;
> - }
> + int ret;
>
> ret = avformat_write_header(fc, &mux->opts);
> if (ret < 0) {
> @@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux)
> mux->header_written = 1;
>
> av_dump_format(fc, of->index, fc->url, 1);
> - nb_output_dumped++;
> -
> - if (sdp_filename || want_sdp) {
> - ret = print_sdp(sdp_filename);
> - if (ret < 0) {
> - av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
> - return ret;
> - } else if (ret == 1) {
> - /* SDP is written only after all the muxers are ready, so now
> we
> - * start ALL the threads */
> - for (i = 0; i < nb_output_files; i++) {
> - ret = thread_start(mux_from_of(output_files[i]));
> - if (ret < 0)
> - return ret;
> - }
> - }
> - } else {
> - ret = thread_start(mux_from_of(of));
> - if (ret < 0)
> - return ret;
> - }
> + atomic_fetch_add(&nb_output_dumped, 1);
>
> return 0;
> }
> @@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost)
> ost->st->time_base);
> }
>
> - ost->initialized = 1;
> + if (ms->sch_idx >= 0)
> + return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx);
>
> - return mux_check_init(mux);
> + return 0;
> }
>
> static int check_written(OutputFile *of)
> @@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of)
> AVFormatContext *fc = mux->fc;
> int ret, mux_result = 0;
>
> - if (!mux->tq) {
> + if (!mux->header_written) {
> av_log(mux, AV_LOG_ERROR,
> "Nothing was written into output file, because "
> "at least one of its streams received no packets.\n");
> return AVERROR(EINVAL);
> }
>
> - mux_result = thread_stop(mux);
> -
> ret = av_write_trailer(fc);
> if (ret < 0) {
> av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n",
> av_err2str(ret));
> @@ -905,13 +714,6 @@ static void ost_free(OutputStream **post)
> ost->logfile = NULL;
> }
>
> - if (ms->muxing_queue) {
> - AVPacket *pkt;
> - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0)
> - av_packet_free(&pkt);
> - av_fifo_freep2(&ms->muxing_queue);
> - }
> -
> avcodec_parameters_free(&ost->par_in);
>
> av_bsf_free(&ms->bsf_ctx);
> @@ -976,8 +778,6 @@ void of_free(OutputFile **pof)
> return;
> mux = mux_from_of(of);
>
> - thread_stop(mux);
> -
> sq_free(&of->sq_encode);
> sq_free(&mux->sq_mux);
>
> diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
> index eee2b2cb07..5d7cf3fa76 100644
> --- a/fftools/ffmpeg_mux.h
> +++ b/fftools/ffmpeg_mux.h
> @@ -25,7 +25,6 @@
> #include <stdint.h>
>
> #include "ffmpeg_sched.h"
> -#include "thread_queue.h"
>
> #include "libavformat/avformat.h"
>
> @@ -33,7 +32,6 @@
>
> #include "libavutil/dict.h"
> #include "libavutil/fifo.h"
> -#include "libavutil/thread.h"
>
> typedef struct MuxStream {
> OutputStream ost;
> @@ -41,9 +39,6 @@ typedef struct MuxStream {
> // name used for logging
> char log_name[32];
>
> - /* the packets are buffered here until the muxer is ready to be
> initialized */
> - AVFifo *muxing_queue;
> -
> AVBSFContext *bsf_ctx;
> AVPacket *bsf_pkt;
>
> @@ -57,17 +52,6 @@ typedef struct MuxStream {
>
> int64_t max_frames;
>
> - /*
> - * The size of the AVPackets' buffers in queue.
> - * Updated when a packet is either pushed or pulled from the queue.
> - */
> - size_t muxing_queue_data_size;
> -
> - int max_muxing_queue_size;
> -
> - /* Threshold after which max_muxing_queue_size will be in effect */
> - size_t muxing_queue_data_threshold;
> -
> // timestamp from which the streamcopied streams should start,
> // in AV_TIME_BASE_Q;
> // everything before it should be discarded
> @@ -106,9 +90,6 @@ typedef struct Muxer {
> int *sch_stream_idx;
> int nb_sch_stream_idx;
>
> - pthread_t thread;
> - ThreadQueue *tq;
> -
> AVDictionary *opts;
>
> int thread_queue_size;
> @@ -122,10 +103,7 @@ typedef struct Muxer {
> AVPacket *sq_pkt;
> } Muxer;
>
> -/* whether we want to print an SDP, set in of_open() */
> -extern int want_sdp;
> -
> -int mux_check_init(Muxer *mux);
> +int mux_check_init(void *arg);
>
> static MuxStream *ms_from_ost(OutputStream *ost)
> {
> diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
> index 534b4379c7..6459296ab0 100644
> --- a/fftools/ffmpeg_mux_init.c
> +++ b/fftools/ffmpeg_mux_init.c
> @@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const
> OptionsContext *o,
> return 0;
> }
>
> -static int new_stream_attachment(Muxer *mux, const OptionsContext *o,
> - OutputStream *ost)
> -{
> - ost->finished = 1;
> - return 0;
> -}
> -
> static int new_stream_subtitle(Muxer *mux, const OptionsContext *o,
> OutputStream *ost)
> {
> @@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
> if (!ost->par_in)
> return AVERROR(ENOMEM);
>
> - ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
> - if (!ms->muxing_queue)
> - return AVERROR(ENOMEM);
> ms->last_mux_dts = AV_NOPTS_VALUE;
>
> ost->st = st;
> @@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
> if (!ost->enc_ctx)
> return AVERROR(ENOMEM);
>
> - ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
> + ret = sch_add_enc(mux->sch, encoder_thread, ost,
> + ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL :
> enc_open);
> if (ret < 0)
> return ret;
> ms->sch_idx_enc = ret;
> @@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
>
> sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
> max_muxing_queue_size,
> muxing_queue_data_threshold);
> -
> - ms->max_muxing_queue_size = max_muxing_queue_size;
> - ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
> }
>
> MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
> @@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
> if (ost->enc_ctx &&
> av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24)
> av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0);
>
> - ost->last_mux_dts = AV_NOPTS_VALUE;
> -
> MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i,
> ms->copy_initial_nonkeyframes, oc, st);
>
> @@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext
> *o, enum AVMediaType type,
> case AVMEDIA_TYPE_VIDEO: ret = new_stream_video (mux, o,
> ost); break;
> case AVMEDIA_TYPE_AUDIO: ret = new_stream_audio (mux, o,
> ost); break;
> case AVMEDIA_TYPE_SUBTITLE: ret = new_stream_subtitle (mux, o,
> ost); break;
> - case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o,
> ost); break;
> }
> if (ret < 0)
> return ret;
> @@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux,
> AVFormatContext *oc, int64_t buf_size_u
> MuxStream *ms = ms_from_ost(ost);
> enum AVMediaType type = ost->type;
>
> - ost->sq_idx_encode = -1;
> ost->sq_idx_mux = -1;
>
> nb_interleaved += IS_INTERLEAVED(type);
> @@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux,
> AVFormatContext *oc, int64_t buf_size_u
> * - at least one encoded audio/video stream is frame-limited, since
> * that has similar semantics to 'shortest'
> * - at least one audio encoder requires constant frame sizes
> + *
> + * Note that encoding sync queues are handled in the scheduler,
> because
> + * different encoders run in different threads and need external
> + * synchronization, while muxer sync queues can be handled inside the
> muxer
> */
> if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc ||
> nb_audio_fs) {
> - of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
> - if (!of->sq_encode)
> - return AVERROR(ENOMEM);
> + int sq_idx, ret;
> +
> + sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
> + if (sq_idx < 0)
> + return sq_idx;
>
> for (int i = 0; i < oc->nb_streams; i++) {
> OutputStream *ost = of->streams[i];
> @@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux,
> AVFormatContext *oc, int64_t buf_size_u
> if (!IS_AV_ENC(ost, type))
> continue;
>
> - ost->sq_idx_encode = sq_add_stream(of->sq_encode,
> - of->shortest ||
> ms->max_frames < INT64_MAX);
> - if (ost->sq_idx_encode < 0)
> - return ost->sq_idx_encode;
> -
> - if (ms->max_frames != INT64_MAX)
> - sq_limit_frames(of->sq_encode, ost->sq_idx_encode,
> ms->max_frames);
> + ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc,
> + of->shortest || ms->max_frames <
> INT64_MAX,
> + ms->max_frames);
> + if (ret < 0)
> + return ret;
> }
> }
>
> @@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const
> AVDictionary *codec_avopt)
> return 0;
> }
>
> -static int init_output_stream_nofilter(OutputStream *ost)
> -{
> - int ret = 0;
> -
> - if (ost->enc_ctx) {
> - ret = enc_open(ost, NULL);
> - if (ret < 0)
> - return ret;
> - } else {
> - ret = of_stream_init(output_files[ost->file_index], ost);
> - if (ret < 0)
> - return ret;
> - }
> -
> - return ret;
> -}
> -
> static const char *output_file_item_name(void *obj)
> {
> const Muxer *mux = obj;
> @@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
> av_strlcat(mux->log_name, "/", sizeof(mux->log_name));
> av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
>
> - if (strcmp(oc->oformat->name, "rtp"))
> - want_sdp = 0;
>
> of->format = oc->oformat;
> if (recording_time != INT64_MAX)
> @@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
> AVFMT_FLAG_BITEXACT);
> }
>
> - err = sch_add_mux(sch, muxer_thread, NULL, mux,
> + err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
> !strcmp(oc->oformat->name, "rtp"));
> if (err < 0)
> return err;
> @@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char
> *filename, Scheduler *sch)
>
> of->url = filename;
>
> - /* initialize stream copy and subtitle/data streams.
> - * Encoded AVFrame based streams will get initialized when the first
> AVFrame
> - * is received in do_video_out
> - */
> + /* initialize streamcopy streams. */
> for (int i = 0; i < of->nb_streams; i++) {
> OutputStream *ost = of->streams[i];
>
> - if (ost->filter)
> - continue;
> -
> - err = init_output_stream_nofilter(ost);
> - if (err < 0)
> - return err;
> - }
> -
> - /* write the header for files with no streams */
> - if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) {
> - int ret = mux_check_init(mux);
> - if (ret < 0)
> - return ret;
> + if (!ost->enc) {
> + err = of_stream_init(of, ost);
> + if (err < 0)
> + return err;
> + }
> }
>
> return 0;
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index d463306546..6177a96a4e 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[]
> = {"top", NULL};
> HWDevice *filter_hw_device;
>
> char *vstats_filename;
> -char *sdp_filename;
>
> float audio_drift_threshold = 0.1;
> float dts_delta_threshold = 10;
> @@ -580,9 +579,8 @@ fail:
>
> static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
> {
> - av_free(sdp_filename);
> - sdp_filename = av_strdup(arg);
> - return 0;
> + Scheduler *sch = optctx;
> + return sch_sdp_filename(sch, arg);
> }
>
> #if CONFIG_VAAPI
> diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> index 957a410921..bc9b833799 100644
> --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> @@ -1,48 +1,40 @@
> 1
> -00:00:00,968 --> 00:00:01,001
> +00:00:00,968 --> 00:00:01,168
> <font face="Monospace">{\an7}(</font>
>
> 2
> -00:00:01,001 --> 00:00:01,168
> -<font face="Monospace">{\an7}(</font>
> -
> -3
> 00:00:01,168 --> 00:00:01,368
> <font face="Monospace">{\an7}(<i> inaudibl</i></font>
>
> -4
> +3
> 00:00:01,368 --> 00:00:01,568
> <font face="Monospace">{\an7}(<i> inaudible radio chat</i></font>
>
> -5
> +4
> 00:00:01,568 --> 00:00:02,002
> <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
>
> +5
> +00:00:02,002 --> 00:00:03,103
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
> +
> 6
> -00:00:02,002 --> 00:00:03,003
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
> -
> -7
> -00:00:03,003 --> 00:00:03,103
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
> -
> -8
> 00:00:03,103 --> 00:00:03,303
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> >></font>
>
> -9
> +7
> 00:00:03,303 --> 00:00:03,503
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> >> Safety rema</font>
>
> -10
> +8
> 00:00:03,504 --> 00:00:03,704
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> >> Safety remains our numb</font>
>
> -11
> +9
> 00:00:03,704 --> 00:00:04,004
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> >> Safety remains our number one</font>
>
> --
> 2.42.0
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
>
More information about the ffmpeg-devel
mailing list