[Libav-user] avio_alloc_context with multiple threads

Victor dMdB ffmpeg at eml.cc
Mon Jan 2 18:23:53 EET 2017


On Mon, 2 Jan 2017, at 12:47 AM, Charles wrote:
> On 01/01/2017 10:07 AM, Victor dMdB wrote:
> > On Sun, 1 Jan 2017, at 11:04 PM, Charles wrote:
> >> On 01/01/2017 05:22 AM, Victor dMdB wrote:
> >>> If I have multiple streams of distinct data, how would I go about making
> >>> sure data is thread safe in real-time applications?
> >>>
> >>> My code is in C++, so I managed to pass a write function using extern "C",
> >>> but that makes the function global (i.e. not thread safe). I tried fixing it
> >>> with thread_local, but I then ran into other issues.
> >>>
> >>> I then tried using the private data ptr, but then I get a data race,
> >>> because I was just polling with a while loop in another thread, which is
> >>> rather unreliable.
> >>>
> >>> Are there any previous examples of how to do this?
> >>> _______________________________________________
> >>> Libav-user mailing list
> >>> Libav-user at ffmpeg.org
> >>> http://ffmpeg.org/mailman/listinfo/libav-user
> >>
> >> Muxing or demuxing? Are the multiple streams files or actual transport
> >> streams?
> >>
> >> Thanks
> >> Charles
> >
> >
> > In my case it is to mux mp4 files into rtp format, would it be different
> > if it were transport streams?
> 
> It could change the way you want to tackle it.
> 
> First, to make it thread safe you have to use some form of
> synchronization.
> I use a template LockingQueue.hpp
> 
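> Code :
>     #include <mutex>
>     #include <queue>
> 
>     template <typename T>
>     class LockingQueue
>     {
>     public :
>     void PushItem( T f_ )
>     {
>        std::lock_guard<std::mutex> lock(m_queue_mtx);
>        m_queue.push( f_ );
>     }
> 
>     // The caller must make sure the queue is not empty before calling this.
>     T PopItem()
>     {
>        std::lock_guard<std::mutex> lock(m_queue_mtx);
>        T f_ = m_queue.front();
>        m_queue.pop();
>        return f_;
>     }
>     protected :
>     std::queue<T>  m_queue;
>     std::mutex     m_queue_mtx;
>     };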
> 
> You can search for "thread-safe multiple-producer queue"; there are lots of
> different ways to implement one.
> 
> Run the stream out as a thread and give it a SendPacket( AVPacket *
> in_pkt ) method that will PushItem( in_pkt )
> 
> NOTE : You may need to do a memcpy depending on how the packets are
> allocated. Do memcpy before PushItem/SendPacket to prevent additional 
> lock time
> 
> In the thread's while loop, check the queue Size() and, if it is non-zero,
> next_pkt = PopItem()
> Then do any timestamp scaling and stream re-identification, and call
> av_interleaved_write_frame( m_av_out_fmt_ctx, next_pkt );
> 
> If the input is already in transport streams, then you could just round
> robin the input sockets and just forward each packet as it comes in.
> 
> Thanks
> Charles
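
A minimal sketch of the queue-and-consumer idea described above, assuming a blocking variant of the queue. The names BlockingQueue and RunQueueDemo are hypothetical and are not part of LockingQueue.hpp or FFmpeg. A condition variable lets the consumer sleep until a packet arrives, so it never has to poll the size and never calls front() on an empty queue:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking variant of the locking queue quoted above.
template <typename T>
class BlockingQueue {
public:
    void PushItem(T item) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_queue.push(std::move(item));
        }
        m_cv.notify_one();  // wake one waiting consumer
    }

    // Blocks until an item is available, then removes and returns it.
    T PopItem() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this] { return !m_queue.empty(); });
        T item = std::move(m_queue.front());
        m_queue.pop();
        return item;
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mtx;
    std::condition_variable m_cv;
};

// Demo: several producer threads, one consumer (the calling thread).
// Returns the number of items consumed.
int RunQueueDemo(int producers, int items_per_producer) {
    BlockingQueue<std::string> queue;
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        threads.emplace_back([&queue, p, items_per_producer] {
            for (int i = 0; i < items_per_producer; ++i)
                queue.PushItem("stream" + std::to_string(p));
        });
    }
    int consumed = 0;
    for (int i = 0; i < producers * items_per_producer; ++i) {
        std::string pkt = queue.PopItem();
        assert(!pkt.empty());
        ++consumed;
    }
    for (auto& t : threads) t.join();
    return consumed;
}
```

In a real muxing thread the body of the consumer loop would be where the timestamp handling and the write call go; here it only counts items.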
> 
> 
> 

Ok thanks, I've looked at it a bit more but I seem to still be getting
some memory corruption in my polling thread. So the static function used
with avio_alloc_context might be:

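// opaque is the pointer that was handed to avio_alloc_context
static int write(void* opaque, uint8_t* buf, int buf_size) {
	LockingQueue * queue = reinterpret_cast<LockingQueue *>(opaque);
	queue->PushItem(std::string( buf, buf + buf_size ));
	return buf_size;
}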

while the thread polling the queue would look like:

void pollQueue( std::shared_ptr<LockingQueue> buffer ){
	while(true){
		LockingQueue * queue = buffer.get();
		if( queue != nullptr ){
			if( queue->size() > 0 ){ // adding a size method to LockingQueue
				std::string pkt = queue->PopItem();
				// Do what i need with pkt
			}
		}
	}
}

And the setup would look something like : 

	std::shared_ptr<LockingQueue> queue = std::make_shared<LockingQueue>();
	// pollQueue is a free function, so it only takes the queue argument
	std::thread poll = std::thread{ &pollQueue, queue };
	avio_alloc_context(avioCtxBuf, avioCtxBuf_size, 1, queue.get(),
			NULL, write, NULL);
	poll.detach();
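
The setup above relies on the opaque pointer to tell streams apart, so the callback itself can stay a plain stateless function. A small sketch of that pattern without any FFmpeg dependency follows. StreamSink, FakeIoContext, Emit and WriteTrampoline are hypothetical stand-ins, and only the callback shape (void*, uint8_t*, int) is taken from the code in this post:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical per-stream sink; one instance per output stream.
struct StreamSink {
    std::vector<std::string> chunks;

    int WriteChunk(const uint8_t* buf, int size) {
        chunks.emplace_back(buf, buf + size);  // copy the bytes out of the I/O buffer
        return size;
    }
};

// Free function with the same shape as the write callback in this post.
// It keeps no state of its own; everything lives behind `opaque`.
int WriteTrampoline(void* opaque, uint8_t* buf, int buf_size) {
    return static_cast<StreamSink*>(opaque)->WriteChunk(buf, buf_size);
}

// Stand-in for the library side: it only knows a function pointer
// and the opaque pointer it was given at setup time.
using WriteFn = int (*)(void*, uint8_t*, int);
struct FakeIoContext {
    void* opaque;
    WriteFn write;
};

int Emit(FakeIoContext& ctx, const std::string& data) {
    std::vector<uint8_t> bytes(data.begin(), data.end());
    return ctx.write(ctx.opaque, bytes.data(), static_cast<int>(bytes.size()));
}

// Two contexts share one callback but never see each other's data.
bool SinksStaySeparate() {
    StreamSink a, b;
    FakeIoContext ctx_a{&a, &WriteTrampoline};
    FakeIoContext ctx_b{&b, &WriteTrampoline};
    Emit(ctx_a, "audio");
    Emit(ctx_b, "video");
    Emit(ctx_a, "more-audio");
    return a.chunks.size() == 2 && b.chunks.size() == 1 &&
           b.chunks[0] == "video";
}
```

Because the function is shared but the data is not, the function being global does not by itself make it unsafe; what needs protecting is whatever the opaque pointer refers to.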

Would this be a thread-safe example?
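
One thing that can be checked independently of FFmpeg is the lifetime of the queue and the polling thread. The sketch below is an assumption about how that could be structured, not a confirmed fix for the corruption described above. PacketPump and RunPumpDemo are hypothetical names. The object owns both the queue and the consumer thread, and joins the thread instead of detaching it, so a raw pointer handed to a callback stays valid for as long as the object lives:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical owner of both the queue and its consumer thread.
class PacketPump {
public:
    PacketPump() : m_worker([this] { Run(); }) {}
    ~PacketPump() { Close(); }

    void Push(std::string pkt) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_queue.push(std::move(pkt));
        }
        m_cv.notify_one();
    }

    // Signals end of input, then waits for the consumer to drain and exit.
    void Close() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_closed = true;
        }
        m_cv.notify_one();
        if (m_worker.joinable()) m_worker.join();
    }

    // Only meaningful after Close(), once the worker has been joined.
    std::size_t BytesSeen() const { return m_bytes; }

private:
    void Run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(m_mtx);
            m_cv.wait(lock, [this] { return m_closed || !m_queue.empty(); });
            if (m_queue.empty()) return;  // closed and fully drained
            std::string pkt = std::move(m_queue.front());
            m_queue.pop();
            lock.unlock();           // do the per-packet work outside the lock
            m_bytes += pkt.size();   // stand-in for the real packet handling
        }
    }

    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::queue<std::string> m_queue;
    bool m_closed = false;
    std::size_t m_bytes = 0;
    std::thread m_worker;  // declared last so it starts after the other members exist
};

std::size_t RunPumpDemo() {
    PacketPump pump;
    pump.Push("abc");
    pump.Push("defgh");
    pump.Close();
    return pump.BytesSeen();
}
```

With this shape a write callback could push into the pump through its opaque pointer, and the pump would be destroyed only after the I/O context that refers to it has been released.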

