[FFmpeg-devel] [PATCH 23/27] WIP fftools/ffmpeg_dec: convert to the scheduler
Anton Khirnov
anton at khirnov.net
Tue Sep 19 22:10:50 EEST 2023
---
fftools/ffmpeg.c | 9 +--
fftools/ffmpeg.h | 11 ---
fftools/ffmpeg_dec.c | 189 +++++++++----------------------------------
3 files changed, 42 insertions(+), 167 deletions(-)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 00e57c4382..a09a9e1200 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -810,11 +810,6 @@ static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eo
int ret = 0;
int eof_reached = 0;
- if (ist->decoding_needed) {
- ret = dec_packet(ist, pkt, no_eof);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
- }
if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
eof_reached = 1;
@@ -1036,6 +1031,7 @@ static void reset_eagain(void)
ost->unavailable = 0;
}
+#if 0
static void decode_flush(InputFile *ifile)
{
for (int i = 0; i < ifile->nb_streams; i++) {
@@ -1047,6 +1043,7 @@ static void decode_flush(InputFile *ifile)
dec_packet(ist, NULL, 1);
}
}
+#endif
/*
* Return
@@ -1063,11 +1060,13 @@ static int process_input(int file_index, AVPacket *pkt)
ret = 0;
+#if 0
if (ret == 1) {
/* the input file is looped: flush the decoders */
decode_flush(ifile);
return AVERROR(EAGAIN);
}
+#endif
if (ret < 0) {
if (ret != AVERROR_EOF) {
av_log(ifile, AV_LOG_ERROR,
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 4646c05bea..841f8d0d68 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -815,17 +815,6 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
void dec_free(Decoder **pdec);
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
int enc_alloc(Encoder **penc, const AVCodec *codec,
Scheduler *sch, unsigned sch_idx);
void enc_free(Encoder **penc);
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index dc8d0374a3..400fa666b9 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -53,24 +53,6 @@ struct Decoder {
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending coded packets from the main thread to
- * the decoder thread.
- *
- * An empty packet is sent to flush the decoder without terminating
- * decoding.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending decoded frames from the decoder thread
- * to the main thread.
- *
- * An empty frame is sent to signal that a single packet has been fully
- * processed.
- */
- ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
@@ -79,24 +61,6 @@ typedef struct DecThreadContext {
AVPacket *pkt;
} DecThreadContext;
-static int dec_thread_stop(Decoder *d)
-{
- void *ret;
-
- if (!d->queue_in)
- return 0;
-
- tq_send_finish(d->queue_in, 0);
- tq_receive_finish(d->queue_out, 0);
-
- pthread_join(d->thread, &ret);
-
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
-
- return (intptr_t)ret;
-}
-
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@@ -104,8 +68,6 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
- dec_thread_stop(dec);
-
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@@ -147,25 +109,6 @@ fail:
return AVERROR(ENOMEM);
}
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
- int i, ret = 0;
-
- for (i = 0; i < ist->nb_filters; i++) {
- ret = ifilter_send_frame(ist->filters[i], decoded_frame,
- i < ist->nb_filters - 1 ||
- ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
- if (ret == AVERROR_EOF)
- ret = 0; /* ignore */
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Failed to inject frame into filter network: %s\n", av_err2str(ret));
- break;
- }
- }
- return ret;
-}
-
static AVRational audio_samplerate_update(void *logctx, Decoder *d,
const AVFrame *frame)
{
@@ -420,7 +363,8 @@ static int process_subtitle(InputStream *ist, AVFrame *frame)
if (!subtitle)
return 0;
- ret = send_frame_to_filters(ist, frame);
+ // XXX
+ //ret = send_frame_to_filters(ist, frame);
if (ret < 0)
return ret;
@@ -495,7 +439,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
ist->frames_decoded++;
- // XXX the queue for transferring data back to the main thread runs
+ // XXX the queue for transferring data to consumers runs
// on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
// inside the frame
// eventually, subtitles should be switched to use AVFrames natively
@@ -508,26 +452,11 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
frame->width = ist->dec_ctx->width;
frame->height = ist->dec_ctx->height;
- ret = tq_send(d->queue_out, 0, frame);
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
if (ret < 0)
av_frame_unref(frame);
- return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- int i, ret;
-
- for (i = 0; i < ist->nb_filters; i++) {
- int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
- d->last_frame_pts + d->last_frame_duration_est;
- ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
- if (ret < 0)
- return ret;
- }
- return 0;
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
}
static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame)
@@ -629,9 +558,9 @@ static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame)
ist->frames_decoded++;
- ret = tq_send(d->queue_out, 0, frame);
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
if (ret < 0)
- return ret;
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
}
}
@@ -685,9 +614,9 @@ void *decoder_thread(void *arg)
dec_thread_set_name(ist);
while (!input_status) {
- int dummy, flush_buffers;
+ int flush_buffers;
- input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
+ input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
flush_buffers = input_status >= 0 && !dt.pkt->buf;
if (!dt.pkt->buf)
av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
@@ -698,6 +627,14 @@ void *decoder_thread(void *arg)
av_packet_unref(dt.pkt);
av_frame_unref(dt.frame);
+ // AVERROR_EOF - EOF from the decoder
+ // AVERROR_EXIT - EOF from the scheduler
+ // we treat them differently when flushing
+ if (ret == AVERROR_EXIT) {
+ ret = AVERROR_EOF;
+ flush_buffers = 0;
+ }
+
if (ret == AVERROR_EOF) {
av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
flush_buffers ? "resetting" : "finishing");
@@ -725,23 +662,32 @@ void *decoder_thread(void *arg)
av_err2str(ret));
break;
}
-
- // signal to the consumer thread that the entire packet was processed
- ret = tq_send(d->queue_out, 0, dt.frame);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
- break;
- }
}
// EOF is normal thread termination
if (ret == AVERROR_EOF)
ret = 0;
+ // on success send EOF timestamp to our downstreams
+ if (ret >= 0) {
+ av_frame_unref(dt.frame);
+
+ dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+ d->last_frame_pts + d->last_frame_duration_est;
+ dt.frame->time_base = d->last_frame_tb;
+
+ // XXX check EOF/EXIT handling
+ ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+ if (ret < 0 && ret != AVERROR_EOF) {
+ av_log(NULL, AV_LOG_FATAL,
+ "Error signalling EOF timestamp: %s\n", av_err2str(ret));
+ goto finish;
+ }
+ ret = 0;
+ }
+
finish:
- tq_receive_finish(d->queue_in, 0);
- tq_send_finish (d->queue_out, 0);
+ sch_dec_send(d->sch, d->sch_idx, NULL);
#if 0
// make sure the demuxer does not get stuck waiting for audio durations
@@ -757,15 +703,12 @@ finish:
return (void*)(intptr_t)ret;
}
+#if 0
int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
{
Decoder *d = ist->decoder;
int ret = 0, thread_ret;
- // thread already joined
- if (!d->queue_in)
- return AVERROR_EOF;
-
// send the packet/flush request/EOF to the decoder thread
if (pkt || no_eof) {
av_packet_unref(d->pkt);
@@ -816,59 +759,10 @@ finish:
if (ret < 0 && ret != AVERROR_EOF)
return ret;
- // signal EOF to our downstreams
- ret = send_filter_eof(ist);
- if (ret < 0) {
- av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
- return ret;
- }
return AVERROR_EOF;
}
-
-static int dec_thread_start(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- ObjPool *op;
- int ret = 0;
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- d->queue_in = tq_alloc(1, 1, op, pkt_move);
- if (!d->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- d->queue_out = tq_alloc(1, 4, op, frame_move);
- if (!d->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
- if (ret) {
- ret = AVERROR(ret);
- av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
- av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
- return ret;
-}
+#endif
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
{
@@ -1121,12 +1015,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
if (ret < 0)
return ret;
- ret = dec_thread_start(ist);
- if (ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
- av_err2str(ret));
- return ret;
- }
-
return 0;
}
--
2.40.1
More information about the ffmpeg-devel
mailing list