[FFmpeg-devel] [PATCH 13/13] fftools/ffmpeg_demux: merge streams in a LCEVC stream group

James Almer jamrial at gmail.com
Sat Aug 31 19:31:14 EEST 2024


Add the LCEVC data stream payloads as packet side data to the packets of the
main video stream, ensuring the payloads are always output by the demuxer
even if the data stream is not otherwise used by the process.

Signed-off-by: James Almer <jamrial at gmail.com>
---
 configure              |   2 +-
 fftools/ffmpeg.h       |  17 +++
 fftools/ffmpeg_demux.c | 307 ++++++++++++++++++++++++++++++++++++-----
 3 files changed, 292 insertions(+), 34 deletions(-)

diff --git a/configure b/configure
index 3b7cf05bb5..3af3654483 100755
--- a/configure
+++ b/configure
@@ -4044,7 +4044,7 @@ ffmpeg_deps="avcodec avfilter avformat threads"
 ffmpeg_select="aformat_filter anull_filter atrim_filter format_filter
                hflip_filter null_filter
                transpose_filter trim_filter vflip_filter"
-ffmpeg_suggest="ole32 psapi shell32"
+ffmpeg_suggest="ole32 psapi shell32 lcevc_merge_bsf"
 ffplay_deps="avcodec avformat avfilter swscale swresample sdl2"
 ffplay_select="crop_filter transpose_filter hflip_filter vflip_filter rotate_filter"
 ffplay_suggest="shell32 libplacebo vulkan"
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 3c5d933e17..a9fb55fb6e 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -440,6 +440,17 @@ typedef struct InputStream {
     int                nb_outputs;
 } InputStream;
 
+typedef struct InputStreamGroup {
+    const AVClass        *class;
+
+    /* parent source */
+    struct InputFile     *file;
+    /* index of the group, taken from AVStreamGroup.index */
+    int                   index;
+    /* the lavf stream group this struct describes */
+    AVStreamGroup        *stg;
+} InputStreamGroup;
+
 typedef struct InputFile {
     const AVClass   *class;
 
@@ -461,6 +472,12 @@ typedef struct InputFile {
      * if new streams appear dynamically during demuxing */
     InputStream    **streams;
     int           nb_streams;
+
+    /**
+     * stream groups that ffmpeg is aware of
+     */
+    InputStreamGroup **stream_groups;
+    int             nb_stream_groups;
 } InputFile;
 
 enum forced_keyframes_const {
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 039ee0c785..eb7cc96f4c 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -90,12 +90,29 @@ typedef struct DemuxStream {
 
     AVBSFContext            *bsf;
 
+    InputStreamGroup       **stream_groups;
+    int                      nb_stream_groups;
+
     /* number of packets successfully read for this stream */
     uint64_t                 nb_packets;
     // combined size of all the packets read
     uint64_t                 data_size;
 } DemuxStream;
 
+typedef struct DemuxStreamGroup {
+    InputStreamGroup         istg; // kept first: dsg_from_istg() casts an istg pointer to this struct
+
+    // main stream for merged output
+    InputStream             *stream;
+
+    // name used for logging
+    char                     log_name[32];
+    // nonzero while the group is disabled; cleared by ist_use() on the main stream
+    int                      discard;
+    // bitstream filter the packets of the group's streams are sent through
+    AVBSFContext            *bsf;
+} DemuxStreamGroup;
+
 typedef struct Demuxer {
     InputFile             f;
 
@@ -142,6 +159,7 @@ typedef struct DemuxThreadContext {
     AVPacket *pkt_demux;
     // packet for reading from BSFs
     AVPacket *pkt_bsf;
+    AVPacket *pkt_group_bsf;
 } DemuxThreadContext;
 
 static DemuxStream *ds_from_ist(InputStream *ist)
@@ -149,6 +167,11 @@ static DemuxStream *ds_from_ist(InputStream *ist)
     return (DemuxStream*)ist;
 }
 
+static DemuxStreamGroup *dsg_from_istg(InputStreamGroup *istg)
+{
+    return (DemuxStreamGroup*)istg; // istg is the first member of DemuxStreamGroup
+}
+
 static Demuxer *demuxer_from_ifile(InputFile *f)
 {
     return (Demuxer*)f;
@@ -537,17 +560,69 @@ static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags,
     return 0;
 }
 
+// Send pkt (NULL when flushing) through bsf and hand each output packet to
+// do_send() for ds. Returns 0 when the BSF wants more input, <0 on error/EOF.
+static int demux_filter(Demuxer *d, DemuxThreadContext *dt, DemuxStream *ds,
+                        AVBSFContext *bsf, AVPacket *pkt, void *logctx)
+{
+    AVPacket *out = dt->pkt_bsf;
+    int err;
+
+    if (pkt)
+        av_packet_rescale_ts(pkt, pkt->time_base, bsf->time_base_in);
+
+    err = av_bsf_send_packet(bsf, pkt);
+    if (err < 0) {
+        if (pkt)
+            av_packet_unref(pkt);
+        av_log(logctx, AV_LOG_ERROR, "Error submitting a packet for filtering: %s\n",
+               av_err2str(err));
+        return err;
+    }
+
+    // drain everything the filter can produce for this input
+    for (;;) {
+        err = av_bsf_receive_packet(bsf, out);
+        if (err == AVERROR(EAGAIN))
+            return 0;
+        if (err < 0) {
+            if (err != AVERROR_EOF)
+                av_log(logctx, AV_LOG_ERROR,
+                       "Error applying bitstream filters to a packet: %s\n",
+                       av_err2str(err));
+            return err;
+        }
+        out->time_base = bsf->time_base_out;
+        err = do_send(d, ds, out, 0, "filtered");
+        if (err < 0) {
+            av_packet_unref(out);
+            return err;
+        }
+    }
+}
+
 static int demux_send(Demuxer *d, DemuxThreadContext *dt, DemuxStream *ds,
                       AVPacket *pkt, unsigned flags)
 {
     InputFile  *f = &d->f;
-    int ret;
+    int ret = 0;
 
     // pkt can be NULL only when flushing BSFs
     av_assert0(ds->bsf || pkt);
 
+    // a stream can only be disabled if it's needed by a group
+    av_assert0(ds->nb_stream_groups || !ds->discard);
+
+    // create a reference for the packet to be filtered by group bsfs
+    if (pkt && ds->nb_stream_groups) {
+        av_packet_unref(dt->pkt_group_bsf);
+        ret = av_packet_ref(dt->pkt_group_bsf, pkt);
+        if (ret < 0)
+            return ret;
+    }
+
     // send heartbeat for sub2video streams
-    if (d->pkt_heartbeat && pkt && pkt->pts != AV_NOPTS_VALUE) {
+    if (d->pkt_heartbeat && pkt && !ds->discard && pkt->pts != AV_NOPTS_VALUE) {
         for (int i = 0; i < f->nb_streams; i++) {
             DemuxStream *ds1 = ds_from_ist(f->streams[i]);
 
@@ -564,39 +639,30 @@ static int demux_send(Demuxer *d, DemuxThreadContext *dt, DemuxStream *ds,
         }
     }
 
-    if (ds->bsf) {
-        if (pkt)
-            av_packet_rescale_ts(pkt, pkt->time_base, ds->bsf->time_base_in);
+    for (int i = 0; i < ds->nb_stream_groups; i++) {
+        DemuxStreamGroup *dsg = dsg_from_istg(ds->stream_groups[i]);
 
-        ret = av_bsf_send_packet(ds->bsf, pkt);
-        if (ret < 0) {
-            if (pkt)
-                av_packet_unref(pkt);
-            av_log(ds, AV_LOG_ERROR, "Error submitting a packet for filtering: %s\n",
-                   av_err2str(ret));
+        // if the main stream is disabled, we don't want to filter
+        if (ds == ds_from_ist(dsg->stream) && ds->discard)
+            continue;
+
+        ret = demux_filter(d, dt, ds_from_ist(dsg->stream), dsg->bsf,
+                           pkt ? dt->pkt_group_bsf : NULL, dsg);
+        if (ret < 0)
             return ret;
-        }
 
-        while (1) {
-            ret = av_bsf_receive_packet(ds->bsf, dt->pkt_bsf);
-            if (ret == AVERROR(EAGAIN))
-                return 0;
-            else if (ret < 0) {
-                if (ret != AVERROR_EOF)
-                    av_log(ds, AV_LOG_ERROR,
-                           "Error applying bitstream filters to a packet: %s\n",
-                           av_err2str(ret));
-                return ret;
-            }
+        // TODO handle streams belonging to more than one Stream group
+        if (i == (ds->nb_stream_groups - 1) && ds == ds_from_ist(dsg->stream))
+            return 0;
+    }
 
-            dt->pkt_bsf->time_base = ds->bsf->time_base_out;
+    if (ds->discard)
+        return 0;
 
-            ret = do_send(d, ds, dt->pkt_bsf, 0, "filtered");
-            if (ret < 0) {
-                av_packet_unref(dt->pkt_bsf);
-                return ret;
-            }
-        }
+    if (ds->bsf) {
+        ret = demux_filter(d, dt, ds, ds->bsf, pkt, ds);
+        if (ret < 0)
+            return ret;
     } else {
         ret = do_send(d, ds, pkt, flags, "demuxed");
         if (ret < 0)
@@ -660,6 +726,7 @@ static void demux_thread_uninit(DemuxThreadContext *dt)
 {
     av_packet_free(&dt->pkt_demux);
     av_packet_free(&dt->pkt_bsf);
+    av_packet_free(&dt->pkt_group_bsf);
 
     memset(dt, 0, sizeof(*dt));
 }
@@ -676,6 +743,10 @@ static int demux_thread_init(DemuxThreadContext *dt)
     if (!dt->pkt_bsf)
         return AVERROR(ENOMEM);
 
+    dt->pkt_group_bsf = av_packet_alloc();
+    if (!dt->pkt_group_bsf)
+        return AVERROR(ENOMEM);
+
     return 0;
 }
 
@@ -749,9 +820,16 @@ static int input_thread(void *arg)
         ds = dt.pkt_demux->stream_index < f->nb_streams ?
              ds_from_ist(f->streams[dt.pkt_demux->stream_index]) : NULL;
         if (!ds || ds->discard || ds->finished) {
-            report_new_stream(d, dt.pkt_demux);
-            av_packet_unref(dt.pkt_demux);
-            continue;
+            int i = 0;
+            /* Is the stream disabled, but still needed to handle a group? */
+            for (; ds && i < ds->nb_stream_groups; i++)
+                if (!dsg_from_istg(ds->stream_groups[i])->discard)
+                    break;
+            if (!ds || i == ds->nb_stream_groups) {
+                report_new_stream(d, dt.pkt_demux);
+                av_packet_unref(dt.pkt_demux);
+                continue;
+            }
         }
 
         if (dt.pkt_demux->flags & AV_PKT_FLAG_CORRUPT) {
@@ -849,9 +927,25 @@ static void ist_free(InputStream **pist)
 
     av_bsf_free(&ds->bsf);
 
+    av_freep(&ds->stream_groups);
+
     av_freep(pist);
 }
 
+// Free a stream group's bitstream filter and then the group itself.
+static void istg_free(InputStreamGroup **pistg)
+{
+    DemuxStreamGroup *group;
+
+    if (!*pistg)
+        return;
+
+    // the BSF is owned by the enclosing DemuxStreamGroup
+    group = dsg_from_istg(*pistg);
+    av_bsf_free(&group->bsf);
+    av_freep(pistg);
+}
+
 void ifile_close(InputFile **pf)
 {
     InputFile *f = *pf;
@@ -866,6 +960,9 @@ void ifile_close(InputFile **pf)
     for (int i = 0; i < f->nb_streams; i++)
         ist_free(&f->streams[i]);
     av_freep(&f->streams);
+    for (int i = 0; i < f->nb_stream_groups; i++)
+        istg_free(&f->stream_groups[i]);
+    av_freep(&f->stream_groups);
 
     avformat_close_input(&f->ctx);
 
@@ -961,6 +1058,19 @@ static int ist_use(InputStream *ist, int decoding_needed)
         d->have_audio_dec |= is_audio;
     }
 
+    // if this stream is the main one in any group, enable said group and
+    // all its streams, so lavf will return their packets
+    for (int i = 0; i < ds->nb_stream_groups; i++) {
+        DemuxStreamGroup *dsg = dsg_from_istg(ds->stream_groups[i]);
+        AVStreamGroup *stg = ds->stream_groups[i]->stg;
+
+        if (ist != dsg->stream)
+            continue;
+        for (int j = 0; j < stg->nb_streams; j++)
+            stg->streams[j]->discard = 0;
+        dsg->discard = 0;
+    }
+
     return 0;
 }
 
@@ -1524,6 +1634,128 @@ static int ist_add(const OptionsContext *o, Demuxer *d, AVStream *st, AVDictiona
     return 0;
 }
 
+// item_name callback of input_stream_group_class: returns the group's log name
+static const char *input_stream_group_item_name(void *obj)
+{
+    const DemuxStreamGroup *group = obj;
+    return group->log_name;
+}
+
+static const AVClass input_stream_group_class = {
+    .class_name = "InputStreamGroup",
+    .version    = LIBAVUTIL_VERSION_INT,
+    .item_name  = input_stream_group_item_name, // yields DemuxStreamGroup.log_name
+    .category   = AV_CLASS_CATEGORY_DEMUXER,
+};
+
+// Get a new DemuxStreamGroup from allocate_array_elem() on the file's group
+// array, then fill in its InputStreamGroup fields and log name. NULL on failure.
+static DemuxStreamGroup *demux_stream_group_alloc(Demuxer *d, AVStreamGroup *stg)
+{
+    InputFile        *file  = &d->f;
+    DemuxStreamGroup *group = allocate_array_elem(&file->stream_groups, sizeof(*group),
+                                                  &file->nb_stream_groups);
+    if (!group)
+        return NULL;
+
+    group->istg.class = &input_stream_group_class;
+    group->istg.file  = file;
+    group->istg.index = stg->index;
+    group->istg.stg   = stg;
+
+    snprintf(group->log_name, sizeof(group->log_name), "istg#%d:%d/%s",
+             file->index, stg->index, avformat_stream_group_name(stg->type));
+    return group;
+}
+
+// Register an LCEVC stream group (other types are ignored): find its main video
+// stream, link the group to its streams and set up the lcevc_merge BSF.
+static int istg_add(Demuxer *d, AVStreamGroup *stg)
+{
+    InputFile    *f = &d->f;
+    DemuxStreamGroup *dsg;
+    const AVBitStreamFilter *filter;
+    int base_idx = -1, enhancement_idx = -1; // -1: not found yet
+    int ret;
+
+    // TODO: generic handling of groups, once support for more is added
+    if (stg->type != AV_STREAM_GROUP_PARAMS_LCEVC)
+        return 0;
+
+    if (stg->nb_streams != 2)
+        return AVERROR_BUG;
+
+    filter = av_bsf_get_by_name("lcevc_merge");
+    if (!filter)
+        return 0;
+
+    dsg = demux_stream_group_alloc(d, stg);
+    if (!dsg)
+        return AVERROR(ENOMEM);
+
+    dsg->discard = 1;
+
+    // set the main stream for the group
+    for (int i = 0; i < stg->nb_streams; i++) {
+        int j;
+
+        for (j = 0; j < f->nb_streams; j++)
+            if (stg->streams[i] == f->streams[j]->st)
+                break;
+        if (j == f->nb_streams)
+            return AVERROR_BUG;
+
+        // 0 is a valid stream index, so "already seen" is idx >= 0, not > 0
+        if (stg->streams[i]->codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
+            if (stg->streams[i]->codecpar->codec_type != AVMEDIA_TYPE_DATA ||
+                enhancement_idx >= 0)
+                return AVERROR_BUG;
+            enhancement_idx = f->streams[j]->st->index;
+            continue;
+        } else if (base_idx >= 0)
+            return AVERROR_BUG;
+
+        dsg->stream = f->streams[j];
+        base_idx = f->streams[j]->st->index;
+    }
+
+    /* since the API lets us know what streams belong to a given group, but
+     * not what groups a given stream is part of, add a pointer to the
+     * DemuxStreamGroup to all relevant DemuxStream structs for this purpose */
+    for (int i = 0; i < stg->nb_streams; i++) {
+        DemuxStreamGroup **dsg1;
+        DemuxStream *ds;
+        int j;
+
+        for (j = 0; j < f->nb_streams; j++)
+            if (stg->streams[i] == f->streams[j]->st)
+                break;
+        if (j == f->nb_streams)
+            return AVERROR_BUG;
+
+        ds = ds_from_ist(f->streams[j]);
+        dsg1 = av_dynarray2_add((void **)&ds->stream_groups, &ds->nb_stream_groups, sizeof(*dsg1), NULL);
+        if (!dsg1)
+            return AVERROR(ENOMEM);
+        *dsg1 = dsg;
+    }
+
+    ret = av_bsf_alloc(filter, &dsg->bsf);
+    if (ret < 0)
+        return ret;
+
+    av_opt_set_int(dsg->bsf->priv_data, "base_idx", base_idx, 0);
+    av_opt_set_int(dsg->bsf->priv_data, "enhancement_idx", enhancement_idx, 0);
+
+    dsg->bsf->time_base_in = stg->streams[0]->time_base;
+
+    ret = av_bsf_init(dsg->bsf);
+    if (ret < 0)
+        return ret;
+
+    return 0;
+}
+
 static int dump_attachment(InputStream *ist, const char *filename)
 {
     AVStream *st = ist->st;
@@ -1878,6 +2110,15 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
         }
     }
 
+    /* Add all the stream groups from the given input file to the demuxer */
+    for (int i = 0; i < ic->nb_stream_groups; i++) {
+        ret = istg_add(d, ic->stream_groups[i]);
+        if (ret < 0) {
+            av_dict_free(&opts_used);
+            return ret;
+        }
+    }
+
     /* dump the file content */
     av_dump_format(ic, f->index, filename, 0);
 
-- 
2.46.0



More information about the ffmpeg-devel mailing list