FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDecOutput {
76  uint8_t *dst_finished;
77  unsigned nb_dst;
78 } SchDecOutput;
79 
80 typedef struct SchDec {
81  const AVClass *class;
82 
84 
86  unsigned nb_outputs;
87 
89  // Queue for receiving input packets, one stream.
91 
92  // Queue for sending post-flush end timestamps back to the source
95 
96  // temporary storage used by sch_dec_send()
98 } SchDec;
99 
100 typedef struct SchSyncQueue {
104 
105  unsigned *enc_idx;
106  unsigned nb_enc_idx;
107 } SchSyncQueue;
108 
109 typedef struct SchEnc {
110  const AVClass *class;
111 
114  uint8_t *dst_finished;
115  unsigned nb_dst;
116 
117  // [0] - index of the sync queue in Scheduler.sq_enc,
118  // [1] - index of this encoder in the sq
119  int sq_idx[2];
120 
121  /* Opening encoders is somewhat nontrivial due to their interaction with
122  * sync queues, which are (among other things) responsible for maintaining
123  * constant audio frame size, when it is required by the encoder.
124  *
125  * Opening the encoder requires stream parameters, obtained from the first
126  * frame. However, that frame cannot be properly chunked by the sync queue
127  * without knowing the required frame size, which is only available after
128  * opening the encoder.
129  *
130  * This apparent circular dependency is resolved in the following way:
131  * - the caller creating the encoder gives us a callback which opens the
132  * encoder and returns the required frame size (if any)
133  * - when the first frame is sent to the encoder, the sending thread
134  * - calls this callback, opening the encoder
135  * - passes the returned frame size to the sync queue
136  */
137  int (*open_cb)(void *opaque, const AVFrame *frame);
138  int opened;
139 
141  // Queue for receiving input frames, one stream.
143  // tq_send() to queue returned EOF
145 
146  // temporary storage used by sch_enc_send()
148 } SchEnc;
149 
150 typedef struct SchDemuxStream {
152  uint8_t *dst_finished;
153  unsigned nb_dst;
155 
156 typedef struct SchDemux {
157  const AVClass *class;
158 
160  unsigned nb_streams;
161 
164 
165  // temporary storage used by sch_demux_send()
167 
168  // protected by schedule_lock
170 } SchDemux;
171 
172 typedef struct PreMuxQueue {
173  /**
174  * Queue for buffering the packets before the muxer task can be started.
175  */
177  /**
178  * Maximum number of packets in fifo.
179  */
181  /*
182  * The size of the AVPackets' buffers in queue.
183  * Updated when a packet is either pushed or pulled from the queue.
184  */
185  size_t data_size;
186  /* Threshold after which max_packets will be in effect */
188 } PreMuxQueue;
189 
190 typedef struct SchMuxStream {
193 
194  unsigned *sub_heartbeat_dst;
196 
198 
199  // an EOF was generated while flushing the pre-mux queue
200  int init_eof;
201 
202  ////////////////////////////////////////////////////////////
203  // The following are protected by Scheduler.schedule_lock //
204 
205  /* dts+duration of the last packet sent to this stream
206  in AV_TIME_BASE_Q */
208  // this stream no longer accepts input
210  ////////////////////////////////////////////////////////////
211 } SchMuxStream;
212 
213 typedef struct SchMux {
214  const AVClass *class;
215 
217  unsigned nb_streams;
219 
220  int (*init)(void *arg);
221 
223  /**
224  * Set to 1 after starting the muxer task and flushing the
225  * pre-muxing queues.
226  * Set either before any tasks have started, or with
227  * Scheduler.mux_ready_lock held.
228  */
231  unsigned queue_size;
232 
234 } SchMux;
235 
236 typedef struct SchFilterIn {
241 } SchFilterIn;
242 
243 typedef struct SchFilterOut {
245 } SchFilterOut;
246 
247 typedef struct SchFilterGraph {
248  const AVClass *class;
249 
251  unsigned nb_inputs;
254 
256  unsigned nb_outputs;
257 
259  // input queue, nb_inputs+1 streams
260  // last stream is control
263 
264  // protected by schedule_lock
265  unsigned best_input;
268 
273 };
274 
275 struct Scheduler {
276  const AVClass *class;
277 
279  unsigned nb_demux;
280 
282  unsigned nb_mux;
283 
284  unsigned nb_mux_ready;
286 
287  unsigned nb_mux_done;
290 
291 
293  unsigned nb_dec;
294 
296  unsigned nb_enc;
297 
299  unsigned nb_sq_enc;
300 
302  unsigned nb_filters;
303 
305  int sdp_auto;
306 
310 
312 
314 };
315 
316 /**
317  * Wait until this task is allowed to proceed.
318  *
319  * @retval 0 the caller should proceed
320  * @retval 1 the caller should terminate
321  */
322 static int waiter_wait(Scheduler *sch, SchWaiter *w)
323 {
324  int terminate;
325 
326  if (!atomic_load(&w->choked))
327  return 0;
328 
329  pthread_mutex_lock(&w->lock);
330 
331  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332  pthread_cond_wait(&w->cond, &w->lock);
333 
334  terminate = atomic_load(&sch->terminate);
335 
336  pthread_mutex_unlock(&w->lock);
337 
338  return terminate;
339 }
340 
341 static void waiter_set(SchWaiter *w, int choked)
342 {
343  pthread_mutex_lock(&w->lock);
344 
345  atomic_store(&w->choked, choked);
346  pthread_cond_signal(&w->cond);
347 
348  pthread_mutex_unlock(&w->lock);
349 }
350 
351 static int waiter_init(SchWaiter *w)
352 {
353  int ret;
354 
355  atomic_init(&w->choked, 0);
356 
357  ret = pthread_mutex_init(&w->lock, NULL);
358  if (ret)
359  return AVERROR(ret);
360 
361  ret = pthread_cond_init(&w->cond, NULL);
362  if (ret)
363  return AVERROR(ret);
364 
365  return 0;
366 }
367 
368 static void waiter_uninit(SchWaiter *w)
369 {
370  pthread_mutex_destroy(&w->lock);
371  pthread_cond_destroy(&w->cond);
372 }
373 
374 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375  enum QueueType type)
376 {
377  ThreadQueue *tq;
378  ObjPool *op;
379 
380  if (queue_size <= 0) {
381  if (type == QUEUE_FRAMES)
382  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
383  else
385  }
386 
387  if (type == QUEUE_FRAMES) {
388  // This queue length is used in the decoder code to ensure that
389  // there are enough entries in fixed-size frame pools to account
390  // for frames held in queues inside the ffmpeg utility. If this
391  // can ever dynamically change then the corresponding decode
392  // code needs to be updated as well.
394  }
395 
398  if (!op)
399  return AVERROR(ENOMEM);
400 
401  tq = tq_alloc(nb_streams, queue_size, op,
403  if (!tq) {
404  objpool_free(&op);
405  return AVERROR(ENOMEM);
406  }
407 
408  *ptq = tq;
409  return 0;
410 }
411 
412 static void *task_wrapper(void *arg);
413 
414 static int task_start(SchTask *task)
415 {
416  int ret;
417 
418  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
419 
420  av_assert0(!task->thread_running);
421 
422  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
423  if (ret) {
424  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
425  strerror(ret));
426  return AVERROR(ret);
427  }
428 
429  task->thread_running = 1;
430  return 0;
431 }
432 
433 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
434  SchThreadFunc func, void *func_arg)
435 {
436  task->parent = sch;
437 
438  task->node.type = type;
439  task->node.idx = idx;
440 
441  task->func = func;
442  task->func_arg = func_arg;
443 }
444 
445 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
446 {
447  int64_t min_dts = INT64_MAX;
448 
449  for (unsigned i = 0; i < sch->nb_mux; i++) {
450  const SchMux *mux = &sch->mux[i];
451 
452  for (unsigned j = 0; j < mux->nb_streams; j++) {
453  const SchMuxStream *ms = &mux->streams[j];
454 
455  if (ms->source_finished && !count_finished)
456  continue;
457  if (ms->last_dts == AV_NOPTS_VALUE)
458  return AV_NOPTS_VALUE;
459 
460  min_dts = FFMIN(min_dts, ms->last_dts);
461  }
462  }
463 
464  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
465 }
466 
467 void sch_free(Scheduler **psch)
468 {
469  Scheduler *sch = *psch;
470 
471  if (!sch)
472  return;
473 
474  sch_stop(sch, NULL);
475 
476  for (unsigned i = 0; i < sch->nb_demux; i++) {
477  SchDemux *d = &sch->demux[i];
478 
479  for (unsigned j = 0; j < d->nb_streams; j++) {
480  SchDemuxStream *ds = &d->streams[j];
481  av_freep(&ds->dst);
482  av_freep(&ds->dst_finished);
483  }
484  av_freep(&d->streams);
485 
487 
488  waiter_uninit(&d->waiter);
489  }
490  av_freep(&sch->demux);
491 
492  for (unsigned i = 0; i < sch->nb_mux; i++) {
493  SchMux *mux = &sch->mux[i];
494 
495  for (unsigned j = 0; j < mux->nb_streams; j++) {
496  SchMuxStream *ms = &mux->streams[j];
497 
498  if (ms->pre_mux_queue.fifo) {
499  AVPacket *pkt;
500  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
503  }
504 
506  }
507  av_freep(&mux->streams);
508 
510 
511  tq_free(&mux->queue);
512  }
513  av_freep(&sch->mux);
514 
515  for (unsigned i = 0; i < sch->nb_dec; i++) {
516  SchDec *dec = &sch->dec[i];
517 
518  tq_free(&dec->queue);
519 
521 
522  for (unsigned j = 0; j < dec->nb_outputs; j++) {
523  SchDecOutput *o = &dec->outputs[j];
524 
525  av_freep(&o->dst);
526  av_freep(&o->dst_finished);
527  }
528 
529  av_freep(&dec->outputs);
530 
531  av_frame_free(&dec->send_frame);
532  }
533  av_freep(&sch->dec);
534 
535  for (unsigned i = 0; i < sch->nb_enc; i++) {
536  SchEnc *enc = &sch->enc[i];
537 
538  tq_free(&enc->queue);
539 
540  av_packet_free(&enc->send_pkt);
541 
542  av_freep(&enc->dst);
543  av_freep(&enc->dst_finished);
544  }
545  av_freep(&sch->enc);
546 
547  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
548  SchSyncQueue *sq = &sch->sq_enc[i];
549  sq_free(&sq->sq);
550  av_frame_free(&sq->frame);
552  av_freep(&sq->enc_idx);
553  }
554  av_freep(&sch->sq_enc);
555 
556  for (unsigned i = 0; i < sch->nb_filters; i++) {
557  SchFilterGraph *fg = &sch->filters[i];
558 
559  tq_free(&fg->queue);
560 
561  av_freep(&fg->inputs);
562  av_freep(&fg->outputs);
563 
564  waiter_uninit(&fg->waiter);
565  }
566  av_freep(&sch->filters);
567 
568  av_freep(&sch->sdp_filename);
569 
571 
573 
576 
577  av_freep(psch);
578 }
579 
// AVClass describing the Scheduler itself; only the class name and the
// libavutil version are filled in.
580 static const AVClass scheduler_class = {
581  .class_name = "Scheduler",
582  .version = LIBAVUTIL_VERSION_INT,
583 };
584 
586 {
587  Scheduler *sch;
588  int ret;
589 
590  sch = av_mallocz(sizeof(*sch));
591  if (!sch)
592  return NULL;
593 
594  sch->class = &scheduler_class;
595  sch->sdp_auto = 1;
596 
598  if (ret)
599  goto fail;
600 
602  if (ret)
603  goto fail;
604 
606  if (ret)
607  goto fail;
608 
610  if (ret)
611  goto fail;
612 
613  return sch;
614 fail:
615  sch_free(&sch);
616  return NULL;
617 }
618 
619 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
620 {
621  av_freep(&sch->sdp_filename);
622  sch->sdp_filename = av_strdup(sdp_filename);
623  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
624 }
625 
// AVClass for SchMux; the parent log context offset is that of
// task.func_arg, i.e. the opaque argument given to the muxer task.
626 static const AVClass sch_mux_class = {
627  .class_name = "SchMux",
628  .version = LIBAVUTIL_VERSION_INT,
629  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
630 };
631 
632 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
633  void *arg, int sdp_auto, unsigned thread_queue_size)
634 {
635  const unsigned idx = sch->nb_mux;
636 
637  SchMux *mux;
638  int ret;
639 
640  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
641  if (ret < 0)
642  return ret;
643 
644  mux = &sch->mux[idx];
645  mux->class = &sch_mux_class;
646  mux->init = init;
647  mux->queue_size = thread_queue_size;
648 
649  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
650 
651  sch->sdp_auto &= sdp_auto;
652 
653  return idx;
654 }
655 
656 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
657 {
658  SchMux *mux;
659  SchMuxStream *ms;
660  unsigned stream_idx;
661  int ret;
662 
663  av_assert0(mux_idx < sch->nb_mux);
664  mux = &sch->mux[mux_idx];
665 
666  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
667  if (ret < 0)
668  return ret;
669  stream_idx = mux->nb_streams - 1;
670 
671  ms = &mux->streams[stream_idx];
672 
673  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
674  if (!ms->pre_mux_queue.fifo)
675  return AVERROR(ENOMEM);
676 
677  ms->last_dts = AV_NOPTS_VALUE;
678 
679  return stream_idx;
680 }
681 
// AVClass for SchDemux; the parent log context offset is that of
// task.func_arg, i.e. the opaque argument given to the demuxer task.
682 static const AVClass sch_demux_class = {
683  .class_name = "SchDemux",
684  .version = LIBAVUTIL_VERSION_INT,
685  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
686 };
687 
689 {
690  const unsigned idx = sch->nb_demux;
691 
692  SchDemux *d;
693  int ret;
694 
695  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
696  if (ret < 0)
697  return ret;
698 
699  d = &sch->demux[idx];
700 
701  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
702 
703  d->class = &sch_demux_class;
704  d->send_pkt = av_packet_alloc();
705  if (!d->send_pkt)
706  return AVERROR(ENOMEM);
707 
708  ret = waiter_init(&d->waiter);
709  if (ret < 0)
710  return ret;
711 
712  return idx;
713 }
714 
715 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
716 {
717  SchDemux *d;
718  int ret;
719 
720  av_assert0(demux_idx < sch->nb_demux);
721  d = &sch->demux[demux_idx];
722 
723  ret = GROW_ARRAY(d->streams, d->nb_streams);
724  return ret < 0 ? ret : d->nb_streams - 1;
725 }
726 
727 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
728 {
729  SchDec *dec;
730  int ret;
731 
732  av_assert0(dec_idx < sch->nb_dec);
733  dec = &sch->dec[dec_idx];
734 
735  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
736  if (ret < 0)
737  return ret;
738 
739  return dec->nb_outputs - 1;
740 }
741 
// AVClass for SchDec; the parent log context offset is that of
// task.func_arg, i.e. the opaque argument given to the decoder task.
742 static const AVClass sch_dec_class = {
743  .class_name = "SchDec",
744  .version = LIBAVUTIL_VERSION_INT,
745  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
746 };
747 
748 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
749 {
750  const unsigned idx = sch->nb_dec;
751 
752  SchDec *dec;
753  int ret;
754 
755  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
756  if (ret < 0)
757  return ret;
758 
759  dec = &sch->dec[idx];
760 
761  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
762 
763  dec->class = &sch_dec_class;
764  dec->send_frame = av_frame_alloc();
765  if (!dec->send_frame)
766  return AVERROR(ENOMEM);
767 
768  ret = sch_add_dec_output(sch, idx);
769  if (ret < 0)
770  return ret;
771 
772  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
773  if (ret < 0)
774  return ret;
775 
776  if (send_end_ts) {
778  if (ret < 0)
779  return ret;
780  }
781 
782  return idx;
783 }
784 
// AVClass for SchEnc; the parent log context offset is that of
// task.func_arg, i.e. the opaque argument given to the encoder task.
785 static const AVClass sch_enc_class = {
786  .class_name = "SchEnc",
787  .version = LIBAVUTIL_VERSION_INT,
788  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
789 };
790 
792  int (*open_cb)(void *opaque, const AVFrame *frame))
793 {
794  const unsigned idx = sch->nb_enc;
795 
796  SchEnc *enc;
797  int ret;
798 
799  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
800  if (ret < 0)
801  return ret;
802 
803  enc = &sch->enc[idx];
804 
805  enc->class = &sch_enc_class;
806  enc->open_cb = open_cb;
807  enc->sq_idx[0] = -1;
808  enc->sq_idx[1] = -1;
809 
810  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
811 
812  enc->send_pkt = av_packet_alloc();
813  if (!enc->send_pkt)
814  return AVERROR(ENOMEM);
815 
816  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
817  if (ret < 0)
818  return ret;
819 
820  return idx;
821 }
822 
// AVClass for SchFilterGraph; the parent log context offset is that of
// task.func_arg, i.e. the opaque argument given to the filtergraph task.
823 static const AVClass sch_fg_class = {
824  .class_name = "SchFilterGraph",
825  .version = LIBAVUTIL_VERSION_INT,
826  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
827 };
828 
829 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
830  SchThreadFunc func, void *ctx)
831 {
832  const unsigned idx = sch->nb_filters;
833 
834  SchFilterGraph *fg;
835  int ret;
836 
837  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
838  if (ret < 0)
839  return ret;
840  fg = &sch->filters[idx];
841 
842  fg->class = &sch_fg_class;
843 
844  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
845 
846  if (nb_inputs) {
847  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
848  if (!fg->inputs)
849  return AVERROR(ENOMEM);
850  fg->nb_inputs = nb_inputs;
851  }
852 
853  if (nb_outputs) {
854  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
855  if (!fg->outputs)
856  return AVERROR(ENOMEM);
857  fg->nb_outputs = nb_outputs;
858  }
859 
860  ret = waiter_init(&fg->waiter);
861  if (ret < 0)
862  return ret;
863 
864  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
865  if (ret < 0)
866  return ret;
867 
868  return idx;
869 }
870 
871 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
872 {
873  SchSyncQueue *sq;
874  int ret;
875 
876  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
877  if (ret < 0)
878  return ret;
879  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
880 
881  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
882  if (!sq->sq)
883  return AVERROR(ENOMEM);
884 
885  sq->frame = av_frame_alloc();
886  if (!sq->frame)
887  return AVERROR(ENOMEM);
888 
889  ret = pthread_mutex_init(&sq->lock, NULL);
890  if (ret)
891  return AVERROR(ret);
892 
893  return sq - sch->sq_enc;
894 }
895 
896 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
897  int limiting, uint64_t max_frames)
898 {
899  SchSyncQueue *sq;
900  SchEnc *enc;
901  int ret;
902 
903  av_assert0(sq_idx < sch->nb_sq_enc);
904  sq = &sch->sq_enc[sq_idx];
905 
906  av_assert0(enc_idx < sch->nb_enc);
907  enc = &sch->enc[enc_idx];
908 
909  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
910  if (ret < 0)
911  return ret;
912  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
913 
914  ret = sq_add_stream(sq->sq, limiting);
915  if (ret < 0)
916  return ret;
917 
918  enc->sq_idx[0] = sq_idx;
919  enc->sq_idx[1] = ret;
920 
921  if (max_frames != INT64_MAX)
922  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
923 
924  return 0;
925 }
926 
928 {
929  int ret;
930 
931  switch (src.type) {
932  case SCH_NODE_TYPE_DEMUX: {
933  SchDemuxStream *ds;
934 
935  av_assert0(src.idx < sch->nb_demux &&
936  src.idx_stream < sch->demux[src.idx].nb_streams);
937  ds = &sch->demux[src.idx].streams[src.idx_stream];
938 
939  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
940  if (ret < 0)
941  return ret;
942 
943  ds->dst[ds->nb_dst - 1] = dst;
944 
945  // demuxed packets go to decoding or streamcopy
946  switch (dst.type) {
947  case SCH_NODE_TYPE_DEC: {
948  SchDec *dec;
949 
950  av_assert0(dst.idx < sch->nb_dec);
951  dec = &sch->dec[dst.idx];
952 
953  av_assert0(!dec->src.type);
954  dec->src = src;
955  break;
956  }
957  case SCH_NODE_TYPE_MUX: {
958  SchMuxStream *ms;
959 
960  av_assert0(dst.idx < sch->nb_mux &&
961  dst.idx_stream < sch->mux[dst.idx].nb_streams);
962  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
963 
964  av_assert0(!ms->src.type);
965  ms->src = src;
966 
967  break;
968  }
969  default: av_assert0(0);
970  }
971 
972  break;
973  }
974  case SCH_NODE_TYPE_DEC: {
975  SchDec *dec;
976  SchDecOutput *o;
977 
978  av_assert0(src.idx < sch->nb_dec);
979  dec = &sch->dec[src.idx];
980 
981  av_assert0(src.idx_stream < dec->nb_outputs);
982  o = &dec->outputs[src.idx_stream];
983 
984  ret = GROW_ARRAY(o->dst, o->nb_dst);
985  if (ret < 0)
986  return ret;
987 
988  o->dst[o->nb_dst - 1] = dst;
989 
990  // decoded frames go to filters or encoding
991  switch (dst.type) {
993  SchFilterIn *fi;
994 
995  av_assert0(dst.idx < sch->nb_filters &&
996  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
997  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
998 
999  av_assert0(!fi->src.type);
1000  fi->src = src;
1001  break;
1002  }
1003  case SCH_NODE_TYPE_ENC: {
1004  SchEnc *enc;
1005 
1006  av_assert0(dst.idx < sch->nb_enc);
1007  enc = &sch->enc[dst.idx];
1008 
1009  av_assert0(!enc->src.type);
1010  enc->src = src;
1011  break;
1012  }
1013  default: av_assert0(0);
1014  }
1015 
1016  break;
1017  }
1018  case SCH_NODE_TYPE_FILTER_OUT: {
1019  SchFilterOut *fo;
1020 
1021  av_assert0(src.idx < sch->nb_filters &&
1022  src.idx_stream < sch->filters[src.idx].nb_outputs);
1023  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1024 
1025  av_assert0(!fo->dst.type);
1026  fo->dst = dst;
1027 
1028  // filtered frames go to encoding or another filtergraph
1029  switch (dst.type) {
1030  case SCH_NODE_TYPE_ENC: {
1031  SchEnc *enc;
1032 
1033  av_assert0(dst.idx < sch->nb_enc);
1034  enc = &sch->enc[dst.idx];
1035 
1036  av_assert0(!enc->src.type);
1037  enc->src = src;
1038  break;
1039  }
1040  case SCH_NODE_TYPE_FILTER_IN: {
1041  SchFilterIn *fi;
1042 
1043  av_assert0(dst.idx < sch->nb_filters &&
1044  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1045  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1046 
1047  av_assert0(!fi->src.type);
1048  fi->src = src;
1049  break;
1050  }
1051  default: av_assert0(0);
1052  }
1053 
1054 
1055  break;
1056  }
1057  case SCH_NODE_TYPE_ENC: {
1058  SchEnc *enc;
1059 
1060  av_assert0(src.idx < sch->nb_enc);
1061  enc = &sch->enc[src.idx];
1062 
1063  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1064  if (ret < 0)
1065  return ret;
1066 
1067  enc->dst[enc->nb_dst - 1] = dst;
1068 
1069  // encoding packets go to muxing or decoding
1070  switch (dst.type) {
1071  case SCH_NODE_TYPE_MUX: {
1072  SchMuxStream *ms;
1073 
1074  av_assert0(dst.idx < sch->nb_mux &&
1075  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1076  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1077 
1078  av_assert0(!ms->src.type);
1079  ms->src = src;
1080 
1081  break;
1082  }
1083  case SCH_NODE_TYPE_DEC: {
1084  SchDec *dec;
1085 
1086  av_assert0(dst.idx < sch->nb_dec);
1087  dec = &sch->dec[dst.idx];
1088 
1089  av_assert0(!dec->src.type);
1090  dec->src = src;
1091 
1092  break;
1093  }
1094  default: av_assert0(0);
1095  }
1096 
1097  break;
1098  }
1099  default: av_assert0(0);
1100  }
1101 
1102  return 0;
1103 }
1104 
1105 static int mux_task_start(SchMux *mux)
1106 {
1107  int ret = 0;
1108 
1109  ret = task_start(&mux->task);
1110  if (ret < 0)
1111  return ret;
1112 
1113  /* flush the pre-muxing queues */
1114  for (unsigned i = 0; i < mux->nb_streams; i++) {
1115  SchMuxStream *ms = &mux->streams[i];
1116  AVPacket *pkt;
1117 
1118  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
1119  if (pkt) {
1120  if (!ms->init_eof)
1121  ret = tq_send(mux->queue, i, pkt);
1122  av_packet_free(&pkt);
1123  if (ret == AVERROR_EOF)
1124  ms->init_eof = 1;
1125  else if (ret < 0)
1126  return ret;
1127  } else
1128  tq_send_finish(mux->queue, i);
1129  }
1130  }
1131 
1132  atomic_store(&mux->mux_started, 1);
1133 
1134  return 0;
1135 }
1136 
1137 int print_sdp(const char *filename);
1138 
1139 static int mux_init(Scheduler *sch, SchMux *mux)
1140 {
1141  int ret;
1142 
1143  ret = mux->init(mux->task.func_arg);
1144  if (ret < 0)
1145  return ret;
1146 
1147  sch->nb_mux_ready++;
1148 
1149  if (sch->sdp_filename || sch->sdp_auto) {
1150  if (sch->nb_mux_ready < sch->nb_mux)
1151  return 0;
1152 
1153  ret = print_sdp(sch->sdp_filename);
1154  if (ret < 0) {
1155  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1156  return ret;
1157  }
1158 
1159  /* SDP is written only after all the muxers are ready, so now we
1160  * start ALL the threads */
1161  for (unsigned i = 0; i < sch->nb_mux; i++) {
1162  ret = mux_task_start(&sch->mux[i]);
1163  if (ret < 0)
1164  return ret;
1165  }
1166  } else {
1167  ret = mux_task_start(mux);
1168  if (ret < 0)
1169  return ret;
1170  }
1171 
1172  return 0;
1173 }
1174 
1175 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1176  size_t data_threshold, int max_packets)
1177 {
1178  SchMux *mux;
1179  SchMuxStream *ms;
1180 
1181  av_assert0(mux_idx < sch->nb_mux);
1182  mux = &sch->mux[mux_idx];
1183 
1184  av_assert0(stream_idx < mux->nb_streams);
1185  ms = &mux->streams[stream_idx];
1186 
1187  ms->pre_mux_queue.max_packets = max_packets;
1188  ms->pre_mux_queue.data_threshold = data_threshold;
1189 }
1190 
1191 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1192 {
1193  SchMux *mux;
1194  int ret = 0;
1195 
1196  av_assert0(mux_idx < sch->nb_mux);
1197  mux = &sch->mux[mux_idx];
1198 
1199  av_assert0(stream_idx < mux->nb_streams);
1200 
1202 
1203  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1204 
1205  // this may be called during initialization - do not start
1206  // threads before sch_start() is called
1207  if (++mux->nb_streams_ready == mux->nb_streams &&
1208  sch->state >= SCH_STATE_STARTED)
1209  ret = mux_init(sch, mux);
1210 
1212 
1213  return ret;
1214 }
1215 
1216 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1217  unsigned dec_idx)
1218 {
1219  SchMux *mux;
1220  SchMuxStream *ms;
1221  int ret = 0;
1222 
1223  av_assert0(mux_idx < sch->nb_mux);
1224  mux = &sch->mux[mux_idx];
1225 
1226  av_assert0(stream_idx < mux->nb_streams);
1227  ms = &mux->streams[stream_idx];
1228 
1230  if (ret < 0)
1231  return ret;
1232 
1233  av_assert0(dec_idx < sch->nb_dec);
1234  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1235 
1236  if (!mux->sub_heartbeat_pkt) {
1238  if (!mux->sub_heartbeat_pkt)
1239  return AVERROR(ENOMEM);
1240  }
1241 
1242  return 0;
1243 }
1244 
1246 {
1247  while (1) {
1248  SchFilterGraph *fg;
1249 
1250  // fed directly by a demuxer (i.e. not through a filtergraph)
1251  if (src.type == SCH_NODE_TYPE_DEMUX) {
1252  sch->demux[src.idx].waiter.choked_next = 0;
1253  return;
1254  }
1255 
1257  fg = &sch->filters[src.idx];
1258 
1259  // the filtergraph contains internal sources and
1260  // requested to be scheduled directly
1261  if (fg->best_input == fg->nb_inputs) {
1262  fg->waiter.choked_next = 0;
1263  return;
1264  }
1265 
1266  src = fg->inputs[fg->best_input].src_sched;
1267  }
1268 }
1269 
1271 {
1272  int64_t dts;
1273  int have_unchoked = 0;
1274 
1275  // on termination request all waiters are choked,
1276  // we are not to unchoke them
1277  if (atomic_load(&sch->terminate))
1278  return;
1279 
1280  dts = trailing_dts(sch, 0);
1281 
1282  atomic_store(&sch->last_dts, dts);
1283 
1284  // initialize our internal state
1285  for (unsigned type = 0; type < 2; type++)
1286  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1287  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1288  w->choked_prev = atomic_load(&w->choked);
1289  w->choked_next = 1;
1290  }
1291 
1292  // figure out the sources that are allowed to proceed
1293  for (unsigned i = 0; i < sch->nb_mux; i++) {
1294  SchMux *mux = &sch->mux[i];
1295 
1296  for (unsigned j = 0; j < mux->nb_streams; j++) {
1297  SchMuxStream *ms = &mux->streams[j];
1298 
1299  // unblock sources for output streams that are not finished
1300  // and not too far ahead of the trailing stream
1301  if (ms->source_finished)
1302  continue;
1303  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1304  continue;
1305  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1306  continue;
1307 
1308  // resolve the source to unchoke
1309  unchoke_for_stream(sch, ms->src_sched);
1310  have_unchoked = 1;
1311  }
1312  }
1313 
1314  // make sure to unchoke at least one source, if still available
1315  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1316  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1317  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1318  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1319  if (!exited) {
1320  w->choked_next = 0;
1321  have_unchoked = 1;
1322  break;
1323  }
1324  }
1325 
1326 
1327  for (unsigned type = 0; type < 2; type++)
1328  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1329  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1330  if (w->choked_prev != w->choked_next)
1331  waiter_set(w, w->choked_next);
1332  }
1333 
1334 }
1335 
1336 enum {
1340 };
1341 
1342 static int
1344  uint8_t *filters_visited, SchedulerNode *filters_stack)
1345 {
1346  unsigned nb_filters_stack = 0;
1347 
1348  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1349 
1350  while (1) {
1351  const SchFilterGraph *fg = &sch->filters[src.idx];
1352 
1353  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1354 
1355  // descend into every input, depth first
1356  if (src.idx_stream < fg->nb_inputs) {
1357  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1358 
1359  // connected to demuxer, no cycles possible
1360  if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1361  continue;
1362 
1363  // otherwise connected to another filtergraph
1365 
1366  // found a cycle
1367  if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1368  return AVERROR(EINVAL);
1369 
1370  // place current position on stack and descend
1371  av_assert0(nb_filters_stack < sch->nb_filters);
1372  filters_stack[nb_filters_stack++] = src;
1373  src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1374  continue;
1375  }
1376 
1377  filters_visited[src.idx] = CYCLE_NODE_DONE;
1378 
1379  // previous search finished,
1380  if (nb_filters_stack) {
1381  src = filters_stack[--nb_filters_stack];
1382  continue;
1383  }
1384  return 0;
1385  }
1386 }
1387 
1388 static int check_acyclic(Scheduler *sch)
1389 {
1390  uint8_t *filters_visited = NULL;
1391  SchedulerNode *filters_stack = NULL;
1392 
1393  int ret = 0;
1394 
1395  if (!sch->nb_filters)
1396  return 0;
1397 
1398  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1399  if (!filters_visited)
1400  return AVERROR(ENOMEM);
1401 
1402  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1403  if (!filters_stack) {
1404  ret = AVERROR(ENOMEM);
1405  goto fail;
1406  }
1407 
1408  // trace the transcoding graph upstream from every filtegraph
1409  for (unsigned i = 0; i < sch->nb_filters; i++) {
1410  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1411  filters_visited, filters_stack);
1412  if (ret < 0) {
1413  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1414  goto fail;
1415  }
1416  }
1417 
1418 fail:
1419  av_freep(&filters_visited);
1420  av_freep(&filters_stack);
1421  return ret;
1422 }
1423 
1424 static int start_prepare(Scheduler *sch)
1425 {
1426  int ret;
1427 
1428  for (unsigned i = 0; i < sch->nb_demux; i++) {
1429  SchDemux *d = &sch->demux[i];
1430 
1431  for (unsigned j = 0; j < d->nb_streams; j++) {
1432  SchDemuxStream *ds = &d->streams[j];
1433 
1434  if (!ds->nb_dst) {
1435  av_log(d, AV_LOG_ERROR,
1436  "Demuxer stream %u not connected to any sink\n", j);
1437  return AVERROR(EINVAL);
1438  }
1439 
1440  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1441  if (!ds->dst_finished)
1442  return AVERROR(ENOMEM);
1443  }
1444  }
1445 
1446  for (unsigned i = 0; i < sch->nb_dec; i++) {
1447  SchDec *dec = &sch->dec[i];
1448 
1449  if (!dec->src.type) {
1450  av_log(dec, AV_LOG_ERROR,
1451  "Decoder not connected to a source\n");
1452  return AVERROR(EINVAL);
1453  }
1454 
1455  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1456  SchDecOutput *o = &dec->outputs[j];
1457 
1458  if (!o->nb_dst) {
1459  av_log(dec, AV_LOG_ERROR,
1460  "Decoder output %u not connected to any sink\n", j);
1461  return AVERROR(EINVAL);
1462  }
1463 
1464  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1465  if (!o->dst_finished)
1466  return AVERROR(ENOMEM);
1467  }
1468  }
1469 
1470  for (unsigned i = 0; i < sch->nb_enc; i++) {
1471  SchEnc *enc = &sch->enc[i];
1472 
1473  if (!enc->src.type) {
1474  av_log(enc, AV_LOG_ERROR,
1475  "Encoder not connected to a source\n");
1476  return AVERROR(EINVAL);
1477  }
1478  if (!enc->nb_dst) {
1479  av_log(enc, AV_LOG_ERROR,
1480  "Encoder not connected to any sink\n");
1481  return AVERROR(EINVAL);
1482  }
1483 
1484  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1485  if (!enc->dst_finished)
1486  return AVERROR(ENOMEM);
1487  }
1488 
1489  for (unsigned i = 0; i < sch->nb_mux; i++) {
1490  SchMux *mux = &sch->mux[i];
1491 
1492  for (unsigned j = 0; j < mux->nb_streams; j++) {
1493  SchMuxStream *ms = &mux->streams[j];
1494 
1495  switch (ms->src.type) {
1496  case SCH_NODE_TYPE_ENC: {
1497  SchEnc *enc = &sch->enc[ms->src.idx];
1498  if (enc->src.type == SCH_NODE_TYPE_DEC) {
1499  ms->src_sched = sch->dec[enc->src.idx].src;
1501  } else {
1502  ms->src_sched = enc->src;
1504  }
1505  break;
1506  }
1507  case SCH_NODE_TYPE_DEMUX:
1508  ms->src_sched = ms->src;
1509  break;
1510  default:
1511  av_log(mux, AV_LOG_ERROR,
1512  "Muxer stream #%u not connected to a source\n", j);
1513  return AVERROR(EINVAL);
1514  }
1515  }
1516 
1517  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1518  QUEUE_PACKETS);
1519  if (ret < 0)
1520  return ret;
1521  }
1522 
1523  for (unsigned i = 0; i < sch->nb_filters; i++) {
1524  SchFilterGraph *fg = &sch->filters[i];
1525 
1526  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1527  SchFilterIn *fi = &fg->inputs[j];
1528  SchDec *dec;
1529 
1530  if (!fi->src.type) {
1531  av_log(fg, AV_LOG_ERROR,
1532  "Filtergraph input %u not connected to a source\n", j);
1533  return AVERROR(EINVAL);
1534  }
1535 
1536  if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1537  fi->src_sched = fi->src;
1538  else {
1540  dec = &sch->dec[fi->src.idx];
1541 
1542  switch (dec->src.type) {
1543  case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1544  case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1545  default: av_assert0(0);
1546  }
1547  }
1548  }
1549 
1550  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1551  SchFilterOut *fo = &fg->outputs[j];
1552 
1553  if (!fo->dst.type) {
1554  av_log(fg, AV_LOG_ERROR,
1555  "Filtergraph %u output %u not connected to a sink\n", i, j);
1556  return AVERROR(EINVAL);
1557  }
1558  }
1559  }
1560 
1561  // Check that the transcoding graph has no cycles.
1562  ret = check_acyclic(sch);
1563  if (ret < 0)
1564  return ret;
1565 
1566  return 0;
1567 }
1568 
1570 {
1571  int ret;
1572 
1573  ret = start_prepare(sch);
1574  if (ret < 0)
1575  return ret;
1576 
1578  sch->state = SCH_STATE_STARTED;
1579 
1580  for (unsigned i = 0; i < sch->nb_mux; i++) {
1581  SchMux *mux = &sch->mux[i];
1582 
1583  if (mux->nb_streams_ready == mux->nb_streams) {
1584  ret = mux_init(sch, mux);
1585  if (ret < 0)
1586  goto fail;
1587  }
1588  }
1589 
1590  for (unsigned i = 0; i < sch->nb_enc; i++) {
1591  SchEnc *enc = &sch->enc[i];
1592 
1593  ret = task_start(&enc->task);
1594  if (ret < 0)
1595  goto fail;
1596  }
1597 
1598  for (unsigned i = 0; i < sch->nb_filters; i++) {
1599  SchFilterGraph *fg = &sch->filters[i];
1600 
1601  ret = task_start(&fg->task);
1602  if (ret < 0)
1603  goto fail;
1604  }
1605 
1606  for (unsigned i = 0; i < sch->nb_dec; i++) {
1607  SchDec *dec = &sch->dec[i];
1608 
1609  ret = task_start(&dec->task);
1610  if (ret < 0)
1611  goto fail;
1612  }
1613 
1614  for (unsigned i = 0; i < sch->nb_demux; i++) {
1615  SchDemux *d = &sch->demux[i];
1616 
1617  if (!d->nb_streams)
1618  continue;
1619 
1620  ret = task_start(&d->task);
1621  if (ret < 0)
1622  goto fail;
1623  }
1624 
1628 
1629  return 0;
1630 fail:
1631  sch_stop(sch, NULL);
1632  return ret;
1633 }
1634 
1635 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1636 {
1637  int ret, err;
1638 
1639  // convert delay to absolute timestamp
1640  timeout_us += av_gettime();
1641 
1643 
1644  if (sch->nb_mux_done < sch->nb_mux) {
1645  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1646  .tv_nsec = (timeout_us % 1000000) * 1000 };
1648  }
1649 
1650  ret = sch->nb_mux_done == sch->nb_mux;
1651 
1653 
1654  *transcode_ts = atomic_load(&sch->last_dts);
1655 
1656  // abort transcoding if any task failed
1657  err = atomic_load(&sch->task_failed);
1658 
1659  return ret || err;
1660 }
1661 
1662 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1663 {
1664  int ret;
1665 
1666  ret = enc->open_cb(enc->task.func_arg, frame);
1667  if (ret < 0)
1668  return ret;
1669 
1670  // ret>0 signals audio frame size, which means sync queue must
1671  // have been enabled during encoder creation
1672  if (ret > 0) {
1673  SchSyncQueue *sq;
1674 
1675  av_assert0(enc->sq_idx[0] >= 0);
1676  sq = &sch->sq_enc[enc->sq_idx[0]];
1677 
1678  pthread_mutex_lock(&sq->lock);
1679 
1680  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1681 
1682  pthread_mutex_unlock(&sq->lock);
1683  }
1684 
1685  return 0;
1686 }
1687 
1689 {
1690  int ret;
1691 
1692  if (!frame) {
1693  tq_send_finish(enc->queue, 0);
1694  return 0;
1695  }
1696 
1697  if (enc->in_finished)
1698  return AVERROR_EOF;
1699 
1700  ret = tq_send(enc->queue, 0, frame);
1701  if (ret < 0)
1702  enc->in_finished = 1;
1703 
1704  return ret;
1705 }
1706 
1707 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1708 {
1709  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1710  int ret = 0;
1711 
1712  // inform the scheduling code that no more input will arrive along this path;
1713  // this is necessary because the sync queue may not send an EOF downstream
1714  // until other streams finish
1715  // TODO: consider a cleaner way of passing this information through
1716  // the pipeline
1717  if (!frame) {
1718  for (unsigned i = 0; i < enc->nb_dst; i++) {
1719  SchMux *mux;
1720  SchMuxStream *ms;
1721 
1722  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1723  continue;
1724 
1725  mux = &sch->mux[enc->dst[i].idx];
1726  ms = &mux->streams[enc->dst[i].idx_stream];
1727 
1729 
1730  ms->source_finished = 1;
1732 
1734  }
1735  }
1736 
1737  pthread_mutex_lock(&sq->lock);
1738 
1739  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1740  if (ret < 0)
1741  goto finish;
1742 
1743  while (1) {
1744  SchEnc *enc;
1745 
1746  // TODO: the SQ API should be extended to allow returning EOF
1747  // for individual streams
1748  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1749  if (ret < 0) {
1750  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1751  break;
1752  }
1753 
1754  enc = &sch->enc[sq->enc_idx[ret]];
1755  ret = send_to_enc_thread(sch, enc, sq->frame);
1756  if (ret < 0) {
1757  av_frame_unref(sq->frame);
1758  if (ret != AVERROR_EOF)
1759  break;
1760 
1761  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1762  continue;
1763  }
1764  }
1765 
1766  if (ret < 0) {
1767  // close all encoders fed from this sync queue
1768  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1769  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1770 
1771  // if the sync queue error is EOF and closing the encoder
1772  // produces a more serious error, make sure to pick the latter
1773  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1774  }
1775  }
1776 
1777 finish:
1778  pthread_mutex_unlock(&sq->lock);
1779 
1780  return ret;
1781 }
1782 
1783 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1784 {
1785  if (enc->open_cb && frame && !enc->opened) {
1786  int ret = enc_open(sch, enc, frame);
1787  if (ret < 0)
1788  return ret;
1789  enc->opened = 1;
1790 
1791  // discard empty frames that only carry encoder init parameters
1792  if (!frame->buf[0]) {
1794  return 0;
1795  }
1796  }
1797 
1798  return (enc->sq_idx[0] >= 0) ?
1799  send_to_enc_sq (sch, enc, frame) :
1800  send_to_enc_thread(sch, enc, frame);
1801 }
1802 
1804 {
1805  PreMuxQueue *q = &ms->pre_mux_queue;
1806  AVPacket *tmp_pkt = NULL;
1807  int ret;
1808 
1809  if (!av_fifo_can_write(q->fifo)) {
1810  size_t packets = av_fifo_can_read(q->fifo);
1811  size_t pkt_size = pkt ? pkt->size : 0;
1812  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1813  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1814  size_t new_size = FFMIN(2 * packets, max_packets);
1815 
1816  if (new_size <= packets) {
1817  av_log(mux, AV_LOG_ERROR,
1818  "Too many packets buffered for output stream.\n");
1819  return AVERROR(ENOSPC);
1820  }
1821  ret = av_fifo_grow2(q->fifo, new_size - packets);
1822  if (ret < 0)
1823  return ret;
1824  }
1825 
1826  if (pkt) {
1827  tmp_pkt = av_packet_alloc();
1828  if (!tmp_pkt)
1829  return AVERROR(ENOMEM);
1830 
1831  av_packet_move_ref(tmp_pkt, pkt);
1832  q->data_size += tmp_pkt->size;
1833  }
1834  av_fifo_write(q->fifo, &tmp_pkt, 1);
1835 
1836  return 0;
1837 }
1838 
1839 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1840  AVPacket *pkt)
1841 {
1842  SchMuxStream *ms = &mux->streams[stream_idx];
1843  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1846 
1847  // queue the packet if the muxer cannot be started yet
1848  if (!atomic_load(&mux->mux_started)) {
1849  int queued = 0;
1850 
1851  // the muxer could have started between the above atomic check and
1852  // locking the mutex, then this block falls through to normal send path
1854 
1855  if (!atomic_load(&mux->mux_started)) {
1856  int ret = mux_queue_packet(mux, ms, pkt);
1857  queued = ret < 0 ? ret : 1;
1858  }
1859 
1861 
1862  if (queued < 0)
1863  return queued;
1864  else if (queued)
1865  goto update_schedule;
1866  }
1867 
1868  if (pkt) {
1869  int ret;
1870 
1871  if (ms->init_eof)
1872  return AVERROR_EOF;
1873 
1874  ret = tq_send(mux->queue, stream_idx, pkt);
1875  if (ret < 0)
1876  return ret;
1877  } else
1878  tq_send_finish(mux->queue, stream_idx);
1879 
1880 update_schedule:
1881  // TODO: use atomics to check whether this changes trailing dts
1882  // to avoid locking unnecesarily
1883  if (dts != AV_NOPTS_VALUE || !pkt) {
1885 
1886  if (pkt) ms->last_dts = dts;
1887  else ms->source_finished = 1;
1888 
1890 
1892  }
1893 
1894  return 0;
1895 }
1896 
1897 static int
1899  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1900 {
1901  int ret;
1902 
1903  if (*dst_finished)
1904  return AVERROR_EOF;
1905 
1906  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1909  pkt = NULL;
1910  }
1911 
1912  if (!pkt)
1913  goto finish;
1914 
1915  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1916  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1917  tq_send(sch->dec[dst.idx].queue, 0, pkt);
1918  if (ret == AVERROR_EOF)
1919  goto finish;
1920 
1921  return ret;
1922 
1923 finish:
1924  if (dst.type == SCH_NODE_TYPE_MUX)
1925  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1926  else
1927  tq_send_finish(sch->dec[dst.idx].queue, 0);
1928 
1929  *dst_finished = 1;
1930  return AVERROR_EOF;
1931 }
1932 
1934  AVPacket *pkt, unsigned flags)
1935 {
1936  unsigned nb_done = 0;
1937 
1938  for (unsigned i = 0; i < ds->nb_dst; i++) {
1939  AVPacket *to_send = pkt;
1940  uint8_t *finished = &ds->dst_finished[i];
1941 
1942  int ret;
1943 
1944  // sending a packet consumes it, so make a temporary reference if needed
1945  if (pkt && i < ds->nb_dst - 1) {
1946  to_send = d->send_pkt;
1947 
1948  ret = av_packet_ref(to_send, pkt);
1949  if (ret < 0)
1950  return ret;
1951  }
1952 
1953  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1954  if (to_send)
1955  av_packet_unref(to_send);
1956  if (ret == AVERROR_EOF)
1957  nb_done++;
1958  else if (ret < 0)
1959  return ret;
1960  }
1961 
1962  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1963 }
1964 
1966 {
1967  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1968 
1969  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1970 
1971  for (unsigned i = 0; i < d->nb_streams; i++) {
1972  SchDemuxStream *ds = &d->streams[i];
1973 
1974  for (unsigned j = 0; j < ds->nb_dst; j++) {
1975  const SchedulerNode *dst = &ds->dst[j];
1976  SchDec *dec;
1977  int ret;
1978 
1979  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
1980  continue;
1981 
1982  dec = &sch->dec[dst->idx];
1983 
1984  ret = tq_send(dec->queue, 0, pkt);
1985  if (ret < 0)
1986  return ret;
1987 
1988  if (dec->queue_end_ts) {
1989  Timestamp ts;
1991  if (ret < 0)
1992  return ret;
1993 
1994  if (max_end_ts.ts == AV_NOPTS_VALUE ||
1995  (ts.ts != AV_NOPTS_VALUE &&
1996  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
1997  max_end_ts = ts;
1998 
1999  }
2000  }
2001  }
2002 
2003  pkt->pts = max_end_ts.ts;
2004  pkt->time_base = max_end_ts.tb;
2005 
2006  return 0;
2007 }
2008 
2009 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2010  unsigned flags)
2011 {
2012  SchDemux *d;
2013  int terminate;
2014 
2015  av_assert0(demux_idx < sch->nb_demux);
2016  d = &sch->demux[demux_idx];
2017 
2018  terminate = waiter_wait(sch, &d->waiter);
2019  if (terminate)
2020  return AVERROR_EXIT;
2021 
2022  // flush the downstreams after seek
2023  if (pkt->stream_index == -1)
2024  return demux_flush(sch, d, pkt);
2025 
2027 
2028  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2029 }
2030 
2031 static int demux_done(Scheduler *sch, unsigned demux_idx)
2032 {
2033  SchDemux *d = &sch->demux[demux_idx];
2034  int ret = 0;
2035 
2036  for (unsigned i = 0; i < d->nb_streams; i++) {
2037  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2038  if (err != AVERROR_EOF)
2039  ret = err_merge(ret, err);
2040  }
2041 
2043 
2044  d->task_exited = 1;
2045 
2047 
2049 
2050  return ret;
2051 }
2052 
2053 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2054 {
2055  SchMux *mux;
2056  int ret, stream_idx;
2057 
2058  av_assert0(mux_idx < sch->nb_mux);
2059  mux = &sch->mux[mux_idx];
2060 
2061  ret = tq_receive(mux->queue, &stream_idx, pkt);
2062  pkt->stream_index = stream_idx;
2063  return ret;
2064 }
2065 
2066 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2067 {
2068  SchMux *mux;
2069 
2070  av_assert0(mux_idx < sch->nb_mux);
2071  mux = &sch->mux[mux_idx];
2072 
2073  av_assert0(stream_idx < mux->nb_streams);
2074  tq_receive_finish(mux->queue, stream_idx);
2075 
2077  mux->streams[stream_idx].source_finished = 1;
2078 
2080 
2082 }
2083 
2084 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2085  const AVPacket *pkt)
2086 {
2087  SchMux *mux;
2088  SchMuxStream *ms;
2089 
2090  av_assert0(mux_idx < sch->nb_mux);
2091  mux = &sch->mux[mux_idx];
2092 
2093  av_assert0(stream_idx < mux->nb_streams);
2094  ms = &mux->streams[stream_idx];
2095 
2096  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2097  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2098  int ret;
2099 
2101  if (ret < 0)
2102  return ret;
2103 
2104  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2105  }
2106 
2107  return 0;
2108 }
2109 
2110 static int mux_done(Scheduler *sch, unsigned mux_idx)
2111 {
2112  SchMux *mux = &sch->mux[mux_idx];
2113 
2115 
2116  for (unsigned i = 0; i < mux->nb_streams; i++) {
2117  tq_receive_finish(mux->queue, i);
2118  mux->streams[i].source_finished = 1;
2119  }
2120 
2122 
2124 
2126 
2127  av_assert0(sch->nb_mux_done < sch->nb_mux);
2128  sch->nb_mux_done++;
2129 
2131 
2133 
2134  return 0;
2135 }
2136 
2137 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2138 {
2139  SchDec *dec;
2140  int ret, dummy;
2141 
2142  av_assert0(dec_idx < sch->nb_dec);
2143  dec = &sch->dec[dec_idx];
2144 
2145  // the decoder should have given us post-flush end timestamp in pkt
2146  if (dec->expect_end_ts) {
2147  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2149  if (ret < 0)
2150  return ret;
2151 
2152  dec->expect_end_ts = 0;
2153  }
2154 
2155  ret = tq_receive(dec->queue, &dummy, pkt);
2156  av_assert0(dummy <= 0);
2157 
2158  // got a flush packet, on the next call to this function the decoder
2159  // will give us post-flush end timestamp
2160  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2161  dec->expect_end_ts = 1;
2162 
2163  return ret;
2164 }
2165 
2167  unsigned in_idx, AVFrame *frame)
2168 {
2169  if (frame)
2170  return tq_send(fg->queue, in_idx, frame);
2171 
2172  if (!fg->inputs[in_idx].send_finished) {
2173  fg->inputs[in_idx].send_finished = 1;
2174  tq_send_finish(fg->queue, in_idx);
2175 
2176  // close the control stream when all actual inputs are done
2177  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2178  tq_send_finish(fg->queue, fg->nb_inputs);
2179  }
2180  return 0;
2181 }
2182 
2184  uint8_t *dst_finished, AVFrame *frame)
2185 {
2186  int ret;
2187 
2188  if (*dst_finished)
2189  return AVERROR_EOF;
2190 
2191  if (!frame)
2192  goto finish;
2193 
2194  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2195  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2196  send_to_enc(sch, &sch->enc[dst.idx], frame);
2197  if (ret == AVERROR_EOF)
2198  goto finish;
2199 
2200  return ret;
2201 
2202 finish:
2203  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2204  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2205  else
2206  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2207 
2208  *dst_finished = 1;
2209 
2210  return AVERROR_EOF;
2211 }
2212 
2213 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2214  unsigned out_idx, AVFrame *frame)
2215 {
2216  SchDec *dec;
2217  SchDecOutput *o;
2218  int ret;
2219  unsigned nb_done = 0;
2220 
2221  av_assert0(dec_idx < sch->nb_dec);
2222  dec = &sch->dec[dec_idx];
2223 
2224  av_assert0(out_idx < dec->nb_outputs);
2225  o = &dec->outputs[out_idx];
2226 
2227  for (unsigned i = 0; i < o->nb_dst; i++) {
2228  uint8_t *finished = &o->dst_finished[i];
2229  AVFrame *to_send = frame;
2230 
2231  // sending a frame consumes it, so make a temporary reference if needed
2232  if (i < o->nb_dst - 1) {
2233  to_send = dec->send_frame;
2234 
2235  // frame may sometimes contain props only,
2236  // e.g. to signal EOF timestamp
2237  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2238  av_frame_copy_props(to_send, frame);
2239  if (ret < 0)
2240  return ret;
2241  }
2242 
2243  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2244  if (ret < 0) {
2245  av_frame_unref(to_send);
2246  if (ret == AVERROR_EOF) {
2247  nb_done++;
2248  continue;
2249  }
2250  return ret;
2251  }
2252  }
2253 
2254  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2255 }
2256 
2257 static int dec_done(Scheduler *sch, unsigned dec_idx)
2258 {
2259  SchDec *dec = &sch->dec[dec_idx];
2260  int ret = 0;
2261 
2262  tq_receive_finish(dec->queue, 0);
2263 
2264  // make sure our source does not get stuck waiting for end timestamps
2265  // that will never arrive
2266  if (dec->queue_end_ts)
2268 
2269  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2270  SchDecOutput *o = &dec->outputs[i];
2271 
2272  for (unsigned j = 0; j < o->nb_dst; j++) {
2273  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2274  if (err < 0 && err != AVERROR_EOF)
2275  ret = err_merge(ret, err);
2276  }
2277  }
2278 
2279  return ret;
2280 }
2281 
2282 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2283 {
2284  SchEnc *enc;
2285  int ret, dummy;
2286 
2287  av_assert0(enc_idx < sch->nb_enc);
2288  enc = &sch->enc[enc_idx];
2289 
2290  ret = tq_receive(enc->queue, &dummy, frame);
2291  av_assert0(dummy <= 0);
2292 
2293  return ret;
2294 }
2295 
2297  uint8_t *dst_finished, AVPacket *pkt)
2298 {
2299  int ret;
2300 
2301  if (*dst_finished)
2302  return AVERROR_EOF;
2303 
2304  if (!pkt)
2305  goto finish;
2306 
2307  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2308  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2309  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2310  if (ret == AVERROR_EOF)
2311  goto finish;
2312 
2313  return ret;
2314 
2315 finish:
2316  if (dst.type == SCH_NODE_TYPE_MUX)
2317  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2318  else
2319  tq_send_finish(sch->dec[dst.idx].queue, 0);
2320 
2321  *dst_finished = 1;
2322 
2323  return AVERROR_EOF;
2324 }
2325 
2326 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2327 {
2328  SchEnc *enc;
2329  int ret;
2330 
2331  av_assert0(enc_idx < sch->nb_enc);
2332  enc = &sch->enc[enc_idx];
2333 
2334  for (unsigned i = 0; i < enc->nb_dst; i++) {
2335  uint8_t *finished = &enc->dst_finished[i];
2336  AVPacket *to_send = pkt;
2337 
2338  // sending a packet consumes it, so make a temporary reference if needed
2339  if (i < enc->nb_dst - 1) {
2340  to_send = enc->send_pkt;
2341 
2342  ret = av_packet_ref(to_send, pkt);
2343  if (ret < 0)
2344  return ret;
2345  }
2346 
2347  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2348  if (ret < 0) {
2349  av_packet_unref(to_send);
2350  if (ret == AVERROR_EOF)
2351  continue;
2352  return ret;
2353  }
2354  }
2355 
2356  return 0;
2357 }
2358 
2359 static int enc_done(Scheduler *sch, unsigned enc_idx)
2360 {
2361  SchEnc *enc = &sch->enc[enc_idx];
2362  int ret = 0;
2363 
2364  tq_receive_finish(enc->queue, 0);
2365 
2366  for (unsigned i = 0; i < enc->nb_dst; i++) {
2367  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2368  if (err < 0 && err != AVERROR_EOF)
2369  ret = err_merge(ret, err);
2370  }
2371 
2372  return ret;
2373 }
2374 
2375 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2376  unsigned *in_idx, AVFrame *frame)
2377 {
2378  SchFilterGraph *fg;
2379 
2380  av_assert0(fg_idx < sch->nb_filters);
2381  fg = &sch->filters[fg_idx];
2382 
2383  av_assert0(*in_idx <= fg->nb_inputs);
2384 
2385  // update scheduling to account for desired input stream, if it changed
2386  //
2387  // this check needs no locking because only the filtering thread
2388  // updates this value
2389  if (*in_idx != fg->best_input) {
2391 
2392  fg->best_input = *in_idx;
2394 
2396  }
2397 
2398  if (*in_idx == fg->nb_inputs) {
2399  int terminate = waiter_wait(sch, &fg->waiter);
2400  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2401  }
2402 
2403  while (1) {
2404  int ret, idx;
2405 
2406  ret = tq_receive(fg->queue, &idx, frame);
2407  if (idx < 0)
2408  return AVERROR_EOF;
2409  else if (ret >= 0) {
2410  *in_idx = idx;
2411  return 0;
2412  }
2413 
2414  // disregard EOFs for specific streams - they should always be
2415  // preceded by an EOF frame
2416  }
2417 }
2418 
2419 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2420 {
2421  SchFilterGraph *fg;
2422  SchFilterIn *fi;
2423 
2424  av_assert0(fg_idx < sch->nb_filters);
2425  fg = &sch->filters[fg_idx];
2426 
2427  av_assert0(in_idx < fg->nb_inputs);
2428  fi = &fg->inputs[in_idx];
2429 
2430  if (!fi->receive_finished) {
2431  fi->receive_finished = 1;
2432  tq_receive_finish(fg->queue, in_idx);
2433 
2434  // close the control stream when all actual inputs are done
2435  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2436  tq_receive_finish(fg->queue, fg->nb_inputs);
2437  }
2438 }
2439 
2440 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2441 {
2442  SchFilterGraph *fg;
2444 
2445  av_assert0(fg_idx < sch->nb_filters);
2446  fg = &sch->filters[fg_idx];
2447 
2448  av_assert0(out_idx < fg->nb_outputs);
2449  dst = fg->outputs[out_idx].dst;
2450 
2451  return (dst.type == SCH_NODE_TYPE_ENC) ?
2452  send_to_enc (sch, &sch->enc[dst.idx], frame) :
2453  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2454 }
2455 
2456 static int filter_done(Scheduler *sch, unsigned fg_idx)
2457 {
2458  SchFilterGraph *fg = &sch->filters[fg_idx];
2459  int ret = 0;
2460 
2461  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2462  tq_receive_finish(fg->queue, i);
2463 
2464  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2465  SchedulerNode dst = fg->outputs[i].dst;
2466  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2467  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2468  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2469 
2470  if (err < 0 && err != AVERROR_EOF)
2471  ret = err_merge(ret, err);
2472  }
2473 
2475 
2476  fg->task_exited = 1;
2477 
2479 
2481 
2482  return ret;
2483 }
2484 
2485 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2486 {
2487  SchFilterGraph *fg;
2488 
2489  av_assert0(fg_idx < sch->nb_filters);
2490  fg = &sch->filters[fg_idx];
2491 
2492  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2493 }
2494 
2495 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2496 {
2497  switch (node.type) {
2498  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2499  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2500  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2501  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2502  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2503  default: av_assert0(0);
2504  }
2505 }
2506 
2507 static void *task_wrapper(void *arg)
2508 {
2509  SchTask *task = arg;
2510  Scheduler *sch = task->parent;
2511  int ret;
2512  int err = 0;
2513 
2514  ret = task->func(task->func_arg);
2515  if (ret < 0)
2516  av_log(task->func_arg, AV_LOG_ERROR,
2517  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2518 
2519  err = task_cleanup(sch, task->node);
2520  ret = err_merge(ret, err);
2521 
2522  // EOF is considered normal termination
2523  if (ret == AVERROR_EOF)
2524  ret = 0;
2525  if (ret < 0)
2526  atomic_store(&sch->task_failed, 1);
2527 
2529  "Terminating thread with return code %d (%s)\n", ret,
2530  ret < 0 ? av_err2str(ret) : "success");
2531 
2532  return (void*)(intptr_t)ret;
2533 }
2534 
2535 static int task_stop(Scheduler *sch, SchTask *task)
2536 {
2537  int ret;
2538  void *thread_ret;
2539 
2540  if (!task->thread_running)
2541  return task_cleanup(sch, task->node);
2542 
2543  ret = pthread_join(task->thread, &thread_ret);
2544  av_assert0(ret == 0);
2545 
2546  task->thread_running = 0;
2547 
2548  return (intptr_t)thread_ret;
2549 }
2550 
2551 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2552 {
2553  int ret = 0, err;
2554 
2555  if (sch->state != SCH_STATE_STARTED)
2556  return 0;
2557 
2558  atomic_store(&sch->terminate, 1);
2559 
2560  for (unsigned type = 0; type < 2; type++)
2561  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2562  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2563  waiter_set(w, 1);
2564  }
2565 
2566  for (unsigned i = 0; i < sch->nb_demux; i++) {
2567  SchDemux *d = &sch->demux[i];
2568 
2569  err = task_stop(sch, &d->task);
2570  ret = err_merge(ret, err);
2571  }
2572 
2573  for (unsigned i = 0; i < sch->nb_dec; i++) {
2574  SchDec *dec = &sch->dec[i];
2575 
2576  err = task_stop(sch, &dec->task);
2577  ret = err_merge(ret, err);
2578  }
2579 
2580  for (unsigned i = 0; i < sch->nb_filters; i++) {
2581  SchFilterGraph *fg = &sch->filters[i];
2582 
2583  err = task_stop(sch, &fg->task);
2584  ret = err_merge(ret, err);
2585  }
2586 
2587  for (unsigned i = 0; i < sch->nb_enc; i++) {
2588  SchEnc *enc = &sch->enc[i];
2589 
2590  err = task_stop(sch, &enc->task);
2591  ret = err_merge(ret, err);
2592  }
2593 
2594  for (unsigned i = 0; i < sch->nb_mux; i++) {
2595  SchMux *mux = &sch->mux[i];
2596 
2597  err = task_stop(sch, &mux->task);
2598  ret = err_merge(ret, err);
2599  }
2600 
2601  if (finish_ts)
2602  *finish_ts = trailing_dts(sch, 1);
2603 
2604  sch->state = SCH_STATE_STOPPED;
2605 
2606  return ret;
2607 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:298
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:68
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:429
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1105
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:76
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:351
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2440
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:88
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:295
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:287
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:269
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2066
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:101
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:137
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:1965
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:299
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:233
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for the stream with index stream_idx to frames.
Definition: sync_queue.c:649
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:147
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:688
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:185
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:264
int64_t
long long int64_t
Definition: coverity.c:34
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1338
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2110
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:296
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g. extended_data.
Definition: frame.c:162
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1343
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:389
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2495
frame_move
static void frame_move(void *dst, void *src)
Definition: ffmpeg_utils.h:52
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:539
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:218
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:160
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1839
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:284
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2359
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:196
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:557
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:213
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:276
objpool_alloc_packets
ObjPool * objpool_alloc_packets(void)
Definition: objpool.c:124
SchFilterOut
Definition: ffmpeg_sched.c:243
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:270
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:176
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:207
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:260
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:313
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:166
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1191
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1688
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2535
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:240
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2551
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:119
fail
#define fail()
Definition: checkasm.h:188
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:244
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:85
SchEnc
Definition: ffmpeg_sched.c:109
dummy
int dummy
Definition: motion.c:66
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:90
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:656
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:214
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:252
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:272
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:608
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:293
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:829
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:150
SchFilterGraph
Definition: ffmpeg_sched.c:247
SchMuxStream::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:192
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:60
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:467
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:307
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:216
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
Scheduler::mux_done_cond
pthread_cond_t mux_done_cond
Definition: ffmpeg_sched.c:289
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:190
SchDecOutput
Definition: ffmpeg_sched.c:74
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:632
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:341
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:256
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:94
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1662
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:585
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2183
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:433
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:195
op
static int op(uint8_t **dst, const uint8_t *dst_end, GetByteContext *gb, int pixel, int count, int *x, int width, int linesize)
Perform decode operation.
Definition: anm.c:76
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:113
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:715
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:191
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2213
ctx
AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:384
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:209
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2456
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:248
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2282
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
objpool_alloc_frames
ObjPool * objpool_alloc_frames(void)
Definition: objpool.c:128
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:619
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:110
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1933
arg
const char * arg
Definition: jacosubdec.c:67
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:197
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:620
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:229
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:278
pkt_move
static void pkt_move(void *dst, void *src)
Definition: ffmpeg_utils.h:47
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:522
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:55
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:368
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:304
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1707
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:137
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:713
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:251
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:281
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1270
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:241
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:791
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:105
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:271
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2257
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:261
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:157
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:114
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:748
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1337
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations send normally to other types.
Definition: ffmpeg_sched.h:338
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:823
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:437
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:486
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2326
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:82
error.h
Scheduler
Definition: ffmpeg_sched.c:275
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:217
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:103
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:194
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:81
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:661
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:144
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:162
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:153
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:253
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:120
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:368
AVPacket::size
int size
Definition: packet.h:540
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:106
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:266
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:388
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:83
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:180
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:258
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:343
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:187
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:699
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:742
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:250
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:311
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:538
ObjPool
Definition: objpool.c:30
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2296
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1339
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:626
SchFilterIn
Definition: ffmpeg_sched.c:236
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2375
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:282
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:63
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
Definition: thread_queue.c:79
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:138
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:580
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:279
Scheduler::task_failed
atomic_int task_failed
Definition: ffmpeg_sched.c:309
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:392
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:152
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:169
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:414
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:301
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:532
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2031
packet.h
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:265
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:31
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:285
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:308
SchDec
Definition: ffmpeg_sched.c:80
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:255
QueueType
QueueType
Definition: ffmpeg_sched.c:47
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:86
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:610
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:445
Scheduler::mux_done_lock
pthread_mutex_t mux_done_lock
Definition: ffmpeg_sched.c:288
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:255
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:785
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1139
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2166
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:196
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:151
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:927
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1783
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2485
SchDemuxStream
Definition: ffmpeg_sched.c:150
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
SchFilterIn::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:238
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2137
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:71
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:264
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:200
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1803
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:220
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:682
ThreadQueue
Definition: thread_queue.c:42
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2507
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:222
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:88
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2009
SchDemux
Definition: ffmpeg_sched.c:156
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:292
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:93
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1898
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:83
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:541
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:532
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:230
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:163
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:322
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:102
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:896
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1424
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1216
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2419
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g. for multiview video).
Definition: ffmpeg_sched.c:727
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:112
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1635
AVPacket
This structure stores compressed data.
Definition: packet.h:516
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:100
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:231
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:97
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:374
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1569
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:482
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2084
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:75
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:142
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:675
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:140
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:237
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1175
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1388
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:159
PreMuxQueue
Definition: ffmpeg_sched.c:172
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:305
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:239
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:262
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:583
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1245
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:551
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2053
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:871
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:78
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:115
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:226
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:302