[FFmpeg-devel] [PATCH v8 01/11] avformat: Add fifo pseudo-muxer

Jan Sebechlebsky sebechlebskyjan at gmail.com
Tue Aug 16 02:05:51 EEST 2016


On 08/15/2016 11:50 PM, Nicolas George wrote:

> L'octidi 28 thermidor, an CCXXIV, sebechlebskyjan at gmail.com a écrit :
[...]
>> +s at item recovery_wait_streamtime @var{bool}
>     ^
> Strange.
>
Sorry, that is obviously a typo. Strange thing is it had not produced 
any kind of warning/error.
[...]
>> +#define FIFO_DEFAULT_QUEUE_SIZE              60
>> +#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
>> +#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 second
> Not very useful, but not harmful either.
>
> Nit: plural for seconds.
>
>> +
>> +typedef struct FifoContext {
>> +    const AVClass *class;
>> +    AVFormatContext *avf;
>> +
>> +    char *format;
>> +    AVOutputFormat *oformat;
> Does not seem to be needed in the context, could be a local variable.
You're probably right, I'll check it and make it local variable if so.
>> +    // Check for options unrecognized by underlying muxer
>> +    if (format_options) {
>> +        AVDictionaryEntry *entry = NULL;
>> +        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
>> +            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
>> +        ret = AVERROR(EINVAL);
>> +    }
> This snippet seems to pop at quite a few places, it could become a helper
> function. But do not consider it necessary for now.
I'll try to keep it in mind and add helper function later.
> [...]
>> +        ctx->last_recovery_ts = pkt->pts;
>> +    } else {
>> +        ctx->last_recovery_ts = av_gettime_relative();
> In my experience, this kind of construct is slightly simpler if you compute
> the next timestamp immediately instead of keeping the last one. I mean,
> instead of this:
>
> 	last_ts = now();
> 	...
> 	sleep(last_ts + delay - now());
>
> you can write this:
>
> 	next_ts = now() + delay;
> 	...
> 	sleep(next_ts - now());
I'll give it a try and see if it is more cleaner in context of this code.
>> +    }
>> +
>> +    if (fifo->max_recovery_attempts &&
>> +        ctx->recovery_nr >= fifo->max_recovery_attempts) {
>> +        av_log(avf, AV_LOG_ERROR,
>> +               "Maximal number of %d recovery attempts reached.\n",
>> +               fifo->max_recovery_attempts);
>> +        ret = err_no;
>> +    } else {
>> +        ret = AVERROR(EAGAIN);
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
>> +{
>> +    AVFormatContext *avf = ctx->avf;
>> +    FifoContext *fifo = avf->priv_data;
>> +    AVPacket *pkt = &msg->pkt;
>> +    int64_t time_since_recovery;
>> +    int ret;
>> +
>> +    if (!is_recoverable(fifo, err_no)) {
>> +        ret = err_no;
>> +        goto fail;
>> +    }
>> +
>> +    if (ctx->header_written) {
>> +        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
>> +        ctx->header_written = 0;
>> +    }
>> +
>> +    if (!ctx->recovery_nr) {
>> +        ctx->last_recovery_ts = 0;
> AV_NOPTS_VALUE? And maybe an av_assert1() when you use it to be sure it was
> actually set.
I'll take a look at it. Using zero will delay the initial recovery 
attempt if failure happens at
the pts = 0, so treating it specially seems like a good idea.
> [...]
>> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
>> +        if (fifo->overflow_flag) {
>> +            av_thread_message_flush(queue);
>> +            if (fifo->restart_with_keyframe)
>> +                fifo_thread_ctx.drop_until_keyframe = 1;
>> +            fifo->overflow_flag = 0;
>> +            just_flushed = 1;
>> +        }
>> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
> I wonder if this whole thing could not be made simpler. This is just a
> suggestion, so do not feel obligated to act on it if you lack time.
>
> This has the feel of an out-of-band message. This is a rather useful
> feature. This could be added to the thread message queue rather easily.
I am not sure if I understand this. Do you mean thread queue function 
which would set
certain flag that the queue should be flushed and flushed the queue at 
certain point in time (next receive call?)?
> But in fact, I think it could be made even simpler. See below where
> overflow_flag is set.
>
>> +
>> +        if (just_flushed)
>> +            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
>> +
>> +        ret = av_thread_message_queue_recv(queue, &msg, 0);
> [...]
>> +        if (ret < 0) {
>> +            av_thread_message_queue_set_err_send(queue, ret);
>> +            break;
> I do not think this is necessary: the only error possible here is the one
> you sent yourself using av_thread_message_queue_set_err_recv().
You're right. I'll remove it.
>> +        }
>> +    }
>> +
>> +    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
>> +
>> +    return NULL;
>> +}
>> +
>> +static int fifo_mux_init(AVFormatContext *avf)
>> +{
>> +    FifoContext *fifo = avf->priv_data;
>> +    AVFormatContext *avf2;
>> +    int ret = 0, i;
>> +
>> +    ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    fifo->avf = avf2;
>> +
>> +    avf2->interrupt_callback = avf->interrupt_callback;
> You have not documented the fact that the interrupt callback needs to be
> thread-safe. And if users can select the fifo muxer, this becomes a problem.
I will submit another patch adding that to the API documentation. Thanks 
for reminding.
[...]
>> +    if (ret == AVERROR(EAGAIN)) {
>> +        uint8_t overflow_set = 0;
>> +
>> +        /* Queue is full, set fifo->overflow_flag to 1
>> +         * to let consumer thread know the queue should
>> +         * be flushed. */
>> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
>> +        if (!fifo->overflow_flag)
>> +            fifo->overflow_flag = overflow_set = 1;
>> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
>> +
>> +        if (overflow_set)
>> +            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
>> +        ret = 0;
>> +        goto fail;
> Can you explain why you do not just drop the packet here and now? If you do
> that and move the discard until keyframe here, you can dispense with the
> overflow flag and the lock. The consumer thread becomes much simpler and can
> focus on the retries themselves. Plus, the whole process becomes slightly
> more efficient since packets are discarded earlier.
>
> Also, why do you flush the whole 60 packets when it blocks? If the output is
> only lagging a little, discarding a single packet could be enough. And if
> not, it will discard the next packets.
The current packet is discarded here. We discussed with Marton how to 
drop packets and agreed that flushing the whole queue should be OK - it 
is done within single critical section in av_thread_queue_flush(), 
flushing just single packet might not be enough in some situations. The 
reason why is it done in worker thread and not in this call is that 
flushing the queue can get quite costly -> dereferencing larger number 
of packets can lead to many free() calls which may be costly, the idea 
was to move potentially costly operation to worker thread.
     It may be a good thing to make this configurable (allow user to set 
how many messages to flush on overflow) and add function to thread queue 
which would flush certain number of messages in single critical section.
> [...]
>> +         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM},
> 1024 seems a bit arbitrary.
I'll change that to INT_MAX.
> [...]
>> +#define LIBAVFORMAT_VERSION_MINOR  47
>>   #define LIBAVFORMAT_VERSION_MICRO 101
> Reset micro when bumping minor.
I'll fix that.

Thanks for review! I'll fix the issues and resend the patch soon.

Regards,
Jan


More information about the ffmpeg-devel mailing list