[FFmpeg-devel] [PATCH v2 02/03] libavdevice/avfoundation.m: Replace mutex-based concurrency handling in avfoundation.m by a thread-safe fifo queue with maximum length.
Marvin Scholz
epirat07 at gmail.com
Mon Dec 13 20:56:47 EET 2021
On 13 Dec 2021, at 17:39, Romain Beauxis wrote:
> This is the second patch of a series of 3 that cleanup and enhance the
> avfoundation implementation for libavdevice.
>
> This patch fixes the concurrency model. Avfoundation runs its own
> producing thread
> to send produced frames and ffmpeg runs its own thread to consume
> them.
>
> The existing implementation stores the last transmitted frame and uses
> a mutex
> to avoid concurrent access. However, this leads to situations where
> upcoming frames
> can be dropped if the ffmpeg thread is accessing the latest frame. This
> happens
> even when the thread would otherwise catch up and process frames fast
> enough.
>
> This patch changes this implementation to use a buffer queue with a
> max queue length
> and encapsulated thread-safety. This greatly simplifies the logic of
> the calling code
> and gives the consuming thread a chance to process all frames
> concurrently to the producing
> thread while avoiding memory leaks.
Couldn't this just use CMSimpleQueue
https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
or CMBufferQueue?
The implementation of the queue in this patch does not seem right; see
the review below.
>
> Signed-off-by: Romain Beauxis <toots at rastageeks.org>
> ---
> libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
> 1 file changed, 127 insertions(+), 93 deletions(-)
>
> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
> index 79c9207cfa..95414fd16a 100644
> --- a/libavdevice/avfoundation.m
> +++ b/libavdevice/avfoundation.m
> @@ -26,7 +26,6 @@
> */
>
> #import <AVFoundation/AVFoundation.h>
> -#include <pthread.h>
>
> #include "libavutil/channel_layout.h"
> #include "libavutil/pixdesc.h"
> @@ -80,13 +79,97 @@
> { AV_PIX_FMT_NONE, 0 }
> };
>
> +#define MAX_QUEUED_OBJECTS 10
> +
> +@interface AvdeviceAvfoundationBuffer : NSObject
> ++ (AvdeviceAvfoundationBuffer *)
> fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
> +- (CMSampleBufferRef) getCMSampleBuffer;
> +@end
> +
> +@implementation AvdeviceAvfoundationBuffer {
> + CMSampleBufferRef sampleBuffer;
> +}
> +
> ++ (AvdeviceAvfoundationBuffer *)
> fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
> + return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
> +}
> +
> +- (id) init:(CMSampleBufferRef)buffer {
> + sampleBuffer = buffer;
> + return self;
> +}
> +
> +- (CMSampleBufferRef) getCMSampleBuffer {
> + return sampleBuffer;
> +}
> +@end
> +
> +@interface AvdeviceAvfoundationBufferQueue : NSObject
> +- (CMSampleBufferRef) dequeue;
> +- (NSUInteger) count;
> +- (void) enqueue:(CMSampleBufferRef)obj;
> +@end
> +
> +@implementation AvdeviceAvfoundationBufferQueue {
> + NSLock *mutex;
> + NSMutableArray *queue;
> +}
> +
> +- (id) init {
> + mutex = [[[NSLock alloc] init] retain];
> + queue = [[[NSMutableArray alloc] init] retain];
> + return self;
> +}
> +
> +- (oneway void) release {
> + NSEnumerator *enumerator = [queue objectEnumerator];
> + AvdeviceAvfoundationBuffer *buffer;
> +
> + while (buffer = [enumerator nextObject]) {
> + CFRelease([buffer getCMSampleBuffer]);
> + }
> +
> + [mutex release];
> + [queue release];
> +}
Shouldn't this be done in dealloc instead of release?
-release runs on every release call, not only the final one, and
retain is not overridden to match, so this seems
like it could lead to over-releasing resources.
> +
> +- (NSUInteger) count {
> + [mutex lock];
> + NSUInteger c = [queue count];
> + [mutex unlock];
> + return c;
> +}
This does not look right: the count can change after it is returned,
and the caller does not hold a lock to prevent this.
> +
> +- (CMSampleBufferRef) dequeue {
> + [mutex lock];
> +
> + if ([queue count] < 1) {
> + [mutex unlock];
> + return nil;
> + }
> +
> + AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
> + CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
> + [queue removeObjectAtIndex:0];
> + [mutex unlock];
> +
> + return sampleBuffer;
> +}
> +
> +- (void) enqueue:(CMSampleBufferRef)buffer {
> + [mutex lock];
> + while (MAX_QUEUED_OBJECTS < [queue count]) {
> + [queue removeObjectAtIndex:0];
> + }
> + [queue addObject:[AvdeviceAvfoundationBuffer
> fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
> + [mutex unlock];
> +}
> +@end
> +
> typedef struct
> {
> AVClass* class;
>
> - int frames_captured;
> - int audio_frames_captured;
> - pthread_mutex_t frame_lock;
> id avf_delegate;
> id avf_audio_delegate;
>
> @@ -121,8 +204,8 @@
> AVCaptureSession *capture_session;
> AVCaptureVideoDataOutput *video_output;
> AVCaptureAudioDataOutput *audio_output;
> - CMSampleBufferRef current_frame;
> - CMSampleBufferRef current_audio_frame;
> + AvdeviceAvfoundationBufferQueue *audio_frames;
> + AvdeviceAvfoundationBufferQueue *video_frames;
>
> AVCaptureDevice *observed_device;
> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
> @@ -131,16 +214,6 @@
> int observed_quit;
> } AVFContext;
>
> -static void lock_frames(AVFContext* ctx)
> -{
> - pthread_mutex_lock(&ctx->frame_lock);
> -}
> -
> -static void unlock_frames(AVFContext* ctx)
> -{
> - pthread_mutex_unlock(&ctx->frame_lock);
> -}
> -
> /** FrameReciever class - delegate for AVCaptureSession
> */
> @interface AVFFrameReceiver : NSObject
> @@ -218,17 +291,7 @@ - (void) captureOutput:(AVCaptureOutput
> *)captureOutput
> didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
> fromConnection:(AVCaptureConnection *)connection
> {
> - lock_frames(_context);
> -
> - if (_context->current_frame != nil) {
> - CFRelease(_context->current_frame);
> - }
> -
> - _context->current_frame =
> (CMSampleBufferRef)CFRetain(videoFrame);
> -
> - unlock_frames(_context);
> -
> - ++_context->frames_captured;
> + [_context->video_frames enqueue:videoFrame];
> }
>
> @end
> @@ -262,17 +325,7 @@ - (void) captureOutput:(AVCaptureOutput
> *)captureOutput
> didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
> fromConnection:(AVCaptureConnection *)connection
> {
> - lock_frames(_context);
> -
> - if (_context->current_audio_frame != nil) {
> - CFRelease(_context->current_audio_frame);
> - }
> -
> - _context->current_audio_frame =
> (CMSampleBufferRef)CFRetain(audioFrame);
> -
> - unlock_frames(_context);
> -
> - ++_context->audio_frames_captured;
> + [_context->audio_frames enqueue:audioFrame];
> }
>
> @end
> @@ -284,12 +337,16 @@ static void destroy_context(AVFContext* ctx)
> [ctx->capture_session release];
> [ctx->video_output release];
> [ctx->audio_output release];
> + [ctx->video_frames release];
> + [ctx->audio_frames release];
> [ctx->avf_delegate release];
> [ctx->avf_audio_delegate release];
>
> ctx->capture_session = NULL;
> ctx->video_output = NULL;
> ctx->audio_output = NULL;
> + ctx->video_frames = NULL;
> + ctx->audio_frames = NULL;
> ctx->avf_delegate = NULL;
> ctx->avf_audio_delegate = NULL;
>
> @@ -297,12 +354,6 @@ static void destroy_context(AVFContext* ctx)
> AudioConverterDispose(ctx->audio_converter);
> ctx->audio_converter = NULL;
> }
> -
> - pthread_mutex_destroy(&ctx->frame_lock);
> -
> - if (ctx->current_frame) {
> - CFRelease(ctx->current_frame);
> - }
> }
>
> static void parse_device_name(AVFormatContext *s)
> @@ -630,18 +681,18 @@ static int get_video_config(AVFormatContext *s)
> }
>
> // Take stream info from the first frame.
> - while (ctx->frames_captured < 1) {
> + while ([ctx->video_frames count] < 1) {
> CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
> }
>
> - lock_frames(ctx);
> + CMSampleBufferRef frame = [ctx->video_frames dequeue];
>
> ctx->video_stream_index = stream->index;
>
> avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>
> - image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
> - block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
> + image_buffer = CMSampleBufferGetImageBuffer(frame);
> + block_buffer = CMSampleBufferGetDataBuffer(frame);
>
> if (image_buffer) {
> image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
> @@ -657,10 +708,7 @@ static int get_video_config(AVFormatContext *s)
> stream->codecpar->format = ctx->pixel_format;
> }
>
> - CFRelease(ctx->current_frame);
> - ctx->current_frame = nil;
> -
> - unlock_frames(ctx);
> + CFRelease(frame);
>
> return 0;
> }
> @@ -680,27 +728,27 @@ static int get_audio_config(AVFormatContext *s)
> }
>
> // Take stream info from the first frame.
> - while (ctx->audio_frames_captured < 1) {
> + while ([ctx->audio_frames count] < 1) {
> CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
> }
>
> - lock_frames(ctx);
> + CMSampleBufferRef frame = [ctx->audio_frames dequeue];
>
> ctx->audio_stream_index = stream->index;
>
> avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>
> - format_desc =
> CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
> + format_desc = CMSampleBufferGetFormatDescription(frame);
> const AudioStreamBasicDescription *input_format =
> CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);
>
> if (!input_format) {
> - unlock_frames(ctx);
> + CFRelease(frame);
> av_log(s, AV_LOG_ERROR, "audio format not available\n");
> return 1;
> }
>
> if (input_format->mFormatID != kAudioFormatLinearPCM) {
> - unlock_frames(ctx);
> + CFRelease(frame);
> av_log(s, AV_LOG_ERROR, "only PCM audio format are supported
> at the moment\n");
> return 1;
> }
> @@ -778,16 +826,13 @@ static int get_audio_config(AVFormatContext *s)
> if (must_convert) {
> OSStatus ret = AudioConverterNew(input_format, &output_format,
> &ctx->audio_converter);
> if (ret != noErr) {
> - unlock_frames(ctx);
> + CFRelease(frame);
> av_log(s, AV_LOG_ERROR, "Error while allocating audio
> converter\n");
> return 1;
> }
> }
>
> - CFRelease(ctx->current_audio_frame);
> - ctx->current_audio_frame = nil;
> -
> - unlock_frames(ctx);
> + CFRelease(frame);
>
> return 0;
> }
> @@ -805,8 +850,6 @@ static int avf_read_header(AVFormatContext *s)
>
> ctx->num_video_devices = [devices count] + [devices_muxed count];
>
> - pthread_mutex_init(&ctx->frame_lock, NULL);
> -
> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
> CGGetActiveDisplayList(0, NULL, &num_screens);
> #endif
> @@ -1006,6 +1049,8 @@ static int avf_read_header(AVFormatContext *s)
>
> // Initialize capture session
> ctx->capture_session = [[AVCaptureSession alloc] init];
> + ctx->video_frames = [[AvdeviceAvfoundationBufferQueue alloc]
> init];
> + ctx->audio_frames = [[AvdeviceAvfoundationBufferQueue alloc]
> init];
>
> if (video_device && add_video_device(s, video_device)) {
> goto fail;
> @@ -1088,35 +1133,31 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
> AVFContext* ctx = (AVFContext*)s->priv_data;
>
> do {
> - CVImageBufferRef image_buffer;
> - CMBlockBufferRef block_buffer;
> - lock_frames(ctx);
> -
> - if (ctx->current_frame != nil) {
> + if (1 <= [ctx->video_frames count]) {
> int status;
> int length = 0;
> -
> - image_buffer =
> CMSampleBufferGetImageBuffer(ctx->current_frame);
> - block_buffer =
> CMSampleBufferGetDataBuffer(ctx->current_frame);
> + CMSampleBufferRef video_frame = [ctx->video_frames
> dequeue];
> + CVImageBufferRef image_buffer =
> CMSampleBufferGetImageBuffer(video_frame);;
> + CMBlockBufferRef block_buffer =
> CMSampleBufferGetDataBuffer(video_frame);
>
> if (image_buffer != nil) {
> length = (int)CVPixelBufferGetDataSize(image_buffer);
> } else if (block_buffer != nil) {
> length =
> (int)CMBlockBufferGetDataLength(block_buffer);
> } else {
> - unlock_frames(ctx);
> + CFRelease(video_frame);
> return AVERROR(EINVAL);
> }
>
> if (av_new_packet(pkt, length) < 0) {
> - unlock_frames(ctx);
> + CFRelease(video_frame);
> return AVERROR(EIO);
> }
>
> CMItemCount count;
> CMSampleTimingInfo timing_info;
>
> - if
> (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1,
> &timing_info, &count) == noErr) {
> + if
> (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1,
> &timing_info, &count) == noErr) {
> AVRational timebase_q = av_make_q(1,
> timing_info.presentationTimeStamp.timescale);
> pkt->pts = pkt->dts =
> av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q,
> avf_time_base_q);
> }
> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
> status = AVERROR(EIO);
> }
> }
> - CFRelease(ctx->current_frame);
> - ctx->current_frame = nil;
> + CFRelease(video_frame);
>
> if (status < 0) {
> - unlock_frames(ctx);
> return status;
> }
> - } else if (ctx->current_audio_frame != nil) {
> - CMBlockBufferRef block_buffer =
> CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
> + } else if (1 <= [ctx->audio_frames count]) {
> + CMSampleBufferRef audio_frame = [ctx->audio_frames
> dequeue];
> + CMBlockBufferRef block_buffer =
> CMSampleBufferGetDataBuffer(audio_frame);
>
> size_t input_size =
> CMBlockBufferGetDataLength(block_buffer);
> int buffer_size = input_size / ctx->audio_buffers;
> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
> UInt32 size = sizeof(output_size);
> ret = AudioConverterGetProperty(ctx->audio_converter,
> kAudioConverterPropertyCalculateOutputBufferSize, &size,
> &output_size);
> if (ret != noErr) {
> - unlock_frames(ctx);
> + CFRelease(audio_frame);
> return AVERROR(EIO);
> }
>
> if (av_new_packet(pkt, output_size) < 0) {
> - unlock_frames(ctx);
> + CFRelease(audio_frame);
> return AVERROR(EIO);
> }
>
> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
>
> if (ret != kCMBlockBufferNoErr) {
> av_free(input_buffer);
> - unlock_frames(ctx);
> + CFRelease(audio_frame);
> return AVERROR(EIO);
> }
> }
> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
> av_free(input_buffer);
>
> if (ret != noErr) {
> - unlock_frames(ctx);
> + CFRelease(audio_frame);
> return AVERROR(EIO);
> }
>
> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
> } else {
> ret = CMBlockBufferCopyDataBytes(block_buffer, 0,
> pkt->size, pkt->data);
> if (ret != kCMBlockBufferNoErr) {
> - unlock_frames(ctx);
> + CFRelease(audio_frame);
> return AVERROR(EIO);
> }
> }
> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
> CMItemCount count;
> CMSampleTimingInfo timing_info;
>
> - if
> (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame,
> 1, &timing_info, &count) == noErr) {
> + if
> (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1,
> &timing_info, &count) == noErr) {
> AVRational timebase_q = av_make_q(1,
> timing_info.presentationTimeStamp.timescale);
> pkt->pts = pkt->dts =
> av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q,
> avf_time_base_q);
> }
> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s,
> AVPacket *pkt)
> pkt->stream_index = ctx->audio_stream_index;
> pkt->flags |= AV_PKT_FLAG_KEY;
>
> - CFRelease(ctx->current_audio_frame);
> - ctx->current_audio_frame = nil;
> -
> - unlock_frames(ctx);
> + CFRelease(audio_frame);
> } else {
> pkt->data = NULL;
> - unlock_frames(ctx);
> if (ctx->observed_quit) {
> return AVERROR_EOF;
> } else {
> return AVERROR(EAGAIN);
> }
> }
> -
> - unlock_frames(ctx);
> } while (!pkt->data);
>
> return 0;
> --
> 2.30.1 (Apple Git-130)
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
More information about the ffmpeg-devel
mailing list