FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDecOutput {
76  uint8_t *dst_finished;
77  unsigned nb_dst;
78 } SchDecOutput;
79 
80 typedef struct SchDec {
81  const AVClass *class;
82 
84 
86  unsigned nb_outputs;
87 
89  // Queue for receiving input packets, one stream.
91 
92  // Queue for sending post-flush end timestamps back to the source
95 
96  // temporary storage used by sch_dec_send()
98 } SchDec;
99 
100 typedef struct SchSyncQueue {
104 
105  unsigned *enc_idx;
106  unsigned nb_enc_idx;
107 } SchSyncQueue;
108 
109 typedef struct SchEnc {
110  const AVClass *class;
111 
114  uint8_t *dst_finished;
115  unsigned nb_dst;
116 
117  // [0] - index of the sync queue in Scheduler.sq_enc,
118  // [1] - index of this encoder in the sq
119  int sq_idx[2];
120 
121  /* Opening encoders is somewhat nontrivial due to their interaction with
122  * sync queues, which are (among other things) responsible for maintaining
123  * constant audio frame size, when it is required by the encoder.
124  *
125  * Opening the encoder requires stream parameters, obtained from the first
126  * frame. However, that frame cannot be properly chunked by the sync queue
127  * without knowing the required frame size, which is only available after
128  * opening the encoder.
129  *
130  * This apparent circular dependency is resolved in the following way:
131  * - the caller creating the encoder gives us a callback which opens the
132  * encoder and returns the required frame size (if any)
133  * - when the first frame is sent to the encoder, the sending thread
134  * - calls this callback, opening the encoder
135  * - passes the returned frame size to the sync queue
136  */
137  int (*open_cb)(void *opaque, const AVFrame *frame);
138  int opened;
139 
141  // Queue for receiving input frames, one stream.
143  // tq_send() to queue returned EOF
145 
146  // temporary storage used by sch_enc_send()
148 } SchEnc;
149 
150 typedef struct SchDemuxStream {
152  uint8_t *dst_finished;
153  unsigned nb_dst;
155 
156 typedef struct SchDemux {
157  const AVClass *class;
158 
160  unsigned nb_streams;
161 
164 
165  // temporary storage used by sch_demux_send()
167 
168  // protected by schedule_lock
170 } SchDemux;
171 
172 typedef struct PreMuxQueue {
173  /**
174  * Queue for buffering the packets before the muxer task can be started.
175  */
177  /**
178  * Maximum number of packets in fifo.
179  */
181  /*
182  * The size of the AVPackets' buffers in queue.
183  * Updated when a packet is either pushed or pulled from the queue.
184  */
185  size_t data_size;
186  /* Threshold after which max_packets will be in effect */
188 } PreMuxQueue;
189 
190 typedef struct SchMuxStream {
193 
194  unsigned *sub_heartbeat_dst;
196 
198 
199  // an EOF was generated while flushing the pre-mux queue
200  int init_eof;
201 
202  ////////////////////////////////////////////////////////////
203  // The following are protected by Scheduler.schedule_lock //
204 
205  /* dts+duration of the last packet sent to this stream
206  in AV_TIME_BASE_Q */
208  // this stream no longer accepts input
210  ////////////////////////////////////////////////////////////
211 } SchMuxStream;
212 
213 typedef struct SchMux {
214  const AVClass *class;
215 
217  unsigned nb_streams;
219 
220  int (*init)(void *arg);
221 
223  /**
224  * Set to 1 after starting the muxer task and flushing the
225  * pre-muxing queues.
226  * Set either before any tasks have started, or with
227  * Scheduler.mux_ready_lock held.
228  */
231  unsigned queue_size;
232 
234 } SchMux;
235 
236 typedef struct SchFilterIn {
241 } SchFilterIn;
242 
243 typedef struct SchFilterOut {
245 } SchFilterOut;
246 
247 typedef struct SchFilterGraph {
248  const AVClass *class;
249 
251  unsigned nb_inputs;
254 
256  unsigned nb_outputs;
257 
259  // input queue, nb_inputs+1 streams
260  // last stream is control
263 
264  // protected by schedule_lock
265  unsigned best_input;
268 
273 };
274 
275 struct Scheduler {
276  const AVClass *class;
277 
279  unsigned nb_demux;
280 
282  unsigned nb_mux;
283 
284  unsigned nb_mux_ready;
286 
287  unsigned nb_mux_done;
290 
291 
293  unsigned nb_dec;
294 
296  unsigned nb_enc;
297 
299  unsigned nb_sq_enc;
300 
302  unsigned nb_filters;
303 
305  int sdp_auto;
306 
310 
312 
314 };
315 
316 /**
317  * Wait until this task is allowed to proceed.
318  *
319  * @retval 0 the caller should proceed
320  * @retval 1 the caller should terminate
321  */
322 static int waiter_wait(Scheduler *sch, SchWaiter *w)
323 {
324  int terminate;
325 
326  if (!atomic_load(&w->choked))
327  return 0;
328 
329  pthread_mutex_lock(&w->lock);
330 
331  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332  pthread_cond_wait(&w->cond, &w->lock);
333 
334  terminate = atomic_load(&sch->terminate);
335 
336  pthread_mutex_unlock(&w->lock);
337 
338  return terminate;
339 }
340 
341 static void waiter_set(SchWaiter *w, int choked)
342 {
343  pthread_mutex_lock(&w->lock);
344 
345  atomic_store(&w->choked, choked);
346  pthread_cond_signal(&w->cond);
347 
348  pthread_mutex_unlock(&w->lock);
349 }
350 
351 static int waiter_init(SchWaiter *w)
352 {
353  int ret;
354 
355  atomic_init(&w->choked, 0);
356 
357  ret = pthread_mutex_init(&w->lock, NULL);
358  if (ret)
359  return AVERROR(ret);
360 
361  ret = pthread_cond_init(&w->cond, NULL);
362  if (ret)
363  return AVERROR(ret);
364 
365  return 0;
366 }
367 
368 static void waiter_uninit(SchWaiter *w)
369 {
370  pthread_mutex_destroy(&w->lock);
371  pthread_cond_destroy(&w->cond);
372 }
373 
374 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375  enum QueueType type)
376 {
377  ThreadQueue *tq;
378  ObjPool *op;
379 
380  if (queue_size <= 0) {
381  if (type == QUEUE_FRAMES)
382  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
383  else
385  }
386 
387  if (type == QUEUE_FRAMES) {
388  // This queue length is used in the decoder code to ensure that
389  // there are enough entries in fixed-size frame pools to account
390  // for frames held in queues inside the ffmpeg utility. If this
391  // can ever dynamically change then the corresponding decode
392  // code needs to be updated as well.
394  }
395 
398  if (!op)
399  return AVERROR(ENOMEM);
400 
401  tq = tq_alloc(nb_streams, queue_size, op,
403  if (!tq) {
404  objpool_free(&op);
405  return AVERROR(ENOMEM);
406  }
407 
408  *ptq = tq;
409  return 0;
410 }
411 
412 static void *task_wrapper(void *arg);
413 
414 static int task_start(SchTask *task)
415 {
416  int ret;
417 
418  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
419 
420  av_assert0(!task->thread_running);
421 
422  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
423  if (ret) {
424  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
425  strerror(ret));
426  return AVERROR(ret);
427  }
428 
429  task->thread_running = 1;
430  return 0;
431 }
432 
433 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
434  SchThreadFunc func, void *func_arg)
435 {
436  task->parent = sch;
437 
438  task->node.type = type;
439  task->node.idx = idx;
440 
441  task->func = func;
442  task->func_arg = func_arg;
443 }
444 
445 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
446 {
447  int64_t min_dts = INT64_MAX;
448 
449  for (unsigned i = 0; i < sch->nb_mux; i++) {
450  const SchMux *mux = &sch->mux[i];
451 
452  for (unsigned j = 0; j < mux->nb_streams; j++) {
453  const SchMuxStream *ms = &mux->streams[j];
454 
455  if (ms->source_finished && !count_finished)
456  continue;
457  if (ms->last_dts == AV_NOPTS_VALUE)
458  return AV_NOPTS_VALUE;
459 
460  min_dts = FFMIN(min_dts, ms->last_dts);
461  }
462  }
463 
464  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
465 }
466 
467 void sch_free(Scheduler **psch)
468 {
469  Scheduler *sch = *psch;
470 
471  if (!sch)
472  return;
473 
474  sch_stop(sch, NULL);
475 
476  for (unsigned i = 0; i < sch->nb_demux; i++) {
477  SchDemux *d = &sch->demux[i];
478 
479  for (unsigned j = 0; j < d->nb_streams; j++) {
480  SchDemuxStream *ds = &d->streams[j];
481  av_freep(&ds->dst);
482  av_freep(&ds->dst_finished);
483  }
484  av_freep(&d->streams);
485 
487 
488  waiter_uninit(&d->waiter);
489  }
490  av_freep(&sch->demux);
491 
492  for (unsigned i = 0; i < sch->nb_mux; i++) {
493  SchMux *mux = &sch->mux[i];
494 
495  for (unsigned j = 0; j < mux->nb_streams; j++) {
496  SchMuxStream *ms = &mux->streams[j];
497 
498  if (ms->pre_mux_queue.fifo) {
499  AVPacket *pkt;
500  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
503  }
504 
506  }
507  av_freep(&mux->streams);
508 
510 
511  tq_free(&mux->queue);
512  }
513  av_freep(&sch->mux);
514 
515  for (unsigned i = 0; i < sch->nb_dec; i++) {
516  SchDec *dec = &sch->dec[i];
517 
518  tq_free(&dec->queue);
519 
521 
522  for (unsigned j = 0; j < dec->nb_outputs; j++) {
523  SchDecOutput *o = &dec->outputs[j];
524 
525  av_freep(&o->dst);
526  av_freep(&o->dst_finished);
527  }
528 
529  av_freep(&dec->outputs);
530 
531  av_frame_free(&dec->send_frame);
532  }
533  av_freep(&sch->dec);
534 
535  for (unsigned i = 0; i < sch->nb_enc; i++) {
536  SchEnc *enc = &sch->enc[i];
537 
538  tq_free(&enc->queue);
539 
540  av_packet_free(&enc->send_pkt);
541 
542  av_freep(&enc->dst);
543  av_freep(&enc->dst_finished);
544  }
545  av_freep(&sch->enc);
546 
547  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
548  SchSyncQueue *sq = &sch->sq_enc[i];
549  sq_free(&sq->sq);
550  av_frame_free(&sq->frame);
552  av_freep(&sq->enc_idx);
553  }
554  av_freep(&sch->sq_enc);
555 
556  for (unsigned i = 0; i < sch->nb_filters; i++) {
557  SchFilterGraph *fg = &sch->filters[i];
558 
559  tq_free(&fg->queue);
560 
561  av_freep(&fg->inputs);
562  av_freep(&fg->outputs);
563 
564  waiter_uninit(&fg->waiter);
565  }
566  av_freep(&sch->filters);
567 
568  av_freep(&sch->sdp_filename);
569 
571 
573 
576 
577  av_freep(psch);
578 }
579 
580 static const AVClass scheduler_class = {
581  .class_name = "Scheduler",
582  .version = LIBAVUTIL_VERSION_INT,
583 };
584 
586 {
587  Scheduler *sch;
588  int ret;
589 
590  sch = av_mallocz(sizeof(*sch));
591  if (!sch)
592  return NULL;
593 
594  sch->class = &scheduler_class;
595  sch->sdp_auto = 1;
596 
598  if (ret)
599  goto fail;
600 
602  if (ret)
603  goto fail;
604 
606  if (ret)
607  goto fail;
608 
610  if (ret)
611  goto fail;
612 
613  return sch;
614 fail:
615  sch_free(&sch);
616  return NULL;
617 }
618 
619 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
620 {
621  av_freep(&sch->sdp_filename);
622  sch->sdp_filename = av_strdup(sdp_filename);
623  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
624 }
625 
626 static const AVClass sch_mux_class = {
627  .class_name = "SchMux",
628  .version = LIBAVUTIL_VERSION_INT,
629  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
630 };
631 
632 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
633  void *arg, int sdp_auto, unsigned thread_queue_size)
634 {
635  const unsigned idx = sch->nb_mux;
636 
637  SchMux *mux;
638  int ret;
639 
640  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
641  if (ret < 0)
642  return ret;
643 
644  mux = &sch->mux[idx];
645  mux->class = &sch_mux_class;
646  mux->init = init;
647  mux->queue_size = thread_queue_size;
648 
649  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
650 
651  sch->sdp_auto &= sdp_auto;
652 
653  return idx;
654 }
655 
656 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
657 {
658  SchMux *mux;
659  SchMuxStream *ms;
660  unsigned stream_idx;
661  int ret;
662 
663  av_assert0(mux_idx < sch->nb_mux);
664  mux = &sch->mux[mux_idx];
665 
666  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
667  if (ret < 0)
668  return ret;
669  stream_idx = mux->nb_streams - 1;
670 
671  ms = &mux->streams[stream_idx];
672 
673  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
674  if (!ms->pre_mux_queue.fifo)
675  return AVERROR(ENOMEM);
676 
677  ms->last_dts = AV_NOPTS_VALUE;
678 
679  return stream_idx;
680 }
681 
682 static const AVClass sch_demux_class = {
683  .class_name = "SchDemux",
684  .version = LIBAVUTIL_VERSION_INT,
685  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
686 };
687 
689 {
690  const unsigned idx = sch->nb_demux;
691 
692  SchDemux *d;
693  int ret;
694 
695  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
696  if (ret < 0)
697  return ret;
698 
699  d = &sch->demux[idx];
700 
701  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
702 
703  d->class = &sch_demux_class;
704  d->send_pkt = av_packet_alloc();
705  if (!d->send_pkt)
706  return AVERROR(ENOMEM);
707 
708  ret = waiter_init(&d->waiter);
709  if (ret < 0)
710  return ret;
711 
712  return idx;
713 }
714 
715 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
716 {
717  SchDemux *d;
718  int ret;
719 
720  av_assert0(demux_idx < sch->nb_demux);
721  d = &sch->demux[demux_idx];
722 
723  ret = GROW_ARRAY(d->streams, d->nb_streams);
724  return ret < 0 ? ret : d->nb_streams - 1;
725 }
726 
727 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
728 {
729  SchDec *dec;
730  int ret;
731 
732  av_assert0(dec_idx < sch->nb_dec);
733  dec = &sch->dec[dec_idx];
734 
735  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
736  if (ret < 0)
737  return ret;
738 
739  return dec->nb_outputs - 1;
740 }
741 
742 static const AVClass sch_dec_class = {
743  .class_name = "SchDec",
744  .version = LIBAVUTIL_VERSION_INT,
745  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
746 };
747 
748 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
749 {
750  const unsigned idx = sch->nb_dec;
751 
752  SchDec *dec;
753  int ret;
754 
755  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
756  if (ret < 0)
757  return ret;
758 
759  dec = &sch->dec[idx];
760 
761  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
762 
763  dec->class = &sch_dec_class;
764  dec->send_frame = av_frame_alloc();
765  if (!dec->send_frame)
766  return AVERROR(ENOMEM);
767 
768  ret = sch_add_dec_output(sch, idx);
769  if (ret < 0)
770  return ret;
771 
772  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
773  if (ret < 0)
774  return ret;
775 
776  if (send_end_ts) {
778  if (ret < 0)
779  return ret;
780  }
781 
782  return idx;
783 }
784 
785 static const AVClass sch_enc_class = {
786  .class_name = "SchEnc",
787  .version = LIBAVUTIL_VERSION_INT,
788  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
789 };
790 
792  int (*open_cb)(void *opaque, const AVFrame *frame))
793 {
794  const unsigned idx = sch->nb_enc;
795 
796  SchEnc *enc;
797  int ret;
798 
799  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
800  if (ret < 0)
801  return ret;
802 
803  enc = &sch->enc[idx];
804 
805  enc->class = &sch_enc_class;
806  enc->open_cb = open_cb;
807  enc->sq_idx[0] = -1;
808  enc->sq_idx[1] = -1;
809 
810  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
811 
812  enc->send_pkt = av_packet_alloc();
813  if (!enc->send_pkt)
814  return AVERROR(ENOMEM);
815 
816  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
817  if (ret < 0)
818  return ret;
819 
820  return idx;
821 }
822 
823 static const AVClass sch_fg_class = {
824  .class_name = "SchFilterGraph",
825  .version = LIBAVUTIL_VERSION_INT,
826  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
827 };
828 
829 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
830  SchThreadFunc func, void *ctx)
831 {
832  const unsigned idx = sch->nb_filters;
833 
834  SchFilterGraph *fg;
835  int ret;
836 
837  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
838  if (ret < 0)
839  return ret;
840  fg = &sch->filters[idx];
841 
842  fg->class = &sch_fg_class;
843 
844  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
845 
846  if (nb_inputs) {
847  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
848  if (!fg->inputs)
849  return AVERROR(ENOMEM);
850  fg->nb_inputs = nb_inputs;
851  }
852 
853  if (nb_outputs) {
854  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
855  if (!fg->outputs)
856  return AVERROR(ENOMEM);
857  fg->nb_outputs = nb_outputs;
858  }
859 
860  ret = waiter_init(&fg->waiter);
861  if (ret < 0)
862  return ret;
863 
864  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
865  if (ret < 0)
866  return ret;
867 
868  return idx;
869 }
870 
871 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
872 {
873  SchSyncQueue *sq;
874  int ret;
875 
876  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
877  if (ret < 0)
878  return ret;
879  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
880 
881  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
882  if (!sq->sq)
883  return AVERROR(ENOMEM);
884 
885  sq->frame = av_frame_alloc();
886  if (!sq->frame)
887  return AVERROR(ENOMEM);
888 
889  ret = pthread_mutex_init(&sq->lock, NULL);
890  if (ret)
891  return AVERROR(ret);
892 
893  return sq - sch->sq_enc;
894 }
895 
896 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
897  int limiting, uint64_t max_frames)
898 {
899  SchSyncQueue *sq;
900  SchEnc *enc;
901  int ret;
902 
903  av_assert0(sq_idx < sch->nb_sq_enc);
904  sq = &sch->sq_enc[sq_idx];
905 
906  av_assert0(enc_idx < sch->nb_enc);
907  enc = &sch->enc[enc_idx];
908 
909  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
910  if (ret < 0)
911  return ret;
912  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
913 
914  ret = sq_add_stream(sq->sq, limiting);
915  if (ret < 0)
916  return ret;
917 
918  enc->sq_idx[0] = sq_idx;
919  enc->sq_idx[1] = ret;
920 
921  if (max_frames != INT64_MAX)
922  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
923 
924  return 0;
925 }
926 
928 {
929  int ret;
930 
931  switch (src.type) {
932  case SCH_NODE_TYPE_DEMUX: {
933  SchDemuxStream *ds;
934 
935  av_assert0(src.idx < sch->nb_demux &&
936  src.idx_stream < sch->demux[src.idx].nb_streams);
937  ds = &sch->demux[src.idx].streams[src.idx_stream];
938 
939  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
940  if (ret < 0)
941  return ret;
942 
943  ds->dst[ds->nb_dst - 1] = dst;
944 
945  // demuxed packets go to decoding or streamcopy
946  switch (dst.type) {
947  case SCH_NODE_TYPE_DEC: {
948  SchDec *dec;
949 
950  av_assert0(dst.idx < sch->nb_dec);
951  dec = &sch->dec[dst.idx];
952 
953  av_assert0(!dec->src.type);
954  dec->src = src;
955  break;
956  }
957  case SCH_NODE_TYPE_MUX: {
958  SchMuxStream *ms;
959 
960  av_assert0(dst.idx < sch->nb_mux &&
961  dst.idx_stream < sch->mux[dst.idx].nb_streams);
962  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
963 
964  av_assert0(!ms->src.type);
965  ms->src = src;
966 
967  break;
968  }
969  default: av_assert0(0);
970  }
971 
972  break;
973  }
974  case SCH_NODE_TYPE_DEC: {
975  SchDec *dec;
976  SchDecOutput *o;
977 
978  av_assert0(src.idx < sch->nb_dec);
979  dec = &sch->dec[src.idx];
980 
981  av_assert0(src.idx_stream < dec->nb_outputs);
982  o = &dec->outputs[src.idx_stream];
983 
984  ret = GROW_ARRAY(o->dst, o->nb_dst);
985  if (ret < 0)
986  return ret;
987 
988  o->dst[o->nb_dst - 1] = dst;
989 
990  // decoded frames go to filters or encoding
991  switch (dst.type) {
993  SchFilterIn *fi;
994 
995  av_assert0(dst.idx < sch->nb_filters &&
996  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
997  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
998 
999  av_assert0(!fi->src.type);
1000  fi->src = src;
1001  break;
1002  }
1003  case SCH_NODE_TYPE_ENC: {
1004  SchEnc *enc;
1005 
1006  av_assert0(dst.idx < sch->nb_enc);
1007  enc = &sch->enc[dst.idx];
1008 
1009  av_assert0(!enc->src.type);
1010  enc->src = src;
1011  break;
1012  }
1013  default: av_assert0(0);
1014  }
1015 
1016  break;
1017  }
1018  case SCH_NODE_TYPE_FILTER_OUT: {
1019  SchFilterOut *fo;
1020 
1021  av_assert0(src.idx < sch->nb_filters &&
1022  src.idx_stream < sch->filters[src.idx].nb_outputs);
1023  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1024 
1025  av_assert0(!fo->dst.type);
1026  fo->dst = dst;
1027 
1028  // filtered frames go to encoding or another filtergraph
1029  switch (dst.type) {
1030  case SCH_NODE_TYPE_ENC: {
1031  SchEnc *enc;
1032 
1033  av_assert0(dst.idx < sch->nb_enc);
1034  enc = &sch->enc[dst.idx];
1035 
1036  av_assert0(!enc->src.type);
1037  enc->src = src;
1038  break;
1039  }
1040  case SCH_NODE_TYPE_FILTER_IN: {
1041  SchFilterIn *fi;
1042 
1043  av_assert0(dst.idx < sch->nb_filters &&
1044  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1045  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1046 
1047  av_assert0(!fi->src.type);
1048  fi->src = src;
1049  break;
1050  }
1051  default: av_assert0(0);
1052  }
1053 
1054 
1055  break;
1056  }
1057  case SCH_NODE_TYPE_ENC: {
1058  SchEnc *enc;
1059 
1060  av_assert0(src.idx < sch->nb_enc);
1061  enc = &sch->enc[src.idx];
1062 
1063  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1064  if (ret < 0)
1065  return ret;
1066 
1067  enc->dst[enc->nb_dst - 1] = dst;
1068 
1069  // encoding packets go to muxing or decoding
1070  switch (dst.type) {
1071  case SCH_NODE_TYPE_MUX: {
1072  SchMuxStream *ms;
1073 
1074  av_assert0(dst.idx < sch->nb_mux &&
1075  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1076  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1077 
1078  av_assert0(!ms->src.type);
1079  ms->src = src;
1080 
1081  break;
1082  }
1083  case SCH_NODE_TYPE_DEC: {
1084  SchDec *dec;
1085 
1086  av_assert0(dst.idx < sch->nb_dec);
1087  dec = &sch->dec[dst.idx];
1088 
1089  av_assert0(!dec->src.type);
1090  dec->src = src;
1091 
1092  break;
1093  }
1094  default: av_assert0(0);
1095  }
1096 
1097  break;
1098  }
1099  default: av_assert0(0);
1100  }
1101 
1102  return 0;
1103 }
1104 
1105 static int mux_task_start(SchMux *mux)
1106 {
1107  int ret = 0;
1108 
1109  ret = task_start(&mux->task);
1110  if (ret < 0)
1111  return ret;
1112 
1113  /* flush the pre-muxing queues */
1114  while (1) {
1115  int min_stream = -1;
1116  Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1117 
1118  AVPacket *pkt;
1119 
1120  // find the stream with the earliest dts or EOF in pre-muxing queue
1121  for (unsigned i = 0; i < mux->nb_streams; i++) {
1122  SchMuxStream *ms = &mux->streams[i];
1123 
1124  if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1125  continue;
1126 
1127  if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1128  min_stream = i;
1129  break;
1130  }
1131 
1132  if (min_ts.ts == AV_NOPTS_VALUE ||
1133  av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1134  min_stream = i;
1135  min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1136  }
1137  }
1138 
1139  if (min_stream >= 0) {
1140  SchMuxStream *ms = &mux->streams[min_stream];
1141 
1142  ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1143  av_assert0(ret >= 0);
1144 
1145  if (pkt) {
1146  if (!ms->init_eof)
1147  ret = tq_send(mux->queue, min_stream, pkt);
1148  av_packet_free(&pkt);
1149  if (ret == AVERROR_EOF)
1150  ms->init_eof = 1;
1151  else if (ret < 0)
1152  return ret;
1153  } else
1154  tq_send_finish(mux->queue, min_stream);
1155 
1156  continue;
1157  }
1158 
1159  break;
1160  }
1161 
1162  atomic_store(&mux->mux_started, 1);
1163 
1164  return 0;
1165 }
1166 
1167 int print_sdp(const char *filename);
1168 
1169 static int mux_init(Scheduler *sch, SchMux *mux)
1170 {
1171  int ret;
1172 
1173  ret = mux->init(mux->task.func_arg);
1174  if (ret < 0)
1175  return ret;
1176 
1177  sch->nb_mux_ready++;
1178 
1179  if (sch->sdp_filename || sch->sdp_auto) {
1180  if (sch->nb_mux_ready < sch->nb_mux)
1181  return 0;
1182 
1183  ret = print_sdp(sch->sdp_filename);
1184  if (ret < 0) {
1185  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1186  return ret;
1187  }
1188 
1189  /* SDP is written only after all the muxers are ready, so now we
1190  * start ALL the threads */
1191  for (unsigned i = 0; i < sch->nb_mux; i++) {
1192  ret = mux_task_start(&sch->mux[i]);
1193  if (ret < 0)
1194  return ret;
1195  }
1196  } else {
1197  ret = mux_task_start(mux);
1198  if (ret < 0)
1199  return ret;
1200  }
1201 
1202  return 0;
1203 }
1204 
1205 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1206  size_t data_threshold, int max_packets)
1207 {
1208  SchMux *mux;
1209  SchMuxStream *ms;
1210 
1211  av_assert0(mux_idx < sch->nb_mux);
1212  mux = &sch->mux[mux_idx];
1213 
1214  av_assert0(stream_idx < mux->nb_streams);
1215  ms = &mux->streams[stream_idx];
1216 
1217  ms->pre_mux_queue.max_packets = max_packets;
1218  ms->pre_mux_queue.data_threshold = data_threshold;
1219 }
1220 
1221 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1222 {
1223  SchMux *mux;
1224  int ret = 0;
1225 
1226  av_assert0(mux_idx < sch->nb_mux);
1227  mux = &sch->mux[mux_idx];
1228 
1229  av_assert0(stream_idx < mux->nb_streams);
1230 
1232 
1233  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1234 
1235  // this may be called during initialization - do not start
1236  // threads before sch_start() is called
1237  if (++mux->nb_streams_ready == mux->nb_streams &&
1238  sch->state >= SCH_STATE_STARTED)
1239  ret = mux_init(sch, mux);
1240 
1242 
1243  return ret;
1244 }
1245 
1246 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1247  unsigned dec_idx)
1248 {
1249  SchMux *mux;
1250  SchMuxStream *ms;
1251  int ret = 0;
1252 
1253  av_assert0(mux_idx < sch->nb_mux);
1254  mux = &sch->mux[mux_idx];
1255 
1256  av_assert0(stream_idx < mux->nb_streams);
1257  ms = &mux->streams[stream_idx];
1258 
1260  if (ret < 0)
1261  return ret;
1262 
1263  av_assert0(dec_idx < sch->nb_dec);
1264  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1265 
1266  if (!mux->sub_heartbeat_pkt) {
1268  if (!mux->sub_heartbeat_pkt)
1269  return AVERROR(ENOMEM);
1270  }
1271 
1272  return 0;
1273 }
1274 
1276 {
1277  while (1) {
1278  SchFilterGraph *fg;
1279 
1280  // fed directly by a demuxer (i.e. not through a filtergraph)
1281  if (src.type == SCH_NODE_TYPE_DEMUX) {
1282  sch->demux[src.idx].waiter.choked_next = 0;
1283  return;
1284  }
1285 
1287  fg = &sch->filters[src.idx];
1288 
1289  // the filtergraph contains internal sources and
1290  // requested to be scheduled directly
1291  if (fg->best_input == fg->nb_inputs) {
1292  fg->waiter.choked_next = 0;
1293  return;
1294  }
1295 
1296  src = fg->inputs[fg->best_input].src_sched;
1297  }
1298 }
1299 
1301 {
1302  int64_t dts;
1303  int have_unchoked = 0;
1304 
1305  // on termination request all waiters are choked,
1306  // we are not to unchoke them
1307  if (atomic_load(&sch->terminate))
1308  return;
1309 
1310  dts = trailing_dts(sch, 0);
1311 
1312  atomic_store(&sch->last_dts, dts);
1313 
1314  // initialize our internal state
1315  for (unsigned type = 0; type < 2; type++)
1316  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1317  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1318  w->choked_prev = atomic_load(&w->choked);
1319  w->choked_next = 1;
1320  }
1321 
1322  // figure out the sources that are allowed to proceed
1323  for (unsigned i = 0; i < sch->nb_mux; i++) {
1324  SchMux *mux = &sch->mux[i];
1325 
1326  for (unsigned j = 0; j < mux->nb_streams; j++) {
1327  SchMuxStream *ms = &mux->streams[j];
1328 
1329  // unblock sources for output streams that are not finished
1330  // and not too far ahead of the trailing stream
1331  if (ms->source_finished)
1332  continue;
1333  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1334  continue;
1335  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1336  continue;
1337 
1338  // resolve the source to unchoke
1339  unchoke_for_stream(sch, ms->src_sched);
1340  have_unchoked = 1;
1341  }
1342  }
1343 
1344  // make sure to unchoke at least one source, if still available
1345  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1346  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1347  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1348  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1349  if (!exited) {
1350  w->choked_next = 0;
1351  have_unchoked = 1;
1352  break;
1353  }
1354  }
1355 
1356 
1357  for (unsigned type = 0; type < 2; type++)
1358  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1359  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1360  if (w->choked_prev != w->choked_next)
1361  waiter_set(w, w->choked_next);
1362  }
1363 
1364 }
1365 
1366 enum {
1370 };
1371 
1372 static int
1374  uint8_t *filters_visited, SchedulerNode *filters_stack)
1375 {
1376  unsigned nb_filters_stack = 0;
1377 
1378  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1379 
1380  while (1) {
1381  const SchFilterGraph *fg = &sch->filters[src.idx];
1382 
1383  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1384 
1385  // descend into every input, depth first
1386  if (src.idx_stream < fg->nb_inputs) {
1387  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1388 
1389  // connected to demuxer, no cycles possible
1390  if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1391  continue;
1392 
1393  // otherwise connected to another filtergraph
1395 
1396  // found a cycle
1397  if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1398  return AVERROR(EINVAL);
1399 
1400  // place current position on stack and descend
1401  av_assert0(nb_filters_stack < sch->nb_filters);
1402  filters_stack[nb_filters_stack++] = src;
1403  src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1404  continue;
1405  }
1406 
1407  filters_visited[src.idx] = CYCLE_NODE_DONE;
1408 
1409  // previous search finished,
1410  if (nb_filters_stack) {
1411  src = filters_stack[--nb_filters_stack];
1412  continue;
1413  }
1414  return 0;
1415  }
1416 }
1417 
1418 static int check_acyclic(Scheduler *sch)
1419 {
1420  uint8_t *filters_visited = NULL;
1421  SchedulerNode *filters_stack = NULL;
1422 
1423  int ret = 0;
1424 
1425  if (!sch->nb_filters)
1426  return 0;
1427 
1428  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1429  if (!filters_visited)
1430  return AVERROR(ENOMEM);
1431 
1432  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1433  if (!filters_stack) {
1434  ret = AVERROR(ENOMEM);
1435  goto fail;
1436  }
1437 
1438  // trace the transcoding graph upstream from every filtegraph
1439  for (unsigned i = 0; i < sch->nb_filters; i++) {
1440  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1441  filters_visited, filters_stack);
1442  if (ret < 0) {
1443  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1444  goto fail;
1445  }
1446  }
1447 
1448 fail:
1449  av_freep(&filters_visited);
1450  av_freep(&filters_stack);
1451  return ret;
1452 }
1453 
1454 static int start_prepare(Scheduler *sch)
1455 {
1456  int ret;
1457 
1458  for (unsigned i = 0; i < sch->nb_demux; i++) {
1459  SchDemux *d = &sch->demux[i];
1460 
1461  for (unsigned j = 0; j < d->nb_streams; j++) {
1462  SchDemuxStream *ds = &d->streams[j];
1463 
1464  if (!ds->nb_dst) {
1465  av_log(d, AV_LOG_ERROR,
1466  "Demuxer stream %u not connected to any sink\n", j);
1467  return AVERROR(EINVAL);
1468  }
1469 
1470  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1471  if (!ds->dst_finished)
1472  return AVERROR(ENOMEM);
1473  }
1474  }
1475 
1476  for (unsigned i = 0; i < sch->nb_dec; i++) {
1477  SchDec *dec = &sch->dec[i];
1478 
1479  if (!dec->src.type) {
1480  av_log(dec, AV_LOG_ERROR,
1481  "Decoder not connected to a source\n");
1482  return AVERROR(EINVAL);
1483  }
1484 
1485  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1486  SchDecOutput *o = &dec->outputs[j];
1487 
1488  if (!o->nb_dst) {
1489  av_log(dec, AV_LOG_ERROR,
1490  "Decoder output %u not connected to any sink\n", j);
1491  return AVERROR(EINVAL);
1492  }
1493 
1494  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1495  if (!o->dst_finished)
1496  return AVERROR(ENOMEM);
1497  }
1498  }
1499 
1500  for (unsigned i = 0; i < sch->nb_enc; i++) {
1501  SchEnc *enc = &sch->enc[i];
1502 
1503  if (!enc->src.type) {
1504  av_log(enc, AV_LOG_ERROR,
1505  "Encoder not connected to a source\n");
1506  return AVERROR(EINVAL);
1507  }
1508  if (!enc->nb_dst) {
1509  av_log(enc, AV_LOG_ERROR,
1510  "Encoder not connected to any sink\n");
1511  return AVERROR(EINVAL);
1512  }
1513 
1514  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1515  if (!enc->dst_finished)
1516  return AVERROR(ENOMEM);
1517  }
1518 
1519  for (unsigned i = 0; i < sch->nb_mux; i++) {
1520  SchMux *mux = &sch->mux[i];
1521 
1522  for (unsigned j = 0; j < mux->nb_streams; j++) {
1523  SchMuxStream *ms = &mux->streams[j];
1524 
1525  switch (ms->src.type) {
1526  case SCH_NODE_TYPE_ENC: {
1527  SchEnc *enc = &sch->enc[ms->src.idx];
1528  if (enc->src.type == SCH_NODE_TYPE_DEC) {
1529  ms->src_sched = sch->dec[enc->src.idx].src;
1531  } else {
1532  ms->src_sched = enc->src;
1534  }
1535  break;
1536  }
1537  case SCH_NODE_TYPE_DEMUX:
1538  ms->src_sched = ms->src;
1539  break;
1540  default:
1541  av_log(mux, AV_LOG_ERROR,
1542  "Muxer stream #%u not connected to a source\n", j);
1543  return AVERROR(EINVAL);
1544  }
1545  }
1546 
1547  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1548  QUEUE_PACKETS);
1549  if (ret < 0)
1550  return ret;
1551  }
1552 
1553  for (unsigned i = 0; i < sch->nb_filters; i++) {
1554  SchFilterGraph *fg = &sch->filters[i];
1555 
1556  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1557  SchFilterIn *fi = &fg->inputs[j];
1558  SchDec *dec;
1559 
1560  if (!fi->src.type) {
1561  av_log(fg, AV_LOG_ERROR,
1562  "Filtergraph input %u not connected to a source\n", j);
1563  return AVERROR(EINVAL);
1564  }
1565 
1566  if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1567  fi->src_sched = fi->src;
1568  else {
1570  dec = &sch->dec[fi->src.idx];
1571 
1572  switch (dec->src.type) {
1573  case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1574  case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1575  default: av_assert0(0);
1576  }
1577  }
1578  }
1579 
1580  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1581  SchFilterOut *fo = &fg->outputs[j];
1582 
1583  if (!fo->dst.type) {
1584  av_log(fg, AV_LOG_ERROR,
1585  "Filtergraph %u output %u not connected to a sink\n", i, j);
1586  return AVERROR(EINVAL);
1587  }
1588  }
1589  }
1590 
1591  // Check that the transcoding graph has no cycles.
1592  ret = check_acyclic(sch);
1593  if (ret < 0)
1594  return ret;
1595 
1596  return 0;
1597 }
1598 
1600 {
1601  int ret;
1602 
1603  ret = start_prepare(sch);
1604  if (ret < 0)
1605  return ret;
1606 
1608  sch->state = SCH_STATE_STARTED;
1609 
1610  for (unsigned i = 0; i < sch->nb_mux; i++) {
1611  SchMux *mux = &sch->mux[i];
1612 
1613  if (mux->nb_streams_ready == mux->nb_streams) {
1614  ret = mux_init(sch, mux);
1615  if (ret < 0)
1616  goto fail;
1617  }
1618  }
1619 
1620  for (unsigned i = 0; i < sch->nb_enc; i++) {
1621  SchEnc *enc = &sch->enc[i];
1622 
1623  ret = task_start(&enc->task);
1624  if (ret < 0)
1625  goto fail;
1626  }
1627 
1628  for (unsigned i = 0; i < sch->nb_filters; i++) {
1629  SchFilterGraph *fg = &sch->filters[i];
1630 
1631  ret = task_start(&fg->task);
1632  if (ret < 0)
1633  goto fail;
1634  }
1635 
1636  for (unsigned i = 0; i < sch->nb_dec; i++) {
1637  SchDec *dec = &sch->dec[i];
1638 
1639  ret = task_start(&dec->task);
1640  if (ret < 0)
1641  goto fail;
1642  }
1643 
1644  for (unsigned i = 0; i < sch->nb_demux; i++) {
1645  SchDemux *d = &sch->demux[i];
1646 
1647  if (!d->nb_streams)
1648  continue;
1649 
1650  ret = task_start(&d->task);
1651  if (ret < 0)
1652  goto fail;
1653  }
1654 
1658 
1659  return 0;
1660 fail:
1661  sch_stop(sch, NULL);
1662  return ret;
1663 }
1664 
1665 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1666 {
1667  int ret, err;
1668 
1669  // convert delay to absolute timestamp
1670  timeout_us += av_gettime();
1671 
1673 
1674  if (sch->nb_mux_done < sch->nb_mux) {
1675  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1676  .tv_nsec = (timeout_us % 1000000) * 1000 };
1678  }
1679 
1680  ret = sch->nb_mux_done == sch->nb_mux;
1681 
1683 
1684  *transcode_ts = atomic_load(&sch->last_dts);
1685 
1686  // abort transcoding if any task failed
1687  err = atomic_load(&sch->task_failed);
1688 
1689  return ret || err;
1690 }
1691 
1692 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1693 {
1694  int ret;
1695 
1696  ret = enc->open_cb(enc->task.func_arg, frame);
1697  if (ret < 0)
1698  return ret;
1699 
1700  // ret>0 signals audio frame size, which means sync queue must
1701  // have been enabled during encoder creation
1702  if (ret > 0) {
1703  SchSyncQueue *sq;
1704 
1705  av_assert0(enc->sq_idx[0] >= 0);
1706  sq = &sch->sq_enc[enc->sq_idx[0]];
1707 
1708  pthread_mutex_lock(&sq->lock);
1709 
1710  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1711 
1712  pthread_mutex_unlock(&sq->lock);
1713  }
1714 
1715  return 0;
1716 }
1717 
1719 {
1720  int ret;
1721 
1722  if (!frame) {
1723  tq_send_finish(enc->queue, 0);
1724  return 0;
1725  }
1726 
1727  if (enc->in_finished)
1728  return AVERROR_EOF;
1729 
1730  ret = tq_send(enc->queue, 0, frame);
1731  if (ret < 0)
1732  enc->in_finished = 1;
1733 
1734  return ret;
1735 }
1736 
1737 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1738 {
1739  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1740  int ret = 0;
1741 
1742  // inform the scheduling code that no more input will arrive along this path;
1743  // this is necessary because the sync queue may not send an EOF downstream
1744  // until other streams finish
1745  // TODO: consider a cleaner way of passing this information through
1746  // the pipeline
1747  if (!frame) {
1748  for (unsigned i = 0; i < enc->nb_dst; i++) {
1749  SchMux *mux;
1750  SchMuxStream *ms;
1751 
1752  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1753  continue;
1754 
1755  mux = &sch->mux[enc->dst[i].idx];
1756  ms = &mux->streams[enc->dst[i].idx_stream];
1757 
1759 
1760  ms->source_finished = 1;
1762 
1764  }
1765  }
1766 
1767  pthread_mutex_lock(&sq->lock);
1768 
1769  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1770  if (ret < 0)
1771  goto finish;
1772 
1773  while (1) {
1774  SchEnc *enc;
1775 
1776  // TODO: the SQ API should be extended to allow returning EOF
1777  // for individual streams
1778  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1779  if (ret < 0) {
1780  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1781  break;
1782  }
1783 
1784  enc = &sch->enc[sq->enc_idx[ret]];
1785  ret = send_to_enc_thread(sch, enc, sq->frame);
1786  if (ret < 0) {
1787  av_frame_unref(sq->frame);
1788  if (ret != AVERROR_EOF)
1789  break;
1790 
1791  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1792  continue;
1793  }
1794  }
1795 
1796  if (ret < 0) {
1797  // close all encoders fed from this sync queue
1798  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1799  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1800 
1801  // if the sync queue error is EOF and closing the encoder
1802  // produces a more serious error, make sure to pick the latter
1803  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1804  }
1805  }
1806 
1807 finish:
1808  pthread_mutex_unlock(&sq->lock);
1809 
1810  return ret;
1811 }
1812 
/**
 * Deliver a frame (or NULL) to an encoder.
 *
 * When the encoder has an open callback, a frame is present and the encoder
 * has not been opened yet, enc_open() is called first and its negative return
 * value, if any, is passed back to the caller. The frame is then routed to
 * send_to_enc_sq() when enc->sq_idx[0] is non-negative, and to
 * send_to_enc_thread() otherwise.
 *
 * @return 0 when the frame only triggered the open and carries no buffer,
 *         otherwise the result of enc_open() on failure or of the chosen
 *         send function
 *
 * NOTE(review): this listing skips line 1823 inside the "discard" branch
 * below; confirm against the full source what happens to the frame there.
 */
1813 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1814 {
// lazily open the encoder on the first non-NULL frame
1815  if (enc->open_cb && frame && !enc->opened) {
1816  int ret = enc_open(sch, enc, frame);
1817  if (ret < 0)
1818  return ret;
1819  enc->opened = 1;
1820 
1821  // discard empty frames that only carry encoder init parameters
1822  if (!frame->buf[0]) {
1824  return 0;
1825  }
1826  }
1827 
// a non-negative sq_idx[0] selects the sync-queue path
1828  return (enc->sq_idx[0] >= 0) ?
1829  send_to_enc_sq (sch, enc, frame) :
1830  send_to_enc_thread(sch, enc, frame);
1831 }
1832 
1834 {
1835  PreMuxQueue *q = &ms->pre_mux_queue;
1836  AVPacket *tmp_pkt = NULL;
1837  int ret;
1838 
1839  if (!av_fifo_can_write(q->fifo)) {
1840  size_t packets = av_fifo_can_read(q->fifo);
1841  size_t pkt_size = pkt ? pkt->size : 0;
1842  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1843  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1844  size_t new_size = FFMIN(2 * packets, max_packets);
1845 
1846  if (new_size <= packets) {
1847  av_log(mux, AV_LOG_ERROR,
1848  "Too many packets buffered for output stream.\n");
1849  return AVERROR(ENOSPC);
1850  }
1851  ret = av_fifo_grow2(q->fifo, new_size - packets);
1852  if (ret < 0)
1853  return ret;
1854  }
1855 
1856  if (pkt) {
1857  tmp_pkt = av_packet_alloc();
1858  if (!tmp_pkt)
1859  return AVERROR(ENOMEM);
1860 
1861  av_packet_move_ref(tmp_pkt, pkt);
1862  q->data_size += tmp_pkt->size;
1863  }
1864  av_fifo_write(q->fifo, &tmp_pkt, 1);
1865 
1866  return 0;
1867 }
1868 
1869 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1870  AVPacket *pkt)
1871 {
1872  SchMuxStream *ms = &mux->streams[stream_idx];
1873  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1876 
1877  // queue the packet if the muxer cannot be started yet
1878  if (!atomic_load(&mux->mux_started)) {
1879  int queued = 0;
1880 
1881  // the muxer could have started between the above atomic check and
1882  // locking the mutex, then this block falls through to normal send path
1884 
1885  if (!atomic_load(&mux->mux_started)) {
1886  int ret = mux_queue_packet(mux, ms, pkt);
1887  queued = ret < 0 ? ret : 1;
1888  }
1889 
1891 
1892  if (queued < 0)
1893  return queued;
1894  else if (queued)
1895  goto update_schedule;
1896  }
1897 
1898  if (pkt) {
1899  int ret;
1900 
1901  if (ms->init_eof)
1902  return AVERROR_EOF;
1903 
1904  ret = tq_send(mux->queue, stream_idx, pkt);
1905  if (ret < 0)
1906  return ret;
1907  } else
1908  tq_send_finish(mux->queue, stream_idx);
1909 
1910 update_schedule:
1911  // TODO: use atomics to check whether this changes trailing dts
1912  // to avoid locking unnecessarily
1913  if (dts != AV_NOPTS_VALUE || !pkt) {
1915 
1916  if (pkt) ms->last_dts = dts;
1917  else ms->source_finished = 1;
1918 
1920 
1922  }
1923 
1924  return 0;
1925 }
1926 
1927 static int
1929  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1930 {
1931  int ret;
1932 
1933  if (*dst_finished)
1934  return AVERROR_EOF;
1935 
1936  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1939  pkt = NULL;
1940  }
1941 
1942  if (!pkt)
1943  goto finish;
1944 
1945  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1946  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1947  tq_send(sch->dec[dst.idx].queue, 0, pkt);
1948  if (ret == AVERROR_EOF)
1949  goto finish;
1950 
1951  return ret;
1952 
1953 finish:
1954  if (dst.type == SCH_NODE_TYPE_MUX)
1955  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1956  else
1957  tq_send_finish(sch->dec[dst.idx].queue, 0);
1958 
1959  *dst_finished = 1;
1960  return AVERROR_EOF;
1961 }
1962 
1964  AVPacket *pkt, unsigned flags)
1965 {
1966  unsigned nb_done = 0;
1967 
1968  for (unsigned i = 0; i < ds->nb_dst; i++) {
1969  AVPacket *to_send = pkt;
1970  uint8_t *finished = &ds->dst_finished[i];
1971 
1972  int ret;
1973 
1974  // sending a packet consumes it, so make a temporary reference if needed
1975  if (pkt && i < ds->nb_dst - 1) {
1976  to_send = d->send_pkt;
1977 
1978  ret = av_packet_ref(to_send, pkt);
1979  if (ret < 0)
1980  return ret;
1981  }
1982 
1983  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1984  if (to_send)
1985  av_packet_unref(to_send);
1986  if (ret == AVERROR_EOF)
1987  nb_done++;
1988  else if (ret < 0)
1989  return ret;
1990  }
1991 
1992  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1993 }
1994 
1996 {
1997  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1998 
1999  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2000 
2001  for (unsigned i = 0; i < d->nb_streams; i++) {
2002  SchDemuxStream *ds = &d->streams[i];
2003 
2004  for (unsigned j = 0; j < ds->nb_dst; j++) {
2005  const SchedulerNode *dst = &ds->dst[j];
2006  SchDec *dec;
2007  int ret;
2008 
2009  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2010  continue;
2011 
2012  dec = &sch->dec[dst->idx];
2013 
2014  ret = tq_send(dec->queue, 0, pkt);
2015  if (ret < 0)
2016  return ret;
2017 
2018  if (dec->queue_end_ts) {
2019  Timestamp ts;
2021  if (ret < 0)
2022  return ret;
2023 
2024  if (max_end_ts.ts == AV_NOPTS_VALUE ||
2025  (ts.ts != AV_NOPTS_VALUE &&
2026  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2027  max_end_ts = ts;
2028 
2029  }
2030  }
2031  }
2032 
2033  pkt->pts = max_end_ts.ts;
2034  pkt->time_base = max_end_ts.tb;
2035 
2036  return 0;
2037 }
2038 
/**
 * Send a demuxed packet to the consumers of its stream.
 *
 * Waits on the demuxer's waiter first; if that wait reports termination the
 * function returns AVERROR_EXIT without sending anything. A packet whose
 * stream_index is -1 is handed to demux_flush(); any other packet is
 * forwarded with demux_send_for_stream() for d->streams[pkt->stream_index].
 *
 * @param sch       scheduler
 * @param demux_idx index of the sending demuxer, must be < sch->nb_demux
 * @param pkt       packet to send; pkt->stream_index selects the stream
 * @param flags     passed through unchanged to demux_send_for_stream()
 * @return AVERROR_EXIT on termination, otherwise the result of
 *         demux_flush() or demux_send_for_stream()
 *
 * NOTE(review): this listing skips line 2056, between the flush check and
 * the final send; no bounds check on pkt->stream_index is visible here, so
 * confirm against the full source.
 */
2039 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2040  unsigned flags)
2041 {
2042  SchDemux *d;
2043  int terminate;
2044 
2045  av_assert0(demux_idx < sch->nb_demux);
2046  d = &sch->demux[demux_idx];
2047 
// a non-zero result from waiter_wait() means this task should stop
2048  terminate = waiter_wait(sch, &d->waiter);
2049  if (terminate)
2050  return AVERROR_EXIT;
2051 
2052  // flush the downstreams after seek
2053  if (pkt->stream_index == -1)
2054  return demux_flush(sch, d, pkt);
2055 
2057 
2058  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2059 }
2060 
2061 static int demux_done(Scheduler *sch, unsigned demux_idx)
2062 {
2063  SchDemux *d = &sch->demux[demux_idx];
2064  int ret = 0;
2065 
2066  for (unsigned i = 0; i < d->nb_streams; i++) {
2067  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2068  if (err != AVERROR_EOF)
2069  ret = err_merge(ret, err);
2070  }
2071 
2073 
2074  d->task_exited = 1;
2075 
2077 
2079 
2080  return ret;
2081 }
2082 
2083 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2084 {
2085  SchMux *mux;
2086  int ret, stream_idx;
2087 
2088  av_assert0(mux_idx < sch->nb_mux);
2089  mux = &sch->mux[mux_idx];
2090 
2091  ret = tq_receive(mux->queue, &stream_idx, pkt);
2092  pkt->stream_index = stream_idx;
2093  return ret;
2094 }
2095 
2096 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2097 {
2098  SchMux *mux;
2099 
2100  av_assert0(mux_idx < sch->nb_mux);
2101  mux = &sch->mux[mux_idx];
2102 
2103  av_assert0(stream_idx < mux->nb_streams);
2104  tq_receive_finish(mux->queue, stream_idx);
2105 
2107  mux->streams[stream_idx].source_finished = 1;
2108 
2110 
2112 }
2113 
2114 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2115  const AVPacket *pkt)
2116 {
2117  SchMux *mux;
2118  SchMuxStream *ms;
2119 
2120  av_assert0(mux_idx < sch->nb_mux);
2121  mux = &sch->mux[mux_idx];
2122 
2123  av_assert0(stream_idx < mux->nb_streams);
2124  ms = &mux->streams[stream_idx];
2125 
2126  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2127  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2128  int ret;
2129 
2131  if (ret < 0)
2132  return ret;
2133 
2134  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2135  }
2136 
2137  return 0;
2138 }
2139 
2140 static int mux_done(Scheduler *sch, unsigned mux_idx)
2141 {
2142  SchMux *mux = &sch->mux[mux_idx];
2143 
2145 
2146  for (unsigned i = 0; i < mux->nb_streams; i++) {
2147  tq_receive_finish(mux->queue, i);
2148  mux->streams[i].source_finished = 1;
2149  }
2150 
2152 
2154 
2156 
2157  av_assert0(sch->nb_mux_done < sch->nb_mux);
2158  sch->nb_mux_done++;
2159 
2161 
2163 
2164  return 0;
2165 }
2166 
2167 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2168 {
2169  SchDec *dec;
2170  int ret, dummy;
2171 
2172  av_assert0(dec_idx < sch->nb_dec);
2173  dec = &sch->dec[dec_idx];
2174 
2175  // the decoder should have given us post-flush end timestamp in pkt
2176  if (dec->expect_end_ts) {
2177  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2179  if (ret < 0)
2180  return ret;
2181 
2182  dec->expect_end_ts = 0;
2183  }
2184 
2185  ret = tq_receive(dec->queue, &dummy, pkt);
2186  av_assert0(dummy <= 0);
2187 
2188  // got a flush packet, on the next call to this function the decoder
2189  // will give us post-flush end timestamp
2190  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2191  dec->expect_end_ts = 1;
2192 
2193  return ret;
2194 }
2195 
2197  unsigned in_idx, AVFrame *frame)
2198 {
2199  if (frame)
2200  return tq_send(fg->queue, in_idx, frame);
2201 
2202  if (!fg->inputs[in_idx].send_finished) {
2203  fg->inputs[in_idx].send_finished = 1;
2204  tq_send_finish(fg->queue, in_idx);
2205 
2206  // close the control stream when all actual inputs are done
2207  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2208  tq_send_finish(fg->queue, fg->nb_inputs);
2209  }
2210  return 0;
2211 }
2212 
2214  uint8_t *dst_finished, AVFrame *frame)
2215 {
2216  int ret;
2217 
2218  if (*dst_finished)
2219  return AVERROR_EOF;
2220 
2221  if (!frame)
2222  goto finish;
2223 
2224  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2225  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2226  send_to_enc(sch, &sch->enc[dst.idx], frame);
2227  if (ret == AVERROR_EOF)
2228  goto finish;
2229 
2230  return ret;
2231 
2232 finish:
2233  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2234  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2235  else
2236  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2237 
2238  *dst_finished = 1;
2239 
2240  return AVERROR_EOF;
2241 }
2242 
2243 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2244  unsigned out_idx, AVFrame *frame)
2245 {
2246  SchDec *dec;
2247  SchDecOutput *o;
2248  int ret;
2249  unsigned nb_done = 0;
2250 
2251  av_assert0(dec_idx < sch->nb_dec);
2252  dec = &sch->dec[dec_idx];
2253 
2254  av_assert0(out_idx < dec->nb_outputs);
2255  o = &dec->outputs[out_idx];
2256 
2257  for (unsigned i = 0; i < o->nb_dst; i++) {
2258  uint8_t *finished = &o->dst_finished[i];
2259  AVFrame *to_send = frame;
2260 
2261  // sending a frame consumes it, so make a temporary reference if needed
2262  if (i < o->nb_dst - 1) {
2263  to_send = dec->send_frame;
2264 
2265  // frame may sometimes contain props only,
2266  // e.g. to signal EOF timestamp
2267  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2268  av_frame_copy_props(to_send, frame);
2269  if (ret < 0)
2270  return ret;
2271  }
2272 
2273  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2274  if (ret < 0) {
2275  av_frame_unref(to_send);
2276  if (ret == AVERROR_EOF) {
2277  nb_done++;
2278  continue;
2279  }
2280  return ret;
2281  }
2282  }
2283 
2284  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2285 }
2286 
2287 static int dec_done(Scheduler *sch, unsigned dec_idx)
2288 {
2289  SchDec *dec = &sch->dec[dec_idx];
2290  int ret = 0;
2291 
2292  tq_receive_finish(dec->queue, 0);
2293 
2294  // make sure our source does not get stuck waiting for end timestamps
2295  // that will never arrive
2296  if (dec->queue_end_ts)
2298 
2299  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2300  SchDecOutput *o = &dec->outputs[i];
2301 
2302  for (unsigned j = 0; j < o->nb_dst; j++) {
2303  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2304  if (err < 0 && err != AVERROR_EOF)
2305  ret = err_merge(ret, err);
2306  }
2307  }
2308 
2309  return ret;
2310 }
2311 
2312 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2313 {
2314  SchEnc *enc;
2315  int ret, dummy;
2316 
2317  av_assert0(enc_idx < sch->nb_enc);
2318  enc = &sch->enc[enc_idx];
2319 
2320  ret = tq_receive(enc->queue, &dummy, frame);
2321  av_assert0(dummy <= 0);
2322 
2323  return ret;
2324 }
2325 
2327  uint8_t *dst_finished, AVPacket *pkt)
2328 {
2329  int ret;
2330 
2331  if (*dst_finished)
2332  return AVERROR_EOF;
2333 
2334  if (!pkt)
2335  goto finish;
2336 
2337  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2338  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2339  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2340  if (ret == AVERROR_EOF)
2341  goto finish;
2342 
2343  return ret;
2344 
2345 finish:
2346  if (dst.type == SCH_NODE_TYPE_MUX)
2347  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2348  else
2349  tq_send_finish(sch->dec[dst.idx].queue, 0);
2350 
2351  *dst_finished = 1;
2352 
2353  return AVERROR_EOF;
2354 }
2355 
2356 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2357 {
2358  SchEnc *enc;
2359  int ret;
2360 
2361  av_assert0(enc_idx < sch->nb_enc);
2362  enc = &sch->enc[enc_idx];
2363 
2364  for (unsigned i = 0; i < enc->nb_dst; i++) {
2365  uint8_t *finished = &enc->dst_finished[i];
2366  AVPacket *to_send = pkt;
2367 
2368  // sending a packet consumes it, so make a temporary reference if needed
2369  if (i < enc->nb_dst - 1) {
2370  to_send = enc->send_pkt;
2371 
2372  ret = av_packet_ref(to_send, pkt);
2373  if (ret < 0)
2374  return ret;
2375  }
2376 
2377  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2378  if (ret < 0) {
2379  av_packet_unref(to_send);
2380  if (ret == AVERROR_EOF)
2381  continue;
2382  return ret;
2383  }
2384  }
2385 
2386  return 0;
2387 }
2388 
2389 static int enc_done(Scheduler *sch, unsigned enc_idx)
2390 {
2391  SchEnc *enc = &sch->enc[enc_idx];
2392  int ret = 0;
2393 
2394  tq_receive_finish(enc->queue, 0);
2395 
2396  for (unsigned i = 0; i < enc->nb_dst; i++) {
2397  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2398  if (err < 0 && err != AVERROR_EOF)
2399  ret = err_merge(ret, err);
2400  }
2401 
2402  return ret;
2403 }
2404 
2405 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2406  unsigned *in_idx, AVFrame *frame)
2407 {
2408  SchFilterGraph *fg;
2409 
2410  av_assert0(fg_idx < sch->nb_filters);
2411  fg = &sch->filters[fg_idx];
2412 
2413  av_assert0(*in_idx <= fg->nb_inputs);
2414 
2415  // update scheduling to account for desired input stream, if it changed
2416  //
2417  // this check needs no locking because only the filtering thread
2418  // updates this value
2419  if (*in_idx != fg->best_input) {
2421 
2422  fg->best_input = *in_idx;
2424 
2426  }
2427 
2428  if (*in_idx == fg->nb_inputs) {
2429  int terminate = waiter_wait(sch, &fg->waiter);
2430  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2431  }
2432 
2433  while (1) {
2434  int ret, idx;
2435 
2436  ret = tq_receive(fg->queue, &idx, frame);
2437  if (idx < 0)
2438  return AVERROR_EOF;
2439  else if (ret >= 0) {
2440  *in_idx = idx;
2441  return 0;
2442  }
2443 
2444  // disregard EOFs for specific streams - they should always be
2445  // preceded by an EOF frame
2446  }
2447 }
2448 
2449 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2450 {
2451  SchFilterGraph *fg;
2452  SchFilterIn *fi;
2453 
2454  av_assert0(fg_idx < sch->nb_filters);
2455  fg = &sch->filters[fg_idx];
2456 
2457  av_assert0(in_idx < fg->nb_inputs);
2458  fi = &fg->inputs[in_idx];
2459 
2460  if (!fi->receive_finished) {
2461  fi->receive_finished = 1;
2462  tq_receive_finish(fg->queue, in_idx);
2463 
2464  // close the control stream when all actual inputs are done
2465  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2466  tq_receive_finish(fg->queue, fg->nb_inputs);
2467  }
2468 }
2469 
2470 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2471 {
2472  SchFilterGraph *fg;
2474 
2475  av_assert0(fg_idx < sch->nb_filters);
2476  fg = &sch->filters[fg_idx];
2477 
2478  av_assert0(out_idx < fg->nb_outputs);
2479  dst = fg->outputs[out_idx].dst;
2480 
2481  return (dst.type == SCH_NODE_TYPE_ENC) ?
2482  send_to_enc (sch, &sch->enc[dst.idx], frame) :
2483  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2484 }
2485 
2486 static int filter_done(Scheduler *sch, unsigned fg_idx)
2487 {
2488  SchFilterGraph *fg = &sch->filters[fg_idx];
2489  int ret = 0;
2490 
2491  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2492  tq_receive_finish(fg->queue, i);
2493 
2494  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2495  SchedulerNode dst = fg->outputs[i].dst;
2496  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2497  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2498  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2499 
2500  if (err < 0 && err != AVERROR_EOF)
2501  ret = err_merge(ret, err);
2502  }
2503 
2505 
2506  fg->task_exited = 1;
2507 
2509 
2511 
2512  return ret;
2513 }
2514 
2515 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2516 {
2517  SchFilterGraph *fg;
2518 
2519  av_assert0(fg_idx < sch->nb_filters);
2520  fg = &sch->filters[fg_idx];
2521 
2522  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2523 }
2524 
2525 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2526 {
2527  switch (node.type) {
2528  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2529  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2530  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2531  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2532  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2533  default: av_assert0(0);
2534  }
2535 }
2536 
2537 static void *task_wrapper(void *arg)
2538 {
2539  SchTask *task = arg;
2540  Scheduler *sch = task->parent;
2541  int ret;
2542  int err = 0;
2543 
2544  ret = task->func(task->func_arg);
2545  if (ret < 0)
2546  av_log(task->func_arg, AV_LOG_ERROR,
2547  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2548 
2549  err = task_cleanup(sch, task->node);
2550  ret = err_merge(ret, err);
2551 
2552  // EOF is considered normal termination
2553  if (ret == AVERROR_EOF)
2554  ret = 0;
2555  if (ret < 0)
2556  atomic_store(&sch->task_failed, 1);
2557 
2559  "Terminating thread with return code %d (%s)\n", ret,
2560  ret < 0 ? av_err2str(ret) : "success");
2561 
2562  return (void*)(intptr_t)ret;
2563 }
2564 
2565 static int task_stop(Scheduler *sch, SchTask *task)
2566 {
2567  int ret;
2568  void *thread_ret;
2569 
2570  if (!task->thread_running)
2571  return task_cleanup(sch, task->node);
2572 
2573  ret = pthread_join(task->thread, &thread_ret);
2574  av_assert0(ret == 0);
2575 
2576  task->thread_running = 0;
2577 
2578  return (intptr_t)thread_ret;
2579 }
2580 
2581 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2582 {
2583  int ret = 0, err;
2584 
2585  if (sch->state != SCH_STATE_STARTED)
2586  return 0;
2587 
2588  atomic_store(&sch->terminate, 1);
2589 
2590  for (unsigned type = 0; type < 2; type++)
2591  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2592  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2593  waiter_set(w, 1);
2594  }
2595 
2596  for (unsigned i = 0; i < sch->nb_demux; i++) {
2597  SchDemux *d = &sch->demux[i];
2598 
2599  err = task_stop(sch, &d->task);
2600  ret = err_merge(ret, err);
2601  }
2602 
2603  for (unsigned i = 0; i < sch->nb_dec; i++) {
2604  SchDec *dec = &sch->dec[i];
2605 
2606  err = task_stop(sch, &dec->task);
2607  ret = err_merge(ret, err);
2608  }
2609 
2610  for (unsigned i = 0; i < sch->nb_filters; i++) {
2611  SchFilterGraph *fg = &sch->filters[i];
2612 
2613  err = task_stop(sch, &fg->task);
2614  ret = err_merge(ret, err);
2615  }
2616 
2617  for (unsigned i = 0; i < sch->nb_enc; i++) {
2618  SchEnc *enc = &sch->enc[i];
2619 
2620  err = task_stop(sch, &enc->task);
2621  ret = err_merge(ret, err);
2622  }
2623 
2624  for (unsigned i = 0; i < sch->nb_mux; i++) {
2625  SchMux *mux = &sch->mux[i];
2626 
2627  err = task_stop(sch, &mux->task);
2628  ret = err_merge(ret, err);
2629  }
2630 
2631  if (finish_ts)
2632  *finish_ts = trailing_dts(sch, 1);
2633 
2634  sch->state = SCH_STATE_STOPPED;
2635 
2636  return ret;
2637 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:298
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:68
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:429
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1105
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:76
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:351
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2470
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:88
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:295
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:287
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:269
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2096
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:101
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:137
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:1995
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:299
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:233
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:649
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:147
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:688
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:185
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:264
int64_t
long long int64_t
Definition: coverity.c:34
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1368
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2140
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:296
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:162
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
av_fifo_peek
int av_fifo_peek(const AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1373
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:389
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2525
frame_move
static void frame_move(void *dst, void *src)
Definition: ffmpeg_utils.h:52
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:539
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:218
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:160
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1869
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:284
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2389
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:225
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:557
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:213
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:276
objpool_alloc_packets
ObjPool * objpool_alloc_packets(void)
Definition: objpool.c:124
SchFilterOut
Definition: ffmpeg_sched.c:243
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:270
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:176
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:207
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:260
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:313
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:166
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1221
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1718
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2565
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:240
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:373
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2581
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:119
fail
#define fail()
Definition: checkasm.h:188
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:244
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:85
SchEnc
Definition: ffmpeg_sched.c:109
dummy
int dummy
Definition: motion.c:66
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:90
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:656
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:214
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:252
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:272
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:608
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:293
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:829
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:150
SchFilterGraph
Definition: ffmpeg_sched.c:247
SchMuxStream::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:192
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:60
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:209
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:467
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:307
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:216
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
Scheduler::mux_done_cond
pthread_cond_t mux_done_cond
Definition: ffmpeg_sched.c:289
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:190
SchDecOutput
Definition: ffmpeg_sched.c:74
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:632
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:341
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:256
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:94
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1692
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:585
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2213
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:433
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:195
op
static int op(uint8_t **dst, const uint8_t *dst_end, GetByteContext *gb, int pixel, int count, int *x, int width, int linesize)
Perform decode operation.
Definition: anm.c:76
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:113
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:715
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:191
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2243
ctx
AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:384
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:209
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2486
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:248
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2312
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
objpool_alloc_frames
ObjPool * objpool_alloc_frames(void)
Definition: objpool.c:128
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:619
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:110
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1963
arg
const char * arg
Definition: jacosubdec.c:67
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:197
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:620
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:229
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:278
pkt_move
static void pkt_move(void *dst, void *src)
Definition: ffmpeg_utils.h:47
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:522
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:55
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:368
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:75
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:304
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1737
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:137
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:713
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:251
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:281
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1300
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:241
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:791
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:105
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:271
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2287
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:261
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:157
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:114
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:748
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1367
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations, send normally to other types.
Definition: ffmpeg_sched.h:338
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:823
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:437
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:486
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2356
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:82
error.h
Scheduler
Definition: ffmpeg_sched.c:275
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:217
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:103
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:194
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:81
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:661
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:144
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:162
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:153
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:253
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:120
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:368
AVPacket::size
int size
Definition: packet.h:540
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:106
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:266
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:388
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:83
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:180
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:258
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:343
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:187
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:699
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:742
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:250
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:311
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:538
ObjPool
Definition: objpool.c:30
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2326
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1369
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:626
SchFilterIn
Definition: ffmpeg_sched.c:236
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2405
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:282
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:63
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
Definition: thread_queue.c:79
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:138
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:580
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:279
Scheduler::task_failed
atomic_int task_failed
Definition: ffmpeg_sched.c:309
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:392
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:152
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:169
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:414
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:301
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:532
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2061
packet.h
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:265
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:31
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:285
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:308
SchDec
Definition: ffmpeg_sched.c:80
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:255
QueueType
QueueType
Definition: ffmpeg_sched.c:47
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:86
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:610
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:445
Scheduler::mux_done_lock
pthread_mutex_t mux_done_lock
Definition: ffmpeg_sched.c:288
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:255
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:785
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1169
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2196
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:196
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:151
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:927
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1813
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2515
SchDemuxStream
Definition: ffmpeg_sched.c:150
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
SchFilterIn::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:238
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2167
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:80
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:264
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:200
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1833
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:220
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:682
ThreadQueue
Definition: thread_queue.c:42
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2537
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:222
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:88
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2039
SchDemux
Definition: ffmpeg_sched.c:156
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:292
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:93
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1928
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:83
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:541
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:532
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:230
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:163
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:322
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:102
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:896
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1454
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1246
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2449
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g.
Definition: ffmpeg_sched.c:727
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:112
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1665
AVPacket
This structure stores compressed data.
Definition: packet.h:516
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:100
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:231
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:97
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:374
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1599
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:482
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2114
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:75
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:142
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:675
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:140
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:237
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1205
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1418
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:159
PreMuxQueue
Definition: ffmpeg_sched.c:172
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:305
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:239
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:262
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:583
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1275
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:551
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2083
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:871
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:78
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:115
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:226
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:302