[FFmpeg-devel] [PATCH] avutil/WIP: add AVAsync API

Hendrik Leppkes h.leppkes at gmail.com
Wed Nov 11 08:45:52 CET 2015


On Wed, Nov 11, 2015 at 1:27 AM, Clément Bœsch <u at pkh.me> wrote:
> From: Clément Bœsch <clement at stupeflix.com>
>
> ---
>
> So here is a first prototype of a higher level API following an
> asynchronous model to (for now) simply use the current synchronous API.
>
> I suggest looking at libavutil/async.h (sorry, no doxy yet) and
> doc/examples/async_demuxing_decoding.c for an example of usage.
>
> Basically, the current draft has the user instantiate an asynchronous
> context and register Readers in it (the user will generally do the
> demuxing in the associated callback to get a packet). In each Reader you
> then register Decoders (basically just a wrapper for an AVCodecContext),
> each associated with a push frame callback.
>
> You construct something like this:
>
>   AVAsyncContext(
>     AVAsyncReader(
>       AVAsyncDecoder(...),
>       AVAsyncDecoder(...),
>       AVAsyncDecoder(...),
>       ...
>     ),
>     AVAsyncReader(
>       AVAsyncDecoder(...),
>       AVAsyncDecoder(...),
>       ...
>     ),
>     ....
>   )
>
> You will generally have one Reader per file, but it can be anything you
> want as long as you can pull packets from it.
>
> Now implementation wise, the interesting part is that every level is
> asynchronous:
>
> - The context will spawn a thread for each reader
> - Then in each reader, you will have a thread for each decoder
> - Then each decoder will have its own queue of packets, and its own
>   queue of output frames
> - Then along with each decoder, you will have a Watcher in its dedicated
>   thread: the watcher is monitoring the frame queue of its decoder, and
>   will call the user push frame callback anytime a new one appears in
>   the queue.
>
> The interesting part about the watcher is that it will allow the user to
> transparently do any operation on the frame without blocking the
> decoding or the demuxing: basically, blocking in push_frame for the user
> will just block any subsequent call to the callback, but the packet
> and frame queues of the decoder will continue to fill up, ready to be
> popped.
>
> Note: the size of the packet and frame queues for each decoder is
> configurable (max_packets_queue and max_frames_queue).
>
> Note2: the packet queue is not set in the reader so that a decoder that
> isn't pulling its packets and decoding fast enough will not block the
> others. As a result, the Reader just reads one packet and is then
> responsible for broadcasting it to the appropriate decoder (by just
> adding it to its internal queue).
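
To make the dispatch described in Note2 concrete, here is a minimal single-threaded sketch, not the patch's code. Every Toy* name and QUEUE_CAP are hypothetical. One deliberate difference: the patch calls av_thread_message_queue_send() with flags 0, which blocks when the queue is full, while this toy reports a full queue to the caller instead.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_CAP 4 /* hypothetical capacity, stands in for max_packets_queue */

typedef struct ToyPacket { int stream_index; int size; } ToyPacket;

/* Fixed-size ring buffer, one per decoder */
typedef struct ToyQueue {
    ToyPacket items[QUEUE_CAP];
    size_t head, count;
} ToyQueue;

typedef struct ToyDecoder {
    int pkt_id_match;   /* stream index this decoder accepts */
    ToyQueue pkt_queue; /* private queue: a slow decoder only fills its own */
} ToyDecoder;

/* Returns 0 on success, -1 if the queue is full */
static int toy_queue_push(ToyQueue *q, ToyPacket pkt)
{
    if (q->count == QUEUE_CAP)
        return -1;
    q->items[(q->head + q->count) % QUEUE_CAP] = pkt;
    q->count++;
    return 0;
}

/* Returns 0 on success, -1 if the queue is empty */
static int toy_queue_pop(ToyQueue *q, ToyPacket *pkt)
{
    if (!q->count)
        return -1;
    *pkt = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    return 0;
}

/* Returns the index of the decoder that took the packet,
 * -1 if no decoder matches, -2 if the matching queue is full */
static int toy_dispatch(ToyDecoder *decs, int nb, ToyPacket pkt)
{
    assert(nb >= 0);
    for (int i = 0; i < nb; i++)
        if (decs[i].pkt_id_match == pkt.stream_index)
            return toy_queue_push(&decs[i].pkt_queue, pkt) < 0 ? -2 : i;
    return -1;
}
```

With two decoders matching streams 0 and 1, a packet for stream 1 lands only in the second queue, and a packet for an unregistered stream is reported as unmatched so the caller can drop it.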
>
> Now all of this doesn't require any modification to the current API so
> it doesn't really solve the problem of writing asynchronous
> decoders/hwaccel. The goal for the next iteration will be to adjust the
> VideoToolbox hwaccel to make use of the asynchronous model, and
> basically just make it call the user callback by itself. It will be a
> good candidate for such a thing.
>
> Another concerning limitation in the current code is the inability to
> seek: basically the user cannot lock the codec and reader contexts to
> execute a seek, nor flush the queues. I'd like suggestions on which
> direction to take in that regard:
>
> - first, in threadmessage it seems I can't yet flush the fifo properly
>   (for example by calling av_frame_free() on each entry of the frame
>   queue): should I extend the API? Nicolas, maybe you have a suggestion?
>
> - secondly, how should I allow the user to lock/unlock the reader (and
>   as a result the likely AVFormatContext user side) and decoder? I added
>   a few prototypes in async.h but I'm pretty sure that won't fit many
>   cases.  Comments very welcome.
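
On the first point, one possible shape for such an extension is a per-message free callback that the queue invokes on every pending entry when it is flushed. The sketch below is a single-threaded toy under that assumption. ToyMsgQueue, toy_send, toy_flush and toy_count_free are hypothetical names, not part of threadmessage, and a real version would have to do the flush while holding the queue lock.

```c
#include <assert.h>
#include <stdlib.h>

#define TOY_CAP 8 /* hypothetical fixed capacity */

typedef struct ToyMsgQueue {
    void *msgs[TOY_CAP];
    int nb;
    void (*free_func)(void *msg); /* called on each entry dropped by a flush */
} ToyMsgQueue;

/* Counts how many messages the free callback released (demo only) */
static int toy_freed;

static void toy_count_free(void *msg)
{
    free(msg);
    toy_freed++;
}

/* Returns 0 on success, -1 if the queue is full */
static int toy_send(ToyMsgQueue *q, void *msg)
{
    assert(q);
    if (q->nb == TOY_CAP)
        return -1;
    q->msgs[q->nb++] = msg;
    return 0;
}

/* Drops every pending message, releasing each one through free_func.
 * Returns the number of messages dropped. */
static int toy_flush(ToyMsgQueue *q)
{
    int n = q->nb;

    for (int i = 0; i < n; i++)
        if (q->free_func)
            q->free_func(q->msgs[i]);
    q->nb = 0;
    return n;
}
```

For a frame queue, the callback would be a small wrapper that calls av_frame_free() on the stored pointer, so a seek could empty the queue without leaking frames.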
>
> I hope this is going in a direction most developers like. If it's not
> the case, I'm of course open to any reworking but please be specific.

I'm quite confused by this approach; it doesn't really seem to
correspond at all to what we talked about earlier.
You can't really put a high-level async API on top of our low-level sync
API; we should start by re-thinking the low-level API first.

The first goal should be to get to a new low level API that implements
decoupled m:n output, then you could build a high-level API on top of
that, if you wanted something like that.
Such a low level API would easily integrate into other frameworks,
while this async API seems to be rather rigid and may be rather
troublesome to integrate into existing workflows.
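
As a rough illustration of what "decoupled m:n" could mean, here is a toy in which input and output are separate calls, so one packet may yield zero, one or several frames. It is only a sketch: toy_send_packet, toy_receive_frame, TOY_EAGAIN and FRAME_UNITS are made-up names, and the "decoder" merely counts abstract units of data.

```c
#include <assert.h>

#define TOY_EAGAIN  (-1) /* hypothetical: "call the other function first" */
#define FRAME_UNITS 3    /* hypothetical: units of input needed per frame */

typedef struct ToyDecoder {
    int buffered; /* units of input not yet turned into frames */
} ToyDecoder;

/* Feed input. Refuses new input while a complete frame is still pending,
 * so the caller has to drain the output side first. */
static int toy_send_packet(ToyDecoder *d, int units)
{
    assert(units > 0);
    if (d->buffered >= FRAME_UNITS)
        return TOY_EAGAIN;
    d->buffered += units;
    return 0;
}

/* Fetch output. Returns TOY_EAGAIN when more input is needed. */
static int toy_receive_frame(ToyDecoder *d, int *frame_units)
{
    if (d->buffered < FRAME_UNITS)
        return TOY_EAGAIN;
    d->buffered -= FRAME_UNITS;
    *frame_units = FRAME_UNITS;
    return 0;
}
```

The caller loops on the receive call until it asks for more input, then sends the next packet. Because neither call assumes a 1:1 packet-to-frame mapping, a threaded or callback-driven layer can be built on top without the low level dictating the threading model.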

>
> Also, I'm really sorry I'm not sending this to libav-devel even though I
> apparently looked forward to a collaboration in a previous thread. I was
> planning to, but unfortunately the threadmessage API is not there yet and
> it would have required way too much work for an initial prototype. When
> we come up with something that satisfies everyone and this code becomes
> something more than just a draft, I will maybe do what is necessary.
> Unfortunately, porting the VideoToolbox accel, threadmessage and probably a
> bunch of other ffmpeg specific features (I can think of
> av_dynarray2_add and maybe more) is going to require a bit too much code
> adjustment, so I might just share the async.h and redirect them to a
> working implementation here....
>
> Anyway, please comment.
> ---
>  .gitignore                             |   1 +
>  configure                              |   2 +
>  doc/Makefile                           |   1 +
>  doc/examples/Makefile                  |   3 +-
>  doc/examples/async_demuxing_decoding.c | 155 +++++++++++++
>  libavutil/Makefile                     |   1 +
>  libavutil/async.c                      | 400 +++++++++++++++++++++++++++++++++
>  libavutil/async.h                      |  92 ++++++++
>  8 files changed, 654 insertions(+), 1 deletion(-)
>  create mode 100644 doc/examples/async_demuxing_decoding.c
>  create mode 100644 libavutil/async.c
>  create mode 100644 libavutil/async.h
>
> diff --git a/.gitignore b/.gitignore
> index 93b0dca..010acb0 100644
> --- a/.gitignore
> +++ b/.gitignore
> @@ -40,6 +40,7 @@
>  /doc/avoptions_codec.texi
>  /doc/avoptions_format.texi
>  /doc/doxy/html/
> +/doc/examples/async_demuxing_decoding
>  /doc/examples/avio_dir_cmd
>  /doc/examples/avio_reading
>  /doc/examples/decoding_encoding
> diff --git a/configure b/configure
> index d5e76de..12ca262 100755
> --- a/configure
> +++ b/configure
> @@ -1368,6 +1368,7 @@ COMPONENT_LIST="
>  "
>
>  EXAMPLE_LIST="
> +    async_demuxing_decoding_example
>      avio_reading_example
>      avio_dir_cmd_example
>      decoding_encoding_example
> @@ -2855,6 +2856,7 @@ zoompan_filter_deps="swscale"
>  zscale_filter_deps="libzimg"
>
>  # examples
> +async_demuxing_decoding_example_deps="avcodec avformat avutil"
>  avio_reading="avformat avcodec avutil"
>  avio_dir_cmd="avformat avutil"
>  avcodec_example_deps="avcodec avutil"
> diff --git a/doc/Makefile b/doc/Makefile
> index 3e67c2a..66112a9 100644
> --- a/doc/Makefile
> +++ b/doc/Makefile
> @@ -36,6 +36,7 @@ DOCS-$(CONFIG_MANPAGES)  += $(MANPAGES)
>  DOCS-$(CONFIG_TXTPAGES)  += $(TXTPAGES)
>  DOCS = $(DOCS-yes)
>
> +DOC_EXAMPLES-$(CONFIG_ASYNC_DEMUXING_DECODING_EXAMPLE) += async_demuxing_decoding
>  DOC_EXAMPLES-$(CONFIG_AVIO_DIR_CMD_EXAMPLE)      += avio_dir_cmd
>  DOC_EXAMPLES-$(CONFIG_AVIO_READING_EXAMPLE)      += avio_reading
>  DOC_EXAMPLES-$(CONFIG_AVCODEC_EXAMPLE)           += avcodec
> diff --git a/doc/examples/Makefile b/doc/examples/Makefile
> index af38159..ed06bc2 100644
> --- a/doc/examples/Makefile
> +++ b/doc/examples/Makefile
> @@ -11,7 +11,8 @@ CFLAGS += -Wall -g
>  CFLAGS := $(shell pkg-config --cflags $(FFMPEG_LIBS)) $(CFLAGS)
>  LDLIBS := $(shell pkg-config --libs $(FFMPEG_LIBS)) $(LDLIBS)
>
> -EXAMPLES=       avio_dir_cmd                       \
> +EXAMPLES=       async_demuxing_decoding            \
> +                avio_dir_cmd                       \
>                  avio_reading                       \
>                  decoding_encoding                  \
>                  demuxing_decoding                  \
> diff --git a/doc/examples/async_demuxing_decoding.c b/doc/examples/async_demuxing_decoding.c
> new file mode 100644
> index 0000000..0c25a68
> --- /dev/null
> +++ b/doc/examples/async_demuxing_decoding.c
> @@ -0,0 +1,155 @@
> +/*
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#include <libavutil/async.h>
> +#include <libavutil/imgutils.h>
> +#include <libavutil/samplefmt.h>
> +#include <libavutil/timestamp.h>
> +#include <libavformat/avformat.h>
> +
> +static int pull_packet_cb(void *priv, AVPacket *pkt)
> +{
> +    AVFormatContext *fmt_ctx = priv;
> +    int ret = av_read_frame(fmt_ctx, pkt);
> +
> +    printf("read packet of size %d from stream %d\n",
> +           pkt->size, pkt->stream_index);
> +    return ret;
> +}
> +
> +static int push_frame_cb(void *priv, AVFrame *frame)
> +{
> +    AVStream *st = priv;
> +    const char *type = st->codec->codec_type == AVMEDIA_TYPE_VIDEO ?
> +                       av_get_pix_fmt_name(frame->format)
> +                     : av_get_sample_fmt_name(frame->format);
> +
> +    printf("decoded %s frame / ts:%s\n", type,
> +           av_ts2str(av_frame_get_best_effort_timestamp(frame)));
> +
> +    av_frame_free(&frame);
> +    return 0;
> +}
> +
> +static int open_codec_context(AVAsyncReader *r, AVFormatContext *fmt_ctx,
> +                              enum AVMediaType type)
> +{
> +    int ret, stream_index = -1;
> +    AVStream *st;
> +    AVCodec *dec;
> +    AVCodecContext *dec_ctx;
> +
> +    ret = av_find_best_stream(fmt_ctx, type, -1, -1, NULL, 0);
> +    if (ret < 0)
> +        return 0; // ignore if no appropriate stream found
> +
> +    stream_index = ret;
> +    st = fmt_ctx->streams[stream_index];
> +
> +    dec_ctx = st->codec;
> +    dec = avcodec_find_decoder(dec_ctx->codec_id);
> +    if (!dec) {
> +        fprintf(stderr, "Unable to find %s codec\n", av_get_media_type_string(type));
> +        return AVERROR(EINVAL);
> +    }
> +
> +    ret = avcodec_open2(dec_ctx, dec, NULL);
> +    if (ret < 0) {
> +        fprintf(stderr, "Unable to open %s codec\n", av_get_media_type_string(type));
> +        return ret;
> +    }
> +
> +    return avasync_register_decoder(r, st->codec, st, push_frame_cb, st->index);
> +}
> +
> +int main(int ac, char **av)
> +{
> +    int i, ret = 0;
> +    AVAsyncContext *actx;
> +
> +    if (ac < 2) {
> +        fprintf(stderr, "Usage: %s <files...>\n", av[0]);
> +        return 0;
> +    }
> +
> +    av_register_all();
> +    av_log_set_level(AV_LOG_TRACE);
> +
> +    actx = avasync_alloc_context();
> +    if (!actx)
> +        return 1;
> +
> +    for (i = 1; i < ac; i++) {
> +        AVFormatContext *fmt_ctx = NULL;
> +        AVAsyncReader *r;
> +
> +        ret = avformat_open_input(&fmt_ctx, av[i], NULL, NULL);
> +        if (ret < 0) {
> +            fprintf(stderr, "Unable to open '%s'\n", av[i]);
> +            goto end;
> +        }
> +
> +        ret = avformat_find_stream_info(fmt_ctx, NULL);
> +        if (ret < 0) {
> +            fprintf(stderr, "Unable to find stream info\n");
> +            goto end;
> +        }
> +
> +        av_dump_format(fmt_ctx, 0, av[i], 0);
> +
> +        ret = avasync_register_reader(actx, av[i], fmt_ctx, pull_packet_cb, &r);
> +        if (ret < 0)
> +            goto end;
> +
> +        ret = open_codec_context(r, fmt_ctx, AVMEDIA_TYPE_AUDIO);
> +        if (ret < 0)
> +            goto end;
> +
> +        ret = open_codec_context(r, fmt_ctx, AVMEDIA_TYPE_VIDEO);
> +        if (ret < 0)
> +            goto end;
> +
> +        if (!r->nb_decoders) {
> +            fprintf(stderr, "No video or audio stream found\n");
> +            ret = AVERROR_STREAM_NOT_FOUND; /* negative, so main() exits non-zero */
> +            goto end;
> +        }
> +    }
> +
> +    avasync_start(actx);
> +
> +end:
> +
> +    avasync_wait(actx);
> +
> +    for (i = 0; i < actx->nb_readers; i++) {
> +        int j;
> +        AVAsyncReader *r = &actx->readers[i];
> +
> +        for (j = 0; j < r->nb_decoders; j++) {
> +            AVAsyncDecoder *d = &r->decoders[j];
> +            avcodec_close(d->codec_ctx);
> +        }
> +        avformat_close_input((AVFormatContext **)&actx->readers[i].priv_data);
> +    }
> +
> +    avasync_free(&actx);
> +    return ret < 0 ? 1 : 0;
> +}
> diff --git a/libavutil/Makefile b/libavutil/Makefile
> index 1bac2b9..2880b6c 100644
> --- a/libavutil/Makefile
> +++ b/libavutil/Makefile
> @@ -80,6 +80,7 @@ BUILT_HEADERS = avconfig.h                                              \
>
>  OBJS = adler32.o                                                        \
>         aes.o                                                            \
> +       async.o                                                          \
>         audio_fifo.o                                                     \
>         avstring.o                                                       \
>         base64.o                                                         \
> diff --git a/libavutil/async.c b/libavutil/async.c
> new file mode 100644
> index 0000000..f1a2c02
> --- /dev/null
> +++ b/libavutil/async.c
> @@ -0,0 +1,400 @@
> +/*
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include <pthread.h>
> +
> +#include <libavutil/async.h>
> +#include <libavutil/opt.h>
> +#include <libavutil/time.h>
> +
> +static const AVClass async_context_class = {
> +    .class_name = "async_context",
> +    .item_name  = av_default_item_name,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +AVAsyncContext *avasync_alloc_context(void)
> +{
> +    AVAsyncContext *actx = av_mallocz(sizeof(*actx));
> +    if (!actx)
> +        return NULL;
> +    actx->class = &async_context_class;
> +    return actx;
> +}
> +
> +#define OFFSET_READER(x) offsetof(AVAsyncReader, x)
> +static const AVOption async_reader_options[] = {
> +    { "non_blocking", "set non blocking mode", OFFSET_READER(non_blocking), AV_OPT_TYPE_BOOL, {.i64=0}, 0, 1 },
> +    { NULL }
> +};
> +
> +static const AVClass async_reader_class = {
> +    .class_name = "async_reader",
> +    .item_name  = av_default_item_name,
> +    .option     = async_reader_options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +int avasync_register_reader(AVAsyncContext *actx,
> +                            const char *name, void *priv,
> +                            pull_packet_func_type pull_packet_cb,
> +                            AVAsyncReader **r)
> +{
> +    AVAsyncReader *reader = av_dynarray2_add((void**)&actx->readers, &actx->nb_readers,
> +                                             sizeof(*actx->readers), NULL);
> +    if (!reader)
> +        return AVERROR(ENOMEM);
> +
> +    memset(reader, 0, sizeof(*reader));
> +
> +    reader->class = &async_reader_class;
> +    av_opt_set_defaults(reader);
> +
> +    if (name) {
> +        reader->name = av_strdup(name);
> +        if (!reader->name)
> +            return AVERROR(ENOMEM);
> +    }
> +    reader->priv_data      = priv;
> +    reader->pull_packet_cb = pull_packet_cb;
> +
> +    *r = reader;
> +    return 0;
> +}
> +
> +#define OFFSET_DEC(x) offsetof(AVAsyncDecoder, x)
> +#define FLAGS AV_OPT_FLAG_DECODING_PARAM
> +static const AVOption async_decoder_options[] = {
> +    { "max_packets_queue", "set the maximum number of packets in the queue", OFFSET_DEC(max_packets_queue), AV_OPT_TYPE_INT, {.i64=5}, 1, 100, FLAGS },
> +    { "max_frames_queue",  "set the maximum number of frames in the queue",  OFFSET_DEC(max_frames_queue),  AV_OPT_TYPE_INT, {.i64=3}, 1, 100, FLAGS },
> +    { NULL }
> +};
> +
> +static const AVClass async_decoder_class = {
> +    .class_name = "async_decoder",
> +    .item_name  = av_default_item_name,
> +    .option     = async_decoder_options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +int avasync_register_decoder(AVAsyncReader *r,
> +                             AVCodecContext *codec_ctx, void *priv,
> +                             push_frame_func_type push_frame_cb,
> +                             int pkt_id_match)
> +{
> +    AVAsyncDecoder *adec = av_dynarray2_add((void**)&r->decoders, &r->nb_decoders,
> +                                            sizeof(*r->decoders), NULL);
> +    if (!adec)
> +        return AVERROR(ENOMEM);
> +
> +    memset(adec, 0, sizeof(*adec));
> +
> +    adec->class = &async_decoder_class;
> +    av_opt_set_defaults(adec);
> +
> +    adec->codec_ctx     = codec_ctx;
> +    adec->priv_data     = priv;
> +    adec->push_frame_cb = push_frame_cb;
> +    adec->pkt_id_match  = pkt_id_match;
> +    return 0;
> +}
> +
> +/* Watch the frames queue and push them to the user */
> +static void *watcher_thread(void *arg)
> +{
> +    AVAsyncDecoder *d = arg;
> +
> +    av_log(d, AV_LOG_TRACE, "watching thread starting\n");
> +
> +    for (;;) {
> +        AVFrame *frame;
> +
> +        /* Wait to get a frame from the queue. If it fails, then the watcher
> +         * has to die. */
> +        av_log(d, AV_LOG_TRACE, "fetch frame\n");
> +        int ret = av_thread_message_queue_recv(d->frames_queue, &frame, 0);
> +        av_log(d, AV_LOG_TRACE, "watcher recv frame %p ret=%d\n", frame, ret);
> +        if (ret < 0)
> +            break;
> +
> +        ret = d->push_frame_cb(d->priv_data, frame);
> +
> +        if (ret < 0) {
> +            /* watcher will die, notify the decoder */
> +            av_thread_message_queue_set_err_send(d->frames_queue, ret);
> +            break;
> +        }
> +    }
> +
> +    return NULL;
> +}
> +
> +static int decode_packet(AVAsyncDecoder *d, AVPacket *pkt,
> +                         AVFrame **frame, int *got_frame)
> +{
> +    int ret = AVERROR(EINVAL); /* kept as-is for unsupported codec types */
> +    int decoded = pkt->size;
> +    AVFrame *dec_frame = av_frame_alloc(); // XXX: pre-alloc them in the fifo?
> +
> +    *got_frame = 0;
> +    *frame = NULL;
> +
> +    if (!dec_frame)
> +        return AVERROR(ENOMEM);
> +
> +    switch (d->codec_ctx->codec_type) {
> +    case AVMEDIA_TYPE_VIDEO:
> +        ret = avcodec_decode_video2(d->codec_ctx, dec_frame, got_frame, pkt);
> +        break;
> +    case AVMEDIA_TYPE_AUDIO:
> +        ret = avcodec_decode_audio4(d->codec_ctx, dec_frame, got_frame, pkt);
> +        break;
> +
> +    default:
> +        av_log(d, AV_LOG_ERROR, "Unsupported codec type :(\n");
> +    }
> +
> +    if (ret < 0) {
> +        fprintf(stderr, "Error decoding %s frame\n",
> +                av_get_media_type_string(d->codec_ctx->codec_type));
> +        av_frame_free(&dec_frame);
> +        return ret;
> +    }
> +
> +    if (*got_frame)
> +        *frame = dec_frame;
> +    else
> +        av_frame_free(&dec_frame);
> +
> +    decoded = FFMIN(ret, pkt->size);
> +
> +    return decoded;
> +}
> +
> +static int queue_frame(AVAsyncDecoder *d, AVFrame *frame)
> +{
> +    av_log(d, AV_LOG_TRACE, "queue frame %p\n", frame);
> +    int ret = av_thread_message_queue_send(d->frames_queue, &frame, 0);
> +    if (ret < 0)
> +        av_log(d, AV_LOG_ERROR, "Unable to push frame: %s\n", av_err2str(ret));
> +    return ret;
> +}
> +
> +static void *decoder_thread(void *arg)
> +{
> +    int ret, got_frame;
> +    AVPacket pkt, orig_pkt;
> +    AVFrame *dec_frame;
> +    AVAsyncDecoder *d = arg;
> +
> +    /* Initialize the frame queue (communication decode <-> watcher) */
> +    ret = av_thread_message_queue_alloc(&d->frames_queue, d->max_frames_queue, sizeof(AVFrame *));
> +    if (ret < 0) {
> +        return NULL;
> +    }
> +
> +    /* Spawn frame queue watcher */
> +    av_log(d, AV_LOG_TRACE, "decoding thread starting\n");
> +    if ((ret = pthread_create(&d->watcher_tid, NULL, watcher_thread, d))) {
> +        ret = AVERROR(ret); /* pthread_create() returns the error, errno is not set */
> +        av_log(d, AV_LOG_ERROR, "Unable to start watcher thread: %s\n",
> +               av_err2str(ret));
> +        av_thread_message_queue_free(&d->frames_queue);
> +        return NULL;
> +    }
> +
> +    /* Main packet decoding loop */
> +    av_log(d, AV_LOG_TRACE, "main packet decoding loop\n");
> +    for (;;) {
> +        ret = av_thread_message_queue_recv(d->pkt_queue, &pkt, 0);
> +        if (ret < 0)
> +            break;
> +
> +        orig_pkt = pkt;
> +        do {
> +            ret = decode_packet(d, &pkt, &dec_frame, &got_frame);
> +            if (ret < 0)
> +                break;
> +            pkt.data += ret;
> +            pkt.size -= ret;
> +            if (got_frame && (ret = queue_frame(d, dec_frame)) < 0)
> +                break;
> +        } while (pkt.size > 0);
> +        av_packet_unref(&orig_pkt);
> +    }
> +
> +    /* flush cached frames */
> +    av_log(d, AV_LOG_TRACE, "flush cached frames\n");
> +    av_init_packet(&pkt);
> +    pkt.data = NULL;
> +    pkt.size = 0;
> +    do {
> +        ret = decode_packet(d, &pkt, &dec_frame, &got_frame);
> +        if (ret == 0 && got_frame && (ret = queue_frame(d, dec_frame)) < 0)
> +            break;
> +        av_log(d, AV_LOG_TRACE, "flushed, got_frame=%d\n", got_frame);
> +    } while (got_frame);
> +    av_log(d, AV_LOG_TRACE, "flush end\n");
> +
> +    /* Decoder ends, notify frame watcher so it dies */
> +    av_thread_message_queue_set_err_recv(d->frames_queue, ret < 0 ? ret : AVERROR_EOF);
> +    pthread_join(d->watcher_tid, NULL);
> +
> +    av_thread_message_queue_free(&d->frames_queue);
> +    return NULL;
> +}
> +
> +static void *reader_thread(void *arg)
> +{
> +    int ret, i;
> +    AVAsyncReader *r = arg;
> +
> +    av_log(r, AV_LOG_TRACE, "reader thread starting\n");
> +
> +    /* Spawn decoders */
> +    for (i = 0; i < r->nb_decoders; i++) {
> +        AVAsyncDecoder *d = &r->decoders[i];
> +
> +        av_log(d, AV_LOG_TRACE, "    decoder[%d]: matching pkt id #%d\n", i, d->pkt_id_match);
> +
> +        /* Initialize the packet queue (communication reader <-> decoder) */
> +        ret = av_thread_message_queue_alloc(&d->pkt_queue, d->max_packets_queue, sizeof(AVPacket));
> +        if (ret < 0) {
> +            return NULL;
> +        }
> +
> +        /* Start its working thread */
> +        if ((ret = pthread_create(&d->tid, NULL, decoder_thread, d))) {
> +            ret = AVERROR(ret); /* pthread_create() returns the error, errno is not set */
> +            av_log(d, AV_LOG_ERROR, "Unable to start decoding thread %d: %s\n",
> +                   i, av_err2str(ret));
> +            goto end;
> +        }
> +
> +        d->started = 1;
> +    }
> +
> +    while (1) {
> +        AVPacket pkt;
> +        AVAsyncDecoder *decoder = NULL;
> +
> +        ret = r->pull_packet_cb(r->priv_data, &pkt);
> +
> +        if (ret == AVERROR(EAGAIN)) {
> +            av_usleep(10000);
> +            continue;
> +        }
> +        if (ret < 0)
> +            break;
> +
> +        /* Find decoder for a given packet */
> +        // FIXME: faster broadcasting needed
> +        for (i = 0; i < r->nb_decoders; i++) {
> +            AVAsyncDecoder *d = &r->decoders[i];
> +            if (pkt.stream_index == d->pkt_id_match) {
> +                decoder = d;
> +                break;
> +            }
> +        }
> +
> +        if (!decoder) {
> +            av_log(r, AV_LOG_DEBUG, "No decoder for stream %d, ignoring packet\n", pkt.stream_index);
> +            av_packet_unref(&pkt);
> +            continue;
> +        }
> +
> +        ret = av_thread_message_queue_send(decoder->pkt_queue, &pkt, 0);
> +        if (ret < 0) {
> +            if (ret != AVERROR_EOF)
> +                av_log(decoder, AV_LOG_ERROR, "Unable to send packet to decoder: %s\n", av_err2str(ret));
> +            av_packet_unref(&pkt);
> +            av_thread_message_queue_set_err_recv(decoder->pkt_queue, ret);
> +        }
> +    }
> +
> +end:
> +
> +    /* Notify all decoders about the error/EOF so they die */
> +    for (i = 0; i < r->nb_decoders; i++) {
> +        AVAsyncDecoder *d = &r->decoders[i];
> +        av_thread_message_queue_set_err_recv(d->pkt_queue, ret);
> +        if (d->started)
> +            pthread_join(d->tid, NULL);
> +        av_thread_message_queue_free(&d->pkt_queue);
> +    }
> +
> +    return NULL;
> +}
> +
> +int avasync_start(AVAsyncContext *actx)
> +{
> +    int i;
> +
> +    av_log(actx, AV_LOG_TRACE, "Starting AVAsync loop\n");
> +
> +    for (i = 0; i < actx->nb_readers; i++) {
> +        int ret;
> +        AVAsyncReader *r = &actx->readers[i];
> +
> +        av_log(actx, AV_LOG_TRACE, "  Reader[%d]: %s (non-blocking: %s)\n",
> +               i, r->name, r->non_blocking ? "yes" : "no");
> +
> +        ret = pthread_create(&r->tid, NULL, reader_thread, r);
> +        if (ret) {
> +            const int err = AVERROR(ret);
> +            av_log(actx, AV_LOG_ERROR, "Unable to start reader thread %d: %s\n",
> +                   i, av_err2str(err));
> +            return err;
> +        }
> +        r->started = 1;
> +    }
> +
> +    return 0;
> +}
> +
> +int avasync_wait(AVAsyncContext *actx)
> +{
> +    int i;
> +
> +    av_log(actx, AV_LOG_TRACE, "waiting for readers to end\n");
> +    for (i = 0; i < actx->nb_readers; i++) {
> +        const AVAsyncReader *r = &actx->readers[i];
> +
> +        if (r->started) {
> +            int ret = pthread_join(r->tid, NULL);
> +            if (ret)
> +                av_log(actx, AV_LOG_ERROR, "Unable to join reader #%d: %s\n",
> +                       i, av_err2str(AVERROR(ret)));
> +        }
> +    }
> +    return 0;
> +}
> +
> +void avasync_free(AVAsyncContext **actxp)
> +{
> +    int i, j;
> +    AVAsyncContext *actx = *actxp;
> +
> +    for (i = 0; i < actx->nb_readers; i++) {
> +        AVAsyncReader *r = &actx->readers[i];
> +        av_freep(&r->decoders);
> +        av_freep(&r->name);
> +    }
> +    av_freep(&actx->readers);
> +    av_freep(actxp);
> +}
> diff --git a/libavutil/async.h b/libavutil/async.h
> new file mode 100644
> index 0000000..9c50e79
> --- /dev/null
> +++ b/libavutil/async.h
> @@ -0,0 +1,92 @@
> +/*
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#ifndef AVUTIL_ASYNC_H
> +#define AVUTIL_ASYNC_H
> +
> +#include <libavcodec/avcodec.h>
> +#include <libavutil/threadmessage.h>
> +
> +typedef int (*pull_packet_func_type)(void *priv, AVPacket *pkt);
> +typedef int (*push_frame_func_type)(void *priv, AVFrame *frame);
> +
> +typedef struct AVAsyncDecoder {
> +    const AVClass *class;
> +    AVCodecContext *codec_ctx;
> +    void *priv_data;
> +    push_frame_func_type push_frame_cb;
> +    int pkt_id_match;
> +
> +    int started;
> +    pthread_t tid;
> +    pthread_t watcher_tid;
> +
> +    AVThreadMessageQueue *pkt_queue;
> +    AVThreadMessageQueue *frames_queue;
> +
> +    int max_packets_queue;
> +    int max_frames_queue;
> +
> +} AVAsyncDecoder;
> +
> +typedef struct AVAsyncReader {
> +    const AVClass *class;
> +    char *name;
> +    void *priv_data;
> +    pull_packet_func_type pull_packet_cb;
> +    AVAsyncDecoder *decoders;
> +    int nb_decoders;
> +
> +    int started;
> +    pthread_t tid;
> +
> +    int non_blocking; // TODO: honor
> +
> +} AVAsyncReader;
> +
> +typedef struct AVAsyncContext {
> +    const AVClass *class;
> +    AVAsyncReader *readers;
> +    int nb_readers;
> +} AVAsyncContext;
> +
> +AVAsyncContext *avasync_alloc_context(void);
> +
> +int avasync_register_reader(AVAsyncContext *actx,
> +                            const char *name, void *priv,
> +                            pull_packet_func_type pull_packet_cb,
> +                            AVAsyncReader **r);
> +
> +int avasync_register_decoder(AVAsyncReader *r,
> +                             AVCodecContext *codec_ctx, void *priv,
> +                             push_frame_func_type push_frame_cb,
> +                             int pkt_id_match);
> +
> +int avasync_start(AVAsyncContext *actx);
> +
> +int avasync_lock_decoder(AVAsyncDecoder *d);
> +int avasync_unlock_decoder(AVAsyncDecoder *d);
> +
> +int avasync_lock_reader(AVAsyncReader *r);
> +int avasync_unlock_reader(AVAsyncReader *r);
> +
> +int avasync_wait(AVAsyncContext *actx);
> +
> +void avasync_free(AVAsyncContext **actxp);
> +
> +#endif /* AVUTIL_ASYNC_H */
> --
> 2.6.2
>

