FFmpeg
fifo.c
Go to the documentation of this file.
1 /*
2  * FIFO pseudo-muxer
3  * Copyright (c) 2016 Jan Sebechlebsky
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
#include <stdatomic.h>

#include "libavutil/avassert.h"
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/thread.h"
#include "libavutil/threadmessage.h"
#include "avformat.h"
#include "internal.h"
31 
32 #define FIFO_DEFAULT_QUEUE_SIZE 60
33 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
34 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
35 
36 typedef struct FifoContext {
37  const AVClass *class;
39 
40  char *format;
42 
45 
47 
48  /* Return value of last write_trailer_call */
50 
51  /* Time to wait before next recovery attempt
52  * This can refer to the time in processed stream,
53  * or real time. */
55 
56  /* Maximal number of unsuccessful successive recovery attempts */
58 
59  /* Whether to attempt recovery from failure */
61 
62  /* If >0 stream time will be used when waiting
63  * for the recovery attempt instead of real time */
65 
66  /* If >0 recovery will be attempted regardless of error code
67  * (except AVERROR_EXIT, so exit request is never ignored) */
69 
70  /* Whether to drop packets in case the queue is full. */
72 
73  /* Whether to wait for keyframe when recovering
74  * from failure or queue overflow */
76 
79  /* Value > 0 signals queue overflow */
81 
83  int64_t last_sent_dts;
84  int64_t timeshift;
85 } FifoContext;
86 
87 typedef struct FifoThreadContext {
89 
90  /* Timestamp of last failure.
91  * This is either pts in case stream time is used,
92  * or microseconds as returned by av_getttime_relative() */
94 
95  /* Number of current recovery process
96  * Value > 0 means we are in recovery process */
98 
99  /* If > 0 all frames will be dropped until keyframe is received */
101 
102  /* Value > 0 means that the previous write_header call was successful
103  * so finalization by calling write_trailer and ff_io_close must be done
104  * before exiting / reinitialization of underlying muxer */
106 
109 
/* Kind of request carried by a FifoMessage to the consumer thread. */
typedef enum FifoMessageType {
    FIFO_NOOP,          /* nothing to do */
    FIFO_WRITE_HEADER,  /* write the header of the underlying muxer */
    FIFO_WRITE_PACKET,  /* write the packet carried by the message */
    FIFO_FLUSH_OUTPUT   /* flush the underlying muxer */
} FifoMessageType;
116 
117 typedef struct FifoMessage {
120 } FifoMessage;
121 
123 {
124  AVFormatContext *avf = ctx->avf;
125  FifoContext *fifo = avf->priv_data;
126  AVFormatContext *avf2 = fifo->avf;
127  AVDictionary *format_options = NULL;
128  int ret, i;
129 
130  ret = av_dict_copy(&format_options, fifo->format_options, 0);
131  if (ret < 0)
132  return ret;
133 
134  ret = ff_format_output_open(avf2, avf->url, &format_options);
135  if (ret < 0) {
136  av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
137  av_err2str(ret));
138  goto end;
139  }
140 
141  for (i = 0;i < avf2->nb_streams; i++)
142  avf2->streams[i]->cur_dts = 0;
143 
144  ret = avformat_write_header(avf2, &format_options);
145  if (!ret)
146  ctx->header_written = 1;
147 
148  // Check for options unrecognized by underlying muxer
149  if (format_options) {
150  AVDictionaryEntry *entry = NULL;
151  while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
152  av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
153  ret = AVERROR(EINVAL);
154  }
155 
156 end:
157  av_dict_free(&format_options);
158  return ret;
159 }
160 
162 {
163  AVFormatContext *avf = ctx->avf;
164  FifoContext *fifo = avf->priv_data;
165  AVFormatContext *avf2 = fifo->avf;
166 
167  return av_write_frame(avf2, NULL);
168 }
169 
170 static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
171 {
172  AVStream *st = avf->streams[pkt->stream_index];
173  int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
174  int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
175  *last_dts = dts;
176  return duration;
177 }
178 
180 {
181  AVFormatContext *avf = ctx->avf;
182  FifoContext *fifo = avf->priv_data;
183  AVFormatContext *avf2 = fifo->avf;
184  AVRational src_tb, dst_tb;
185  int ret, s_idx;
186 
187  if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
188  atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
189 
190  if (ctx->drop_until_keyframe) {
191  if (pkt->flags & AV_PKT_FLAG_KEY) {
192  ctx->drop_until_keyframe = 0;
193  av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
194  } else {
195  av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
197  return 0;
198  }
199  }
200 
201  s_idx = pkt->stream_index;
202  src_tb = avf->streams[s_idx]->time_base;
203  dst_tb = avf2->streams[s_idx]->time_base;
204  av_packet_rescale_ts(pkt, src_tb, dst_tb);
205 
206  ret = av_write_frame(avf2, pkt);
207  if (ret >= 0)
209  return ret;
210 }
211 
213 {
214  AVFormatContext *avf = ctx->avf;
215  FifoContext *fifo = avf->priv_data;
216  AVFormatContext *avf2 = fifo->avf;
217  int ret;
218 
219  if (!ctx->header_written)
220  return 0;
221 
222  ret = av_write_trailer(avf2);
223  ff_format_io_close(avf2, &avf2->pb);
224 
225  return ret;
226 }
227 
229 {
230  int ret = AVERROR(EINVAL);
231 
232  if (msg->type == FIFO_NOOP)
233  return 0;
234 
235  if (!ctx->header_written) {
237  if (ret < 0)
238  return ret;
239  }
240 
241  switch(msg->type) {
242  case FIFO_WRITE_HEADER:
243  av_assert0(ret >= 0);
244  return ret;
245  case FIFO_WRITE_PACKET:
246  return fifo_thread_write_packet(ctx, &msg->pkt);
247  case FIFO_FLUSH_OUTPUT:
249  }
250 
251  av_assert0(0);
252  return AVERROR(EINVAL);
253 }
254 
255 static int is_recoverable(const FifoContext *fifo, int err_no) {
256  if (!fifo->attempt_recovery)
257  return 0;
258 
259  if (fifo->recover_any_error)
260  return err_no != AVERROR_EXIT;
261 
262  switch (err_no) {
263  case AVERROR(EINVAL):
264  case AVERROR(ENOSYS):
265  case AVERROR_EOF:
266  case AVERROR_EXIT:
268  return 0;
269  default:
270  return 1;
271  }
272 }
273 
274 static void free_message(void *msg)
275 {
276  FifoMessage *fifo_msg = msg;
277 
278  if (fifo_msg->type == FIFO_WRITE_PACKET)
279  av_packet_unref(&fifo_msg->pkt);
280 }
281 
283  int err_no)
284 {
285  AVFormatContext *avf = ctx->avf;
286  FifoContext *fifo = avf->priv_data;
287  int ret;
288 
289  av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
290  av_err2str(err_no));
291 
292  if (fifo->recovery_wait_streamtime) {
293  if (pkt->pts == AV_NOPTS_VALUE)
294  av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
295  " timestamp, recovery will be attempted immediately");
296  ctx->last_recovery_ts = pkt->pts;
297  } else {
298  ctx->last_recovery_ts = av_gettime_relative();
299  }
300 
301  if (fifo->max_recovery_attempts &&
302  ctx->recovery_nr >= fifo->max_recovery_attempts) {
303  av_log(avf, AV_LOG_ERROR,
304  "Maximal number of %d recovery attempts reached.\n",
305  fifo->max_recovery_attempts);
306  ret = err_no;
307  } else {
308  ret = AVERROR(EAGAIN);
309  }
310 
311  return ret;
312 }
313 
315 {
316  AVFormatContext *avf = ctx->avf;
317  FifoContext *fifo = avf->priv_data;
318  AVPacket *pkt = &msg->pkt;
319  int64_t time_since_recovery;
320  int ret;
321 
322  if (!is_recoverable(fifo, err_no)) {
323  ret = err_no;
324  goto fail;
325  }
326 
327  if (ctx->header_written) {
329  ctx->header_written = 0;
330  }
331 
332  if (!ctx->recovery_nr) {
333  ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
334  AV_NOPTS_VALUE : 0;
335  } else {
336  if (fifo->recovery_wait_streamtime) {
337  if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
339  time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
340  tb, AV_TIME_BASE_Q);
341  } else {
342  /* Enforce recovery immediately */
343  time_since_recovery = fifo->recovery_wait_time;
344  }
345  } else {
346  time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
347  }
348 
349  if (time_since_recovery < fifo->recovery_wait_time)
350  return AVERROR(EAGAIN);
351  }
352 
353  ctx->recovery_nr++;
354 
355  if (fifo->max_recovery_attempts) {
356  av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
357  ctx->recovery_nr, fifo->max_recovery_attempts);
358  } else {
359  av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
360  ctx->recovery_nr);
361  }
362 
363  if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
364  ctx->drop_until_keyframe = 1;
365 
367  if (ret < 0) {
368  if (is_recoverable(fifo, ret)) {
370  } else {
371  goto fail;
372  }
373  } else {
374  av_log(avf, AV_LOG_INFO, "Recovery successful\n");
375  ctx->recovery_nr = 0;
376  }
377 
378  return 0;
379 
380 fail:
381  free_message(msg);
382  return ret;
383 }
384 
385 static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
386 {
387  AVFormatContext *avf = ctx->avf;
388  FifoContext *fifo = avf->priv_data;
389  int ret;
390 
391  do {
392  if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
393  int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
394  int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
395  if (time_to_wait)
396  av_usleep(FFMIN(10000, time_to_wait));
397  }
398 
399  ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
400  } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
401 
402  if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
403  if (msg->type == FIFO_WRITE_PACKET)
404  av_packet_unref(&msg->pkt);
405  ret = 0;
406  }
407 
408  return ret;
409 }
410 
411 static void *fifo_consumer_thread(void *data)
412 {
413  AVFormatContext *avf = data;
414  FifoContext *fifo = avf->priv_data;
415  AVThreadMessageQueue *queue = fifo->queue;
416  FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
417  int ret;
418 
419  FifoThreadContext fifo_thread_ctx;
420  memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
421  fifo_thread_ctx.avf = avf;
422  fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
423 
424  while (1) {
425  uint8_t just_flushed = 0;
426 
427  if (!fifo_thread_ctx.recovery_nr)
428  ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
429 
430  if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
431  int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
432  if (rec_ret < 0) {
433  av_thread_message_queue_set_err_send(queue, rec_ret);
434  break;
435  }
436  }
437 
438  /* If the queue is full at the moment when fifo_write_packet
439  * attempts to insert new message (packet) to the queue,
440  * it sets the fifo->overflow_flag to 1 and drops packet.
441  * Here in consumer thread, the flag is checked and if it is
442  * set, the queue is flushed and flag cleared. */
444  if (fifo->overflow_flag) {
446  if (fifo->restart_with_keyframe)
447  fifo_thread_ctx.drop_until_keyframe = 1;
448  fifo->overflow_flag = 0;
449  just_flushed = 1;
450  }
452 
453  if (just_flushed)
454  av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
455 
456  if (fifo->timeshift)
457  while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
458  av_usleep(10000);
459 
460  ret = av_thread_message_queue_recv(queue, &msg, 0);
461  if (ret < 0) {
463  break;
464  }
465  }
466 
467  fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
468 
469  return NULL;
470 }
471 
473  const char *filename)
474 {
475  FifoContext *fifo = avf->priv_data;
476  AVFormatContext *avf2;
477  int ret = 0, i;
478 
479  ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
480  if (ret < 0)
481  return ret;
482 
483  fifo->avf = avf2;
484 
486  avf2->max_delay = avf->max_delay;
487  ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
488  if (ret < 0)
489  return ret;
490  avf2->opaque = avf->opaque;
491  avf2->io_close = avf->io_close;
492  avf2->io_open = avf->io_open;
493  avf2->flags = avf->flags;
494 
495  for (i = 0; i < avf->nb_streams; ++i) {
496  AVStream *st = avformat_new_stream(avf2, NULL);
497  if (!st)
498  return AVERROR(ENOMEM);
499 
501  if (ret < 0)
502  return ret;
503  }
504 
505  return 0;
506 }
507 
508 static int fifo_init(AVFormatContext *avf)
509 {
510  FifoContext *fifo = avf->priv_data;
511  ff_const59 AVOutputFormat *oformat;
512  int ret = 0;
513 
514  if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
515  av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
516  " only when drop_pkts_on_overflow is also turned on\n");
517  return AVERROR(EINVAL);
518  }
519  atomic_init(&fifo->queue_duration, 0);
521 
522  oformat = av_guess_format(fifo->format, avf->url, NULL);
523  if (!oformat) {
525  return ret;
526  }
527 
528  ret = fifo_mux_init(avf, oformat, avf->url);
529  if (ret < 0)
530  return ret;
531 
532  ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
533  sizeof(FifoMessage));
534  if (ret < 0)
535  return ret;
536 
538 
540  if (ret < 0)
541  return AVERROR(ret);
543 
544  return 0;
545 }
546 
548 {
549  FifoContext * fifo = avf->priv_data;
550  int ret;
551 
553  if (ret) {
554  av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
556  ret = AVERROR(ret);
557  }
558 
559  return ret;
560 }
561 
563 {
564  FifoContext *fifo = avf->priv_data;
566  int ret;
567 
568  if (pkt) {
569  ret = av_packet_ref(&msg.pkt,pkt);
570  if (ret < 0)
571  return ret;
572  }
573 
574  ret = av_thread_message_queue_send(fifo->queue, &msg,
575  fifo->drop_pkts_on_overflow ?
577  if (ret == AVERROR(EAGAIN)) {
578  uint8_t overflow_set = 0;
579 
580  /* Queue is full, set fifo->overflow_flag to 1
581  * to let consumer thread know the queue should
582  * be flushed. */
584  if (!fifo->overflow_flag)
585  fifo->overflow_flag = overflow_set = 1;
587 
588  if (overflow_set)
589  av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
590  ret = 0;
591  goto fail;
592  } else if (ret < 0) {
593  goto fail;
594  }
595 
596  if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE)
597  atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
598 
599  return ret;
600 fail:
601  if (pkt)
602  av_packet_unref(&msg.pkt);
603  return ret;
604 }
605 
607 {
608  FifoContext *fifo= avf->priv_data;
609  int ret;
610 
612  if (fifo->timeshift) {
613  int64_t now = av_gettime_relative();
614  int64_t elapsed = 0;
615  FifoMessage msg = {FIFO_NOOP};
616  do {
617  int64_t delay = av_gettime_relative() - now;
618  if (delay < 0) { // Discontinuity?
619  delay = 10000;
620  now = av_gettime_relative();
621  } else {
622  now += delay;
623  }
624  atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
625  elapsed += delay;
626  if (elapsed > fifo->timeshift)
627  break;
628  av_usleep(10000);
630  } while (ret >= 0 || ret == AVERROR(EAGAIN));
631  atomic_store(&fifo->queue_duration, INT64_MAX);
632  }
633 
635  if (ret < 0) {
636  av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
638  return AVERROR(ret);
639  }
640 
641  ret = fifo->write_trailer_ret;
642  return ret;
643 }
644 
645 static void fifo_deinit(AVFormatContext *avf)
646 {
647  FifoContext *fifo = avf->priv_data;
648 
649  avformat_free_context(fifo->avf);
653 }
654 
655 #define OFFSET(x) offsetof(FifoContext, x)
656 static const AVOption options[] = {
657  {"fifo_format", "Target muxer", OFFSET(format),
659 
660  {"queue_size", "Size of fifo queue", OFFSET(queue_size),
662 
663  {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
665 
666  {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
667  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
668 
669  {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
670  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
671 
672  {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
673  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
674 
675  {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
677 
678  {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
680 
681  {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
682  OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
683 
684  {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
685  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
686 
687  {"timeshift", "Delay fifo output", OFFSET(timeshift),
688  AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
689 
690  {NULL},
691 };
692 
693 static const AVClass fifo_muxer_class = {
694  .class_name = "Fifo muxer",
695  .item_name = av_default_item_name,
696  .option = options,
697  .version = LIBAVUTIL_VERSION_INT,
698 };
699 
701  .name = "fifo",
702  .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
703  .priv_data_size = sizeof(FifoContext),
704  .init = fifo_init,
708  .deinit = fifo_deinit,
709  .priv_class = &fifo_muxer_class,
711 };
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: avpacket.c:634
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
av_gettime_relative
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
fifo_init
static int fifo_init(AVFormatContext *avf)
Definition: fifo.c:508
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:200
fifo_thread_flush_output
static int fifo_thread_flush_output(FifoThreadContext *ctx)
Definition: fifo.c:161
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
AVOutputFormat::name
const char * name
Definition: avformat.h:491
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
FifoContext::write_trailer_ret
int write_trailer_ret
Definition: fifo.c:49
opt.h
avformat_new_stream
AVStream * avformat_new_stream(AVFormatContext *s, const AVCodec *c)
Add a new stream to a media file.
Definition: utils.c:4509
FifoContext::queue
AVThreadMessageQueue * queue
Definition: fifo.c:44
FIFO_WRITE_PACKET
@ FIFO_WRITE_PACKET
Definition: fifo.c:113
FifoContext::avf
AVFormatContext * avf
Definition: fifo.c:38
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:55
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
ff_format_output_open
int ff_format_output_open(AVFormatContext *s, const char *url, AVDictionary **options)
Utility function to open IO stream of output format.
Definition: utils.c:5682
fifo_deinit
static void fifo_deinit(AVFormatContext *avf)
Definition: fifo.c:645
FifoContext
Definition: fifo.c:38
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:260
fifo_write_packet
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
Definition: fifo.c:562
AV_THREAD_MESSAGE_NONBLOCK
@ AV_THREAD_MESSAGE_NONBLOCK
Perform non-blocking operation.
Definition: threadmessage.h:31
AVFormatContext::streams
AVStream ** streams
A list of all streams in the file.
Definition: avformat.h:1300
free_message
static void free_message(void *msg)
Definition: fifo.c:274
FifoContext::format_options
AVDictionary * format_options
Definition: fifo.c:41
AVOption
AVOption.
Definition: opt.h:248
FIFO_NOOP
@ FIFO_NOOP
Definition: fifo.c:111
data
const char data[16]
Definition: mxf.c:142
AV_OPT_TYPE_DURATION
@ AV_OPT_TYPE_DURATION
Definition: opt.h:239
AVStream::cur_dts
int64_t cur_dts
Definition: avformat.h:1066
AV_DICT_IGNORE_SUFFIX
#define AV_DICT_IGNORE_SUFFIX
Return first entry in a dictionary whose first part corresponds to the search key,...
Definition: dict.h:70
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:210
FifoThreadContext::drop_until_keyframe
uint8_t drop_until_keyframe
Definition: fifo.c:100
AVDictionary
Definition: dict.c:30
FIFO_WRITE_HEADER
@ FIFO_WRITE_HEADER
Definition: fifo.c:112
AV_PKT_FLAG_KEY
#define AV_PKT_FLAG_KEY
The packet contains a keyframe.
Definition: packet.h:410
FifoContext::overflow_flag
volatile uint8_t overflow_flag
Definition: fifo.c:80
ff_const59
#define ff_const59
The ff_const59 define is not part of the public API and will be removed without further warning.
Definition: avformat.h:535
av_guess_format
ff_const59 AVOutputFormat * av_guess_format(const char *short_name, const char *filename, const char *mime_type)
Return the output format in the list of registered output formats which best matches the provided par...
Definition: format.c:51
AVFormatContext::interrupt_callback
AVIOInterruptCB interrupt_callback
Custom interrupt callbacks for the I/O layer.
Definition: avformat.h:1512
FifoContext::last_sent_dts
int64_t last_sent_dts
Definition: fifo.c:83
FifoMessage::type
FifoMessageType type
Definition: fifo.c:118
fail
#define fail()
Definition: checkasm.h:133
FifoMessage
Definition: fifo.c:117
FifoContext::queue_size
int queue_size
Definition: fifo.c:43
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:172
is_recoverable
static int is_recoverable(const FifoContext *fifo, int err_no)
Definition: fifo.c:255
avassert.h
fifo_thread_process_recovery_failure
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt, int err_no)
Definition: fifo.c:282
pkt
AVPacket * pkt
Definition: movenc.c:59
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:194
AVFormatContext::metadata
AVDictionary * metadata
Metadata that applies to the whole file.
Definition: avformat.h:1474
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:156
av_thread_message_flush
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
Definition: threadmessage.c:218
duration
int64_t duration
Definition: movenc.c:64
av_dict_get
AVDictionaryEntry * av_dict_get(const AVDictionary *m, const char *key, const AVDictionaryEntry *prev, int flags)
Get a dictionary entry with matching key.
Definition: dict.c:40
FifoContext::overflow_flag_lock
pthread_mutex_t overflow_flag_lock
Definition: fifo.c:77
AV_OPT_FLAG_ENCODING_PARAM
#define AV_OPT_FLAG_ENCODING_PARAM
a generic parameter which can be set by the user for muxing or encoding
Definition: opt.h:278
FifoContext::max_recovery_attempts
int max_recovery_attempts
Definition: fifo.c:57
AVFormatContext::flags
int flags
Flags modifying the (de)muxer behaviour.
Definition: avformat.h:1363
format
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample format(the sample packing is implied by the sample format) and sample rate. The lists are not just lists
AVDictionaryEntry::key
char * key
Definition: dict.h:82
fifo_thread_write_packet
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
Definition: fifo.c:179
options
static const AVOption options[]
Definition: fifo.c:656
OFFSET
#define OFFSET(x)
Definition: fifo.c:655
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
ctx
AVFormatContext * ctx
Definition: movenc.c:48
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
FIFO_DEFAULT_QUEUE_SIZE
#define FIFO_DEFAULT_QUEUE_SIZE
Definition: fifo.c:32
AVFormatContext::opaque
void * opaque
User data.
Definition: avformat.h:1752
AVThreadMessageQueue
Definition: threadmessage.c:25
fifo_thread_write_trailer
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
Definition: fifo.c:212
av_usleep
int av_usleep(unsigned usec)
Sleep for a period of time.
Definition: time.c:84
avformat_write_header
av_warn_unused_result int avformat_write_header(AVFormatContext *s, AVDictionary **options)
Allocate the stream private data and write the stream header to an output media file.
Definition: mux.c:506
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
AVFormatContext
Format I/O context.
Definition: avformat.h:1232
internal.h
FifoThreadContext
Definition: fifo.c:87
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:67
AVStream::time_base
AVRational time_base
This is the fundamental unit of time (in seconds) in terms of which frame timestamps are represented.
Definition: avformat.h:902
NULL
#define NULL
Definition: coverity.c:32
AVERROR_PATCHWELCOME
#define AVERROR_PATCHWELCOME
Not yet implemented in FFmpeg, patches welcome.
Definition: error.h:62
write_trailer
static int write_trailer(AVFormatContext *s1)
Definition: v4l2enc.c:98
AVRational
Rational number (pair of numerator and denominator).
Definition: rational.h:58
AV_OPT_TYPE_DICT
@ AV_OPT_TYPE_DICT
Definition: opt.h:232
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:235
AVFormatContext::pb
AVIOContext * pb
I/O context.
Definition: avformat.h:1274
fifo_write_header
static int fifo_write_header(AVFormatContext *avf)
Definition: fifo.c:547
fifo_thread_dispatch_message
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
Definition: fifo.c:228
time.h
av_write_frame
int av_write_frame(AVFormatContext *s, AVPacket *pkt)
Write a packet to an output media file.
Definition: mux.c:1212
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: avpacket.c:641
atomic_fetch_sub_explicit
#define atomic_fetch_sub_explicit(object, operand, order)
Definition: stdatomic.h:152
atomic_load_explicit
#define atomic_load_explicit(object, order)
Definition: stdatomic.h:96
fifo_mux_init
static int fifo_mux_init(AVFormatContext *avf, ff_const59 AVOutputFormat *oformat, const char *filename)
Definition: fifo.c:472
FifoContext::writer_thread
pthread_t writer_thread
Definition: fifo.c:46
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:67
AVFormatContext::nb_streams
unsigned int nb_streams
Number of elements in AVFormatContext.streams.
Definition: avformat.h:1288
FifoContext::recover_any_error
int recover_any_error
Definition: fifo.c:68
FifoThreadContext::last_received_dts
int64_t last_received_dts
Definition: fifo.c:107
NULL_IF_CONFIG_SMALL
#define NULL_IF_CONFIG_SMALL(x)
Return NULL if CONFIG_SMALL is true, otherwise the argument without modification.
Definition: internal.h:117
fifo_consumer_thread
static void * fifo_consumer_thread(void *data)
Definition: fifo.c:411
threadmessage.h
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:119
FifoContext::drop_pkts_on_overflow
int drop_pkts_on_overflow
Definition: fifo.c:71
FFMAX
#define FFMAX(a, b)
Definition: common.h:103
FifoThreadContext::last_recovery_ts
int64_t last_recovery_ts
Definition: fifo.c:93
AVFormatContext::url
char * url
input or output URL.
Definition: avformat.h:1328
atomic_fetch_add_explicit
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
AVFMT_ALLOW_FLUSH
#define AVFMT_ALLOW_FLUSH
Format allows flushing.
Definition: avformat.h:471
AVFMT_NOFILE
#define AVFMT_NOFILE
Demuxer will use avio_open, no opened file should be provided by the caller.
Definition: avformat.h:458
FifoContext::timeshift
int64_t timeshift
Definition: fifo.c:84
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:368
FifoMessageType
FifoMessageType
Definition: fifo.c:110
FFMIN
#define FFMIN(a, b)
Definition: common.h:105
FifoContext::recovery_wait_streamtime
int recovery_wait_streamtime
Definition: fifo.c:64
AVPacket::flags
int flags
A combination of AV_PKT_FLAG values.
Definition: packet.h:375
av_dict_free
void av_dict_free(AVDictionary **pm)
Free all the memory allocated for an AVDictionary struct and all keys and values.
Definition: dict.c:203
avformat_alloc_output_context2
int avformat_alloc_output_context2(AVFormatContext **ctx, ff_const59 AVOutputFormat *oformat, const char *format_name, const char *filename)
Allocate an AVFormatContext for an output format.
Definition: mux.c:136
av_packet_rescale_ts
void av_packet_rescale_ts(AVPacket *pkt, AVRational src_tb, AVRational dst_tb)
Convert valid timing fields (timestamps / durations) in a packet from one timebase to another.
Definition: avpacket.c:737
pthread_t
Definition: os2threads.h:44
FifoContext::queue_duration
atomic_int_least64_t queue_duration
Definition: fifo.c:82
AV_LOG_INFO
#define AV_LOG_INFO
Standard information.
Definition: log.h:205
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
write_packet
static void write_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int unqueue)
Definition: ffmpeg.c:729
av_write_trailer
int av_write_trailer(AVFormatContext *s)
Write the stream trailer to an output media file and free the file private data.
Definition: mux.c:1274
FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS
Definition: fifo.c:33
FifoMessage::pkt
AVPacket pkt
Definition: fifo.c:119
i
int i
Definition: input.c:407
AVOutputFormat
Definition: avformat.h:490
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will be presented to the user.
Definition: packet.h:362
AVERROR_MUXER_NOT_FOUND
#define AVERROR_MUXER_NOT_FOUND
Muxer not found.
Definition: error.h:60
av_thread_message_queue_set_err_send
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
Definition: threadmessage.c:188
uint8_t
uint8_t
Definition: audio_convert.c:194
AVFormatContext::max_delay
int max_delay
Definition: avformat.h:1357
tb
#define tb
Definition: regdef.h:68
FifoThreadContext::recovery_nr
int recovery_nr
Definition: fifo.c:97
AVFMT_TS_NEGATIVE
#define AVFMT_TS_NEGATIVE
Format allows muxing negative timestamps.
Definition: avformat.h:475
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
ret
ret
Definition: filter_design.txt:187
AVStream
Stream structure.
Definition: avformat.h:873
ff_fifo_muxer
AVOutputFormat ff_fifo_muxer
Definition: fifo.c:700
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass is associated.
Definition: log.h:72
FifoContext::restart_with_keyframe
int restart_with_keyframe
Definition: fifo.c:75
avformat.h
ff_stream_encode_params_copy
int ff_stream_encode_params_copy(AVStream *dst, const AVStream *src)
Copy encoding parameters from source to destination stream.
Definition: utils.c:4316
fifo_thread_write_header
static int fifo_thread_write_header(FifoThreadContext *ctx)
Definition: fifo.c:122
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Definition: opt.h:225
FifoThreadContext::avf
AVFormatContext * avf
Definition: fifo.c:88
avformat_free_context
void avformat_free_context(AVFormatContext *s)
Free an AVFormatContext and all its streams.
Definition: utils.c:4436
fifo_muxer_class
static const AVClass fifo_muxer_class
Definition: fifo.c:693
AVFormatContext::io_close
void(* io_close)(struct AVFormatContext *s, AVIOContext *pb)
A callback for closing the streams opened with AVFormatContext.io_open().
Definition: avformat.h:1834
AVFormatContext::io_open
int(* io_open)(struct AVFormatContext *s, AVIOContext **pb, const char *url, int flags, AVDictionary **options)
A callback for opening new IO streams.
Definition: avformat.h:1828
AVPacket::stream_index
int stream_index
Definition: packet.h:371
ff_format_io_close
void ff_format_io_close(AVFormatContext *s, AVIOContext **pb)
Definition: utils.c:5692
next_duration
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
Definition: fifo.c:170
FifoContext::attempt_recovery
int attempt_recovery
Definition: fifo.c:60
FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC
Definition: fifo.c:34
fifo_thread_recover
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
Definition: fifo.c:385
AVDictionaryEntry
Definition: dict.h:81
fifo_write_trailer
static int fifo_write_trailer(AVFormatContext *avf)
Definition: fifo.c:606
AVPacket
This structure stores compressed data.
Definition: packet.h:346
AV_OPT_TYPE_BOOL
@ AV_OPT_TYPE_BOOL
Definition: opt.h:242
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
av_dict_copy
int av_dict_copy(AVDictionary **dst, const AVDictionary *src, int flags)
Copy entries from one AVDictionary struct into another.
Definition: dict.c:217
fifo_thread_attempt_recovery
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
Definition: fifo.c:314
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:561
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:199
av_thread_message_queue_set_free_func
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing messages from the queue.
Definition: threadmessage.c:83
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:28
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:56
FifoContext::recovery_wait_time
int64_t recovery_wait_time
Definition: fifo.c:54
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Definition: opt.h:229
write_header
static void write_header(FFV1Context *f)
Definition: ffv1enc.c:346
init
static av_cold int init(AVFilterContext *ctx)
Definition: fifo.c:50
FIFO_FLUSH_OUTPUT
@ FIFO_FLUSH_OUTPUT
Definition: fifo.c:114
FifoContext::overflow_flag_lock_initialized
int overflow_flag_lock_initialized
Definition: fifo.c:78
AVFormatContext::priv_data
void * priv_data
Format private data.
Definition: avformat.h:1260
FifoThreadContext::header_written
uint8_t header_written
Definition: fifo.c:105
FifoContext::format
char * format
Definition: fifo.c:40
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:63