FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
36 #include "libavutil/error.h"
37 #include "libavutil/fifo.h"
38 #include "libavutil/frame.h"
39 #include "libavutil/mem.h"
40 #include "libavutil/thread.h"
42 #include "libavutil/time.h"
43 
44 // 100 ms
45 // FIXME: some other value? make this dynamic?
46 #define SCHEDULE_TOLERANCE (100 * 1000)
47 
48 enum QueueType {
51 };
52 
53 typedef struct SchWaiter {
57 
58  // the following are internal state of schedule_update_locked() and must not
59  // be accessed outside of it
62 } SchWaiter;
63 
64 typedef struct SchTask {
67 
69  void *func_arg;
70 
73 } SchTask;
74 
75 typedef struct SchDecOutput {
77  uint8_t *dst_finished;
78  unsigned nb_dst;
79 } SchDecOutput;
80 
81 typedef struct SchDec {
82  const AVClass *class;
83 
85 
87  unsigned nb_outputs;
88 
91  // Queue for receiving input packets, one stream.
93 
94  // Queue for sending post-flush end timestamps back to the source
97 
98  // temporary storage used by sch_dec_send()
100 
101  // internal queue of undecoded packets used by sch_dec_receive()
103 } SchDec;
104 
105 typedef struct SchSyncQueue {
109 
110  unsigned *enc_idx;
111  unsigned nb_enc_idx;
112 } SchSyncQueue;
113 
114 typedef struct SchEnc {
115  const AVClass *class;
116 
119  uint8_t *dst_finished;
120  unsigned nb_dst;
121 
122  // [0] - index of the sync queue in Scheduler.sq_enc,
123  // [1] - index of this encoder in the sq
124  int sq_idx[2];
125 
126  /* Opening encoders is somewhat nontrivial due to their interaction with
127  * sync queues, which are (among other things) responsible for maintaining
128  * constant audio frame size, when it is required by the encoder.
129  *
130  * Opening the encoder requires stream parameters, obtained from the first
131  * frame. However, that frame cannot be properly chunked by the sync queue
132  * without knowing the required frame size, which is only available after
133  * opening the encoder.
134  *
135  * This apparent circular dependency is resolved in the following way:
136  * - the caller creating the encoder gives us a callback which opens the
137  * encoder and returns the required frame size (if any)
138  * - when the first frame is sent to the encoder, the sending thread
139  * - calls this callback, opening the encoder
140  * - passes the returned frame size to the sync queue
141  */
142  int (*open_cb)(void *opaque, const AVFrame *frame);
143  int opened;
144 
146  // Queue for receiving input frames, one stream.
148  // tq_send() to queue returned EOF
150 
151  // temporary storage used by sch_enc_send()
153 } SchEnc;
154 
155 typedef struct SchDemuxStream {
157  uint8_t *dst_finished;
158  unsigned nb_dst;
160 
161 typedef struct SchDemux {
162  const AVClass *class;
163 
165  unsigned nb_streams;
166 
169 
170  // temporary storage used by sch_demux_send()
172 
173  // protected by schedule_lock
175 } SchDemux;
176 
177 typedef struct PreMuxQueue {
178  /**
179  * Queue for buffering the packets before the muxer task can be started.
180  */
182  /**
183  * Maximum number of packets in fifo.
184  */
186  /*
187  * The size of the AVPackets' buffers in queue.
188  * Updated when a packet is either pushed or pulled from the queue.
189  */
190  size_t data_size;
191  /* Threshold after which max_packets will be in effect */
193 } PreMuxQueue;
194 
195 typedef struct SchMuxStream {
197 
198  unsigned *sub_heartbeat_dst;
200 
202 
203  // an EOF was generated while flushing the pre-mux queue
204  int init_eof;
205 
206  ////////////////////////////////////////////////////////////
207  // The following are protected by Scheduler.schedule_lock //
208 
209  /* dts+duration of the last packet sent to this stream
210  in AV_TIME_BASE_Q */
212  // this stream no longer accepts input
214  ////////////////////////////////////////////////////////////
215 } SchMuxStream;
216 
217 typedef struct SchMux {
218  const AVClass *class;
219 
221  unsigned nb_streams;
223 
224  int (*init)(void *arg);
225 
227  /**
228  * Set to 1 after starting the muxer task and flushing the
229  * pre-muxing queues.
230  * Set either before any tasks have started, or with
231  * Scheduler.mux_ready_lock held.
232  */
235  unsigned queue_size;
236 
238 } SchMux;
239 
240 typedef struct SchFilterIn {
244 } SchFilterIn;
245 
246 typedef struct SchFilterOut {
248 } SchFilterOut;
249 
250 typedef struct SchFilterGraph {
251  const AVClass *class;
252 
254  unsigned nb_inputs;
257 
259  unsigned nb_outputs;
260 
262  // input queue, nb_inputs+1 streams
263  // last stream is control
266 
267  // protected by schedule_lock
268  unsigned best_input;
271 
276 };
277 
278 struct Scheduler {
279  const AVClass *class;
280 
282  unsigned nb_demux;
283 
285  unsigned nb_mux;
286 
287  unsigned nb_mux_ready;
289 
290  unsigned nb_mux_done;
291  unsigned task_failed;
294 
295 
297  unsigned nb_dec;
298 
300  unsigned nb_enc;
301 
303  unsigned nb_sq_enc;
304 
306  unsigned nb_filters;
307 
309  int sdp_auto;
310 
313 
315 
317 };
318 
319 /**
320  * Wait until this task is allowed to proceed.
321  *
322  * @retval 0 the caller should proceed
323  * @retval 1 the caller should terminate
324  */
325 static int waiter_wait(Scheduler *sch, SchWaiter *w)
326 {
327  int terminate;
328 
329  if (!atomic_load(&w->choked))
330  return 0;
331 
332  pthread_mutex_lock(&w->lock);
333 
334  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
335  pthread_cond_wait(&w->cond, &w->lock);
336 
337  terminate = atomic_load(&sch->terminate);
338 
339  pthread_mutex_unlock(&w->lock);
340 
341  return terminate;
342 }
343 
344 static void waiter_set(SchWaiter *w, int choked)
345 {
346  pthread_mutex_lock(&w->lock);
347 
348  atomic_store(&w->choked, choked);
349  pthread_cond_signal(&w->cond);
350 
351  pthread_mutex_unlock(&w->lock);
352 }
353 
354 static int waiter_init(SchWaiter *w)
355 {
356  int ret;
357 
358  atomic_init(&w->choked, 0);
359 
360  ret = pthread_mutex_init(&w->lock, NULL);
361  if (ret)
362  return AVERROR(ret);
363 
364  ret = pthread_cond_init(&w->cond, NULL);
365  if (ret)
366  return AVERROR(ret);
367 
368  return 0;
369 }
370 
371 static void waiter_uninit(SchWaiter *w)
372 {
373  pthread_mutex_destroy(&w->lock);
374  pthread_cond_destroy(&w->cond);
375 }
376 
377 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
378  enum QueueType type)
379 {
380  ThreadQueue *tq;
381 
382  if (queue_size <= 0) {
383  if (type == QUEUE_FRAMES)
384  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
385  else
387  }
388 
389  if (type == QUEUE_FRAMES) {
390  // This queue length is used in the decoder code to ensure that
391  // there are enough entries in fixed-size frame pools to account
392  // for frames held in queues inside the ffmpeg utility. If this
393  // can ever dynamically change then the corresponding decode
394  // code needs to be updated as well.
396  }
397 
398  tq = tq_alloc(nb_streams, queue_size,
400  if (!tq)
401  return AVERROR(ENOMEM);
402 
403  *ptq = tq;
404  return 0;
405 }
406 
407 static void *task_wrapper(void *arg);
408 
409 static int task_start(SchTask *task)
410 {
411  int ret;
412 
413  if (!task->parent)
414  return 0;
415 
416  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
417 
418  av_assert0(!task->thread_running);
419 
420  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
421  if (ret) {
422  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
423  strerror(ret));
424  return AVERROR(ret);
425  }
426 
427  task->thread_running = 1;
428  return 0;
429 }
430 
431 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
432  SchThreadFunc func, void *func_arg)
433 {
434  task->parent = sch;
435 
436  task->node.type = type;
437  task->node.idx = idx;
438 
439  task->func = func;
440  task->func_arg = func_arg;
441 }
442 
443 static int64_t trailing_dts(const Scheduler *sch)
444 {
445  int64_t min_dts = INT64_MAX;
446 
447  for (unsigned i = 0; i < sch->nb_mux; i++) {
448  const SchMux *mux = &sch->mux[i];
449 
450  for (unsigned j = 0; j < mux->nb_streams; j++) {
451  const SchMuxStream *ms = &mux->streams[j];
452 
453  if (ms->source_finished)
454  continue;
455  if (ms->last_dts == AV_NOPTS_VALUE)
456  return AV_NOPTS_VALUE;
457 
458  min_dts = FFMIN(min_dts, ms->last_dts);
459  }
460  }
461 
462  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
463 }
464 
465 static int64_t progressing_dts(const Scheduler *sch, int count_finished)
466 {
467  int64_t max_dts = INT64_MIN;
468 
469  for (unsigned i = 0; i < sch->nb_mux; i++) {
470  const SchMux *mux = &sch->mux[i];
471 
472  for (unsigned j = 0; j < mux->nb_streams; j++) {
473  const SchMuxStream *ms = &mux->streams[j];
474 
475  if (ms->source_finished && !count_finished)
476  continue;
477  if (ms->last_dts != AV_NOPTS_VALUE)
478  max_dts = FFMAX(max_dts, ms->last_dts);
479  }
480  }
481 
482  return max_dts == INT64_MIN ? AV_NOPTS_VALUE : max_dts;
483 }
484 
486 {
487  SchFilterGraph *fg = &sch->filters[idx];
488 
490  memset(&fg->task, 0, sizeof(fg->task));
491 
492  tq_free(&fg->queue);
493 
494  av_freep(&fg->inputs);
495  fg->nb_inputs = 0;
496  av_freep(&fg->outputs);
497  fg->nb_outputs = 0;
498 
499  fg->task_exited = 1;
500 }
501 
502 void sch_free(Scheduler **psch)
503 {
504  Scheduler *sch = *psch;
505 
506  if (!sch)
507  return;
508 
509  sch_stop(sch, NULL);
510 
511  for (unsigned i = 0; i < sch->nb_demux; i++) {
512  SchDemux *d = &sch->demux[i];
513 
514  for (unsigned j = 0; j < d->nb_streams; j++) {
515  SchDemuxStream *ds = &d->streams[j];
516  av_freep(&ds->dst);
517  av_freep(&ds->dst_finished);
518  }
519  av_freep(&d->streams);
520 
522 
523  waiter_uninit(&d->waiter);
524  }
525  av_freep(&sch->demux);
526 
527  for (unsigned i = 0; i < sch->nb_mux; i++) {
528  SchMux *mux = &sch->mux[i];
529 
530  for (unsigned j = 0; j < mux->nb_streams; j++) {
531  SchMuxStream *ms = &mux->streams[j];
532 
533  if (ms->pre_mux_queue.fifo) {
534  AVPacket *pkt;
535  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
538  }
539 
541  }
542  av_freep(&mux->streams);
543 
545 
546  tq_free(&mux->queue);
547  }
548  av_freep(&sch->mux);
549 
550  for (unsigned i = 0; i < sch->nb_dec; i++) {
551  SchDec *dec = &sch->dec[i];
552 
553  tq_free(&dec->queue);
554 
557 
558  for (unsigned j = 0; j < dec->nb_outputs; j++) {
559  SchDecOutput *o = &dec->outputs[j];
560 
561  av_freep(&o->dst);
562  av_freep(&o->dst_finished);
563  }
564 
565  av_freep(&dec->outputs);
566 
567  av_frame_free(&dec->send_frame);
568 
569  waiter_uninit(&dec->waiter);
570  }
571  av_freep(&sch->dec);
572 
573  for (unsigned i = 0; i < sch->nb_enc; i++) {
574  SchEnc *enc = &sch->enc[i];
575 
576  tq_free(&enc->queue);
577 
578  av_packet_free(&enc->send_pkt);
579 
580  av_freep(&enc->dst);
581  av_freep(&enc->dst_finished);
582  }
583  av_freep(&sch->enc);
584 
585  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
586  SchSyncQueue *sq = &sch->sq_enc[i];
587  sq_free(&sq->sq);
588  av_frame_free(&sq->frame);
590  av_freep(&sq->enc_idx);
591  }
592  av_freep(&sch->sq_enc);
593 
594  for (unsigned i = 0; i < sch->nb_filters; i++) {
595  SchFilterGraph *fg = &sch->filters[i];
596 
597  tq_free(&fg->queue);
598 
599  av_freep(&fg->inputs);
600  av_freep(&fg->outputs);
601 
602  waiter_uninit(&fg->waiter);
603  }
604  av_freep(&sch->filters);
605 
606  av_freep(&sch->sdp_filename);
607 
609 
611 
614 
615  av_freep(psch);
616 }
617 
618 static const AVClass scheduler_class = {
619  .class_name = "Scheduler",
620  .version = LIBAVUTIL_VERSION_INT,
621 };
622 
624 {
625  Scheduler *sch;
626  int ret;
627 
628  sch = av_mallocz(sizeof(*sch));
629  if (!sch)
630  return NULL;
631 
632  sch->class = &scheduler_class;
633  sch->sdp_auto = 1;
634 
636  if (ret)
637  goto fail;
638 
640  if (ret)
641  goto fail;
642 
644  if (ret)
645  goto fail;
646 
648  if (ret)
649  goto fail;
650 
651  return sch;
652 fail:
653  sch_free(&sch);
654  return NULL;
655 }
656 
657 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
658 {
659  av_freep(&sch->sdp_filename);
660  sch->sdp_filename = av_strdup(sdp_filename);
661  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
662 }
663 
664 static const AVClass sch_mux_class = {
665  .class_name = "SchMux",
666  .version = LIBAVUTIL_VERSION_INT,
667  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
668 };
669 
670 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
671  void *arg, int sdp_auto, unsigned thread_queue_size)
672 {
673  const unsigned idx = sch->nb_mux;
674 
675  SchMux *mux;
676  int ret;
677 
678  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
679  if (ret < 0)
680  return ret;
681 
682  mux = &sch->mux[idx];
683  mux->class = &sch_mux_class;
684  mux->init = init;
685  mux->queue_size = thread_queue_size;
686 
687  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
688 
689  sch->sdp_auto &= sdp_auto;
690 
691  return idx;
692 }
693 
694 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
695 {
696  SchMux *mux;
697  SchMuxStream *ms;
698  unsigned stream_idx;
699  int ret;
700 
701  av_assert0(mux_idx < sch->nb_mux);
702  mux = &sch->mux[mux_idx];
703 
704  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
705  if (ret < 0)
706  return ret;
707  stream_idx = mux->nb_streams - 1;
708 
709  ms = &mux->streams[stream_idx];
710 
711  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
712  if (!ms->pre_mux_queue.fifo)
713  return AVERROR(ENOMEM);
714 
715  ms->last_dts = AV_NOPTS_VALUE;
716 
717  return stream_idx;
718 }
719 
720 static const AVClass sch_demux_class = {
721  .class_name = "SchDemux",
722  .version = LIBAVUTIL_VERSION_INT,
723  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
724 };
725 
727 {
728  const unsigned idx = sch->nb_demux;
729 
730  SchDemux *d;
731  int ret;
732 
733  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
734  if (ret < 0)
735  return ret;
736 
737  d = &sch->demux[idx];
738 
739  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
740 
741  d->class = &sch_demux_class;
742  d->send_pkt = av_packet_alloc();
743  if (!d->send_pkt)
744  return AVERROR(ENOMEM);
745 
746  ret = waiter_init(&d->waiter);
747  if (ret < 0)
748  return ret;
749 
750  return idx;
751 }
752 
753 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
754 {
755  SchDemux *d;
756  int ret;
757 
758  av_assert0(demux_idx < sch->nb_demux);
759  d = &sch->demux[demux_idx];
760 
761  ret = GROW_ARRAY(d->streams, d->nb_streams);
762  return ret < 0 ? ret : d->nb_streams - 1;
763 }
764 
765 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
766 {
767  SchDec *dec;
768  int ret;
769 
770  av_assert0(dec_idx < sch->nb_dec);
771  dec = &sch->dec[dec_idx];
772 
773  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
774  if (ret < 0)
775  return ret;
776 
777  return dec->nb_outputs - 1;
778 }
779 
780 static const AVClass sch_dec_class = {
781  .class_name = "SchDec",
782  .version = LIBAVUTIL_VERSION_INT,
783  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
784 };
785 
786 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
787 {
788  const unsigned idx = sch->nb_dec;
789 
790  SchDec *dec;
791  int ret;
792 
793  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
794  if (ret < 0)
795  return ret;
796 
797  dec = &sch->dec[idx];
798 
799  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
800 
801  dec->class = &sch_dec_class;
802  dec->send_frame = av_frame_alloc();
803  if (!dec->send_frame)
804  return AVERROR(ENOMEM);
805 
806  ret = sch_add_dec_output(sch, idx);
807  if (ret < 0)
808  return ret;
809 
810  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
811  if (ret < 0)
812  return ret;
813 
814  if (send_end_ts) {
816  if (ret < 0)
817  return ret;
818  }
819 
821  if (!dec->overflow)
822  return AVERROR(ENOMEM);
823 
824  ret = waiter_init(&dec->waiter);
825  if (ret < 0)
826  return ret;
827 
828  return idx;
829 }
830 
831 static const AVClass sch_enc_class = {
832  .class_name = "SchEnc",
833  .version = LIBAVUTIL_VERSION_INT,
834  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
835 };
836 
838  int (*open_cb)(void *opaque, const AVFrame *frame))
839 {
840  const unsigned idx = sch->nb_enc;
841 
842  SchEnc *enc;
843  int ret;
844 
845  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
846  if (ret < 0)
847  return ret;
848 
849  enc = &sch->enc[idx];
850 
851  enc->class = &sch_enc_class;
852  enc->open_cb = open_cb;
853  enc->sq_idx[0] = -1;
854  enc->sq_idx[1] = -1;
855 
856  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
857 
858  enc->send_pkt = av_packet_alloc();
859  if (!enc->send_pkt)
860  return AVERROR(ENOMEM);
861 
862  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
863  if (ret < 0)
864  return ret;
865 
866  return idx;
867 }
868 
869 static const AVClass sch_fg_class = {
870  .class_name = "SchFilterGraph",
871  .version = LIBAVUTIL_VERSION_INT,
872  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
873 };
874 
875 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
876  SchThreadFunc func, void *ctx)
877 {
878  const unsigned idx = sch->nb_filters;
879 
880  SchFilterGraph *fg;
881  int ret;
882 
883  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
884  if (ret < 0)
885  return ret;
886  fg = &sch->filters[idx];
887 
888  fg->class = &sch_fg_class;
889 
890  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
891 
892  if (nb_inputs) {
893  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
894  if (!fg->inputs)
895  return AVERROR(ENOMEM);
896  fg->nb_inputs = nb_inputs;
897  }
898 
899  if (nb_outputs) {
900  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
901  if (!fg->outputs)
902  return AVERROR(ENOMEM);
903  fg->nb_outputs = nb_outputs;
904  }
905 
906  ret = waiter_init(&fg->waiter);
907  if (ret < 0)
908  return ret;
909 
910  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
911  if (ret < 0)
912  return ret;
913 
914  return idx;
915 }
916 
917 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
918 {
919  SchSyncQueue *sq;
920  int ret;
921 
922  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
923  if (ret < 0)
924  return ret;
925  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
926 
927  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
928  if (!sq->sq)
929  return AVERROR(ENOMEM);
930 
931  sq->frame = av_frame_alloc();
932  if (!sq->frame)
933  return AVERROR(ENOMEM);
934 
935  ret = pthread_mutex_init(&sq->lock, NULL);
936  if (ret)
937  return AVERROR(ret);
938 
939  return sq - sch->sq_enc;
940 }
941 
942 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
943  int limiting, uint64_t max_frames)
944 {
945  SchSyncQueue *sq;
946  SchEnc *enc;
947  int ret;
948 
949  av_assert0(sq_idx < sch->nb_sq_enc);
950  sq = &sch->sq_enc[sq_idx];
951 
952  av_assert0(enc_idx < sch->nb_enc);
953  enc = &sch->enc[enc_idx];
954 
955  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
956  if (ret < 0)
957  return ret;
958  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
959 
960  ret = sq_add_stream(sq->sq, limiting);
961  if (ret < 0)
962  return ret;
963 
964  enc->sq_idx[0] = sq_idx;
965  enc->sq_idx[1] = ret;
966 
967  if (max_frames != INT64_MAX)
968  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
969 
970  return 0;
971 }
972 
974 {
975  int ret;
976 
977  switch (src.type) {
978  case SCH_NODE_TYPE_DEMUX: {
979  SchDemuxStream *ds;
980 
981  av_assert0(src.idx < sch->nb_demux &&
982  src.idx_stream < sch->demux[src.idx].nb_streams);
983  ds = &sch->demux[src.idx].streams[src.idx_stream];
984 
985  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
986  if (ret < 0)
987  return ret;
988 
989  ds->dst[ds->nb_dst - 1] = dst;
990 
991  // demuxed packets go to decoding or streamcopy
992  switch (dst.type) {
993  case SCH_NODE_TYPE_DEC: {
994  SchDec *dec;
995 
996  av_assert0(dst.idx < sch->nb_dec);
997  dec = &sch->dec[dst.idx];
998 
999  av_assert0(!dec->src.type);
1000  dec->src = src;
1001  break;
1002  }
1003  case SCH_NODE_TYPE_MUX: {
1004  SchMuxStream *ms;
1005 
1006  av_assert0(dst.idx < sch->nb_mux &&
1007  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1008  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1009 
1010  av_assert0(!ms->src.type);
1011  ms->src = src;
1012 
1013  break;
1014  }
1015  default: av_assert0(0);
1016  }
1017 
1018  break;
1019  }
1020  case SCH_NODE_TYPE_DEC: {
1021  SchDec *dec;
1022  SchDecOutput *o;
1023 
1024  av_assert0(src.idx < sch->nb_dec);
1025  dec = &sch->dec[src.idx];
1026 
1027  av_assert0(src.idx_stream < dec->nb_outputs);
1028  o = &dec->outputs[src.idx_stream];
1029 
1030  ret = GROW_ARRAY(o->dst, o->nb_dst);
1031  if (ret < 0)
1032  return ret;
1033 
1034  o->dst[o->nb_dst - 1] = dst;
1035 
1036  // decoded frames go to filters or encoding
1037  switch (dst.type) {
1038  case SCH_NODE_TYPE_FILTER_IN: {
1039  SchFilterIn *fi;
1040 
1041  av_assert0(dst.idx < sch->nb_filters &&
1042  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1043  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1044 
1045  av_assert0(!fi->src.type);
1046  fi->src = src;
1047  break;
1048  }
1049  case SCH_NODE_TYPE_ENC: {
1050  SchEnc *enc;
1051 
1052  av_assert0(dst.idx < sch->nb_enc);
1053  enc = &sch->enc[dst.idx];
1054 
1055  av_assert0(!enc->src.type);
1056  enc->src = src;
1057  break;
1058  }
1059  default: av_assert0(0);
1060  }
1061 
1062  break;
1063  }
1064  case SCH_NODE_TYPE_FILTER_OUT: {
1065  SchFilterOut *fo;
1066 
1067  av_assert0(src.idx < sch->nb_filters &&
1068  src.idx_stream < sch->filters[src.idx].nb_outputs);
1069  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1070 
1071  av_assert0(!fo->dst.type);
1072  fo->dst = dst;
1073 
1074  // filtered frames go to encoding or another filtergraph
1075  switch (dst.type) {
1076  case SCH_NODE_TYPE_ENC: {
1077  SchEnc *enc;
1078 
1079  av_assert0(dst.idx < sch->nb_enc);
1080  enc = &sch->enc[dst.idx];
1081 
1082  av_assert0(!enc->src.type);
1083  enc->src = src;
1084  break;
1085  }
1086  case SCH_NODE_TYPE_FILTER_IN: {
1087  SchFilterIn *fi;
1088 
1089  av_assert0(dst.idx < sch->nb_filters &&
1090  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1091  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1092 
1093  av_assert0(!fi->src.type);
1094  fi->src = src;
1095  break;
1096  }
1097  default: av_assert0(0);
1098  }
1099 
1100 
1101  break;
1102  }
1103  case SCH_NODE_TYPE_ENC: {
1104  SchEnc *enc;
1105 
1106  av_assert0(src.idx < sch->nb_enc);
1107  enc = &sch->enc[src.idx];
1108 
1109  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1110  if (ret < 0)
1111  return ret;
1112 
1113  enc->dst[enc->nb_dst - 1] = dst;
1114 
1115  // encoding packets go to muxing or decoding
1116  switch (dst.type) {
1117  case SCH_NODE_TYPE_MUX: {
1118  SchMuxStream *ms;
1119 
1120  av_assert0(dst.idx < sch->nb_mux &&
1121  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1122  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1123 
1124  av_assert0(!ms->src.type);
1125  ms->src = src;
1126 
1127  break;
1128  }
1129  case SCH_NODE_TYPE_DEC: {
1130  SchDec *dec;
1131 
1132  av_assert0(dst.idx < sch->nb_dec);
1133  dec = &sch->dec[dst.idx];
1134 
1135  av_assert0(!dec->src.type);
1136  dec->src = src;
1137 
1138  break;
1139  }
1140  default: av_assert0(0);
1141  }
1142 
1143  break;
1144  }
1145  default: av_assert0(0);
1146  }
1147 
1148  return 0;
1149 }
1150 
1151 static int mux_task_start(SchMux *mux)
1152 {
1153  int ret = 0;
1154 
1155  ret = task_start(&mux->task);
1156  if (ret < 0)
1157  return ret;
1158 
1159  /* flush the pre-muxing queues */
1160  while (1) {
1161  int min_stream = -1;
1162  Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1163 
1164  AVPacket *pkt;
1165 
1166  // find the stream with the earliest dts or EOF in pre-muxing queue
1167  for (unsigned i = 0; i < mux->nb_streams; i++) {
1168  SchMuxStream *ms = &mux->streams[i];
1169 
1170  if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1171  continue;
1172 
1173  if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1174  min_stream = i;
1175  break;
1176  }
1177 
1178  if (min_ts.ts == AV_NOPTS_VALUE ||
1179  av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1180  min_stream = i;
1181  min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1182  }
1183  }
1184 
1185  if (min_stream >= 0) {
1186  SchMuxStream *ms = &mux->streams[min_stream];
1187 
1188  ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1189  av_assert0(ret >= 0);
1190 
1191  if (pkt) {
1192  if (!ms->init_eof)
1193  ret = tq_send(mux->queue, min_stream, pkt);
1194  av_packet_free(&pkt);
1195  if (ret == AVERROR_EOF)
1196  ms->init_eof = 1;
1197  else if (ret < 0)
1198  return ret;
1199  } else
1200  tq_send_finish(mux->queue, min_stream);
1201 
1202  continue;
1203  }
1204 
1205  break;
1206  }
1207 
1208  atomic_store(&mux->mux_started, 1);
1209 
1210  return 0;
1211 }
1212 
1213 int print_sdp(const char *filename);
1214 
1215 static int mux_init(Scheduler *sch, SchMux *mux)
1216 {
1217  int ret;
1218 
1219  ret = mux->init(mux->task.func_arg);
1220  if (ret < 0)
1221  return ret;
1222 
1223  sch->nb_mux_ready++;
1224 
1225  if (sch->sdp_filename || sch->sdp_auto) {
1226  if (sch->nb_mux_ready < sch->nb_mux)
1227  return 0;
1228 
1229  ret = print_sdp(sch->sdp_filename);
1230  if (ret < 0) {
1231  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1232  return ret;
1233  }
1234 
1235  /* SDP is written only after all the muxers are ready, so now we
1236  * start ALL the threads */
1237  for (unsigned i = 0; i < sch->nb_mux; i++) {
1238  ret = mux_task_start(&sch->mux[i]);
1239  if (ret < 0)
1240  return ret;
1241  }
1242  } else {
1243  ret = mux_task_start(mux);
1244  if (ret < 0)
1245  return ret;
1246  }
1247 
1248  return 0;
1249 }
1250 
1251 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1252  size_t data_threshold, int max_packets)
1253 {
1254  SchMux *mux;
1255  SchMuxStream *ms;
1256 
1257  av_assert0(mux_idx < sch->nb_mux);
1258  mux = &sch->mux[mux_idx];
1259 
1260  av_assert0(stream_idx < mux->nb_streams);
1261  ms = &mux->streams[stream_idx];
1262 
1263  ms->pre_mux_queue.max_packets = max_packets;
1264  ms->pre_mux_queue.data_threshold = data_threshold;
1265 }
1266 
1267 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1268 {
1269  SchMux *mux;
1270  int ret = 0;
1271 
1272  av_assert0(mux_idx < sch->nb_mux);
1273  mux = &sch->mux[mux_idx];
1274 
1275  av_assert0(stream_idx < mux->nb_streams);
1276 
1278 
1279  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1280 
1281  // this may be called during initialization - do not start
1282  // threads before sch_start() is called
1283  if (++mux->nb_streams_ready == mux->nb_streams &&
1284  sch->state >= SCH_STATE_STARTED)
1285  ret = mux_init(sch, mux);
1286 
1288 
1289  return ret;
1290 }
1291 
1292 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1293  unsigned dec_idx)
1294 {
1295  SchMux *mux;
1296  SchMuxStream *ms;
1297  int ret = 0;
1298 
1299  av_assert0(mux_idx < sch->nb_mux);
1300  mux = &sch->mux[mux_idx];
1301 
1302  av_assert0(stream_idx < mux->nb_streams);
1303  ms = &mux->streams[stream_idx];
1304 
1306  if (ret < 0)
1307  return ret;
1308 
1309  av_assert0(dec_idx < sch->nb_dec);
1310  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1311 
1312  if (!mux->sub_heartbeat_pkt) {
1314  if (!mux->sub_heartbeat_pkt)
1315  return AVERROR(ENOMEM);
1316  }
1317 
1318  return 0;
1319 }
1320 
1321 enum {
1322  UNCHOKE_DEMUX = (1 << 0),
1323  UNCHOKE_FILTER = (1 << 1),
1324  UNCHOKE_DECODE = (1 << 2),
1325 
1327 };
1328 
1329 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags);
1330 
1331 // Unchoke any filter graphs that are downstream of this node, to prevent it
1332 // from getting stuck trying to push data to a full queue
1334 {
1335  SchFilterGraph *fg;
1336  SchDec *dec;
1337  SchEnc *enc;
1338  switch (dst->type) {
1339  case SCH_NODE_TYPE_DEC:
1340  dec = &sch->dec[dst->idx];
1341  if (!dec->waiter.choked_next) {
1342  for (int i = 0; i < dec->nb_outputs; i++)
1343  unchoke_downstream(sch, dec->outputs[i].dst);
1344  }
1345  break;
1346  case SCH_NODE_TYPE_ENC:
1347  enc = &sch->enc[dst->idx];
1348  for (int i = 0; i < enc->nb_dst; i++)
1349  unchoke_downstream(sch, &enc->dst[i]);
1350  break;
1351  case SCH_NODE_TYPE_MUX:
1352  // muxers are never choked
1353  break;
1355  fg = &sch->filters[dst->idx];
1356  if (fg->best_input == fg->nb_inputs) {
1357  fg->waiter.choked_next = 0;
1358  } else {
1359  // ensure that this filter graph is not stuck waiting for
1360  // input from a different upstream source
1362  }
1363  break;
1364  default:
1365  av_unreachable("Invalid destination node type?");
1366  break;
1367  }
1368 }
1369 
1371 {
1372  while (1) {
1373  SchFilterGraph *fg;
1374  SchDemux *demux;
1375  SchDec *dec;
1376  switch (src.type) {
1377  case SCH_NODE_TYPE_DEMUX:
1378  // fed directly by a demuxer (i.e. not through a filtergraph)
1379  demux = &sch->demux[src.idx];
1380  if (demux->waiter.choked_next == 0)
1381  return; // prevent infinite loop
1382  if (flags & UNCHOKE_DEMUX) {
1383  demux->waiter.choked_next = 0;
1384  for (int i = 0; i < demux->nb_streams; i++)
1385  unchoke_downstream(sch, demux->streams[i].dst);
1386  }
1387  return;
1388  case SCH_NODE_TYPE_DEC:
1389  dec = &sch->dec[src.idx];
1390  if (!(flags & UNCHOKE_DECODE))
1391  return;
1392  dec->waiter.choked_next = 0;
1393  src = dec->src;
1394  continue;
1395  case SCH_NODE_TYPE_ENC:
1396  src = sch->enc[src.idx].src;
1397  continue;
1399  fg = &sch->filters[src.idx];
1400  // the filtergraph contains internal sources and
1401  // requested to be scheduled directly
1402  if (fg->best_input == fg->nb_inputs) {
1403  if (flags & UNCHOKE_FILTER)
1404  fg->waiter.choked_next = 0;
1405  return;
1406  }
1407  src = fg->inputs[fg->best_input].src;
1408  continue;
1409  default:
1410  av_unreachable("Invalid source node type?");
1411  return;
1412  }
1413  }
1414 }
1415 
1416 static void choke_demux(const Scheduler *sch, int demux_id, int choked)
1417 {
1418  av_assert1(demux_id < sch->nb_demux);
1419  SchDemux *demux = &sch->demux[demux_id];
1420 
1421  for (int i = 0; i < demux->nb_streams; i++) {
1422  SchedulerNode *dst = demux->streams[i].dst;
1423  SchFilterGraph *fg;
1424 
1425  switch (dst->type) {
1426  case SCH_NODE_TYPE_DEC:
1427  tq_choke(sch->dec[dst->idx].queue, choked);
1428  break;
1429  case SCH_NODE_TYPE_ENC:
1430  tq_choke(sch->enc[dst->idx].queue, choked);
1431  break;
1432  case SCH_NODE_TYPE_MUX:
1433  break;
1435  fg = &sch->filters[dst->idx];
1436  if (fg->nb_inputs == 1)
1437  tq_choke(fg->queue, choked);
1438  break;
1439  default:
1440  av_unreachable("Invalid destination node type?");
1441  break;
1442  }
1443  }
1444 }
1445 
1447 {
1448  int64_t dts;
1449  int have_unchoked = 0;
1450 
1451  // on termination request all waiters are choked,
1452  // we are not to unchoke them
1453  if (atomic_load(&sch->terminate))
1454  return;
1455 
1456  dts = trailing_dts(sch);
1457 
1458  atomic_store(&sch->last_dts, progressing_dts(sch, 0));
1459 
1460  // initialize our internal state
1461 #define RESET_WAITER(field) \
1462  do { \
1463  for (unsigned i = 0; i < sch->nb_##field; i++) { \
1464  SchWaiter *w = &sch->field[i].waiter; \
1465  w->choked_prev = atomic_load(&w->choked); \
1466  w->choked_next = 1; \
1467  } \
1468  } while (0)
1469 
1470  RESET_WAITER(demux);
1472  RESET_WAITER(dec);
1473 
1474  // figure out the sources that are allowed to proceed
1475  for (unsigned i = 0; i < sch->nb_mux; i++) {
1476  SchMux *mux = &sch->mux[i];
1477 
1478  for (unsigned j = 0; j < mux->nb_streams; j++) {
1479  SchMuxStream *ms = &mux->streams[j];
1480 
1481  // unblock sources for output streams that are not finished
1482  // and not too far ahead of the trailing stream
1483  if (ms->source_finished) {
1484  // still allow decoders to drain
1486  continue;
1487  }
1488  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1489  continue;
1490  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1491  continue;
1492 
1493  // resolve the source to unchoke
1494  unchoke_for_stream(sch, ms->src, UNCHOKE_ALL);
1495  have_unchoked = 1;
1496  }
1497  }
1498 
1499  // also unchoke any sources feeding into closed filter graph inputs, so
1500  // that they can observe the downstream EOF
1501  for (unsigned i = 0; i < sch->nb_filters; i++) {
1502  SchFilterGraph *fg = &sch->filters[i];
1503 
1504  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1505  SchFilterIn *fi = &fg->inputs[j];
1506  if (fi->receive_finished && !fi->send_finished)
1507  unchoke_for_stream(sch, fi->src, UNCHOKE_ALL);
1508  }
1509  }
1510 
1511  // make sure to unchoke at least one source, if still available
1512 #define UNCHOKE_ONCE(field) \
1513  do { \
1514  for (unsigned i = 0; !have_unchoked && i < sch->nb_##field; i++) { \
1515  SchWaiter *w = &sch->field[i].waiter; \
1516  if (!sch->field[i].task_exited) { \
1517  w->choked_next = 0; \
1518  have_unchoked = 1; \
1519  break; \
1520  } \
1521  } \
1522  } while (0)
1523 
1524  UNCHOKE_ONCE(demux);
1526 
1527 #define UPDATE_WAITER(field) \
1528  do { \
1529  for (unsigned i = 0; i < sch->nb_##field; i++) { \
1530  SchWaiter *w = &sch->field[i].waiter; \
1531  if (w->choked_prev != w->choked_next) { \
1532  waiter_set(w, w->choked_next); \
1533  if (offsetof(Scheduler, field) == offsetof(Scheduler, demux)) \
1534  choke_demux(sch, i, w->choked_next); \
1535  } \
1536  } \
1537  } while (0)
1538 
1539  UPDATE_WAITER(demux);
1541  UPDATE_WAITER(dec);
1542 }
1543 
1544 enum {
1548 };
1549 
1550 // Finds the filtergraph or muxer upstream of a scheduler node
1552 {
1553  while (1) {
1554  switch (src.type) {
1555  case SCH_NODE_TYPE_DEMUX:
1557  return src;
1558  case SCH_NODE_TYPE_DEC:
1559  src = sch->dec[src.idx].src;
1560  continue;
1561  case SCH_NODE_TYPE_ENC:
1562  src = sch->enc[src.idx].src;
1563  continue;
1564  default:
1565  av_unreachable("Invalid source node type?");
1566  return (SchedulerNode) {0};
1567  }
1568  }
1569 }
1570 
1571 static int
1573  uint8_t *filters_visited, SchedulerNode *filters_stack)
1574 {
1575  unsigned nb_filters_stack = 0;
1576 
1577  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1578 
1579  while (1) {
1580  const SchFilterGraph *fg = &sch->filters[src.idx];
1581 
1582  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1583 
1584  // descend into every input, depth first
1585  if (src.idx_stream < fg->nb_inputs) {
1586  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1587  SchedulerNode node = src_filtergraph(sch, fi->src);
1588 
1589  // connected to demuxer, no cycles possible
1590  if (node.type == SCH_NODE_TYPE_DEMUX)
1591  continue;
1592 
1593  // otherwise connected to another filtergraph
1595 
1596  // found a cycle
1597  if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
1598  return AVERROR(EINVAL);
1599 
1600  // place current position on stack and descend
1601  av_assert0(nb_filters_stack < sch->nb_filters);
1602  filters_stack[nb_filters_stack++] = src;
1603  src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
1604  continue;
1605  }
1606 
1607  filters_visited[src.idx] = CYCLE_NODE_DONE;
1608 
1609  // previous search finished,
1610  if (nb_filters_stack) {
1611  src = filters_stack[--nb_filters_stack];
1612  continue;
1613  }
1614  return 0;
1615  }
1616 }
1617 
1618 static int check_acyclic(Scheduler *sch)
1619 {
1620  uint8_t *filters_visited = NULL;
1621  SchedulerNode *filters_stack = NULL;
1622 
1623  int ret = 0;
1624 
1625  if (!sch->nb_filters)
1626  return 0;
1627 
1628  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1629  if (!filters_visited)
1630  return AVERROR(ENOMEM);
1631 
1632  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1633  if (!filters_stack) {
1634  ret = AVERROR(ENOMEM);
1635  goto fail;
1636  }
1637 
1638  // trace the transcoding graph upstream from every filtegraph
1639  for (unsigned i = 0; i < sch->nb_filters; i++) {
1640  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1641  filters_visited, filters_stack);
1642  if (ret < 0) {
1643  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1644  goto fail;
1645  }
1646  }
1647 
1648 fail:
1649  av_freep(&filters_visited);
1650  av_freep(&filters_stack);
1651  return ret;
1652 }
1653 
1654 static int start_prepare(Scheduler *sch)
1655 {
1656  int ret;
1657 
1658  for (unsigned i = 0; i < sch->nb_demux; i++) {
1659  SchDemux *d = &sch->demux[i];
1660 
1661  for (unsigned j = 0; j < d->nb_streams; j++) {
1662  SchDemuxStream *ds = &d->streams[j];
1663 
1664  if (!ds->nb_dst) {
1665  av_log(d, AV_LOG_ERROR,
1666  "Demuxer stream %u not connected to any sink\n", j);
1667  return AVERROR(EINVAL);
1668  }
1669 
1670  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1671  if (!ds->dst_finished)
1672  return AVERROR(ENOMEM);
1673  }
1674  }
1675 
1676  for (unsigned i = 0; i < sch->nb_dec; i++) {
1677  SchDec *dec = &sch->dec[i];
1678 
1679  if (!dec->src.type) {
1680  av_log(dec, AV_LOG_ERROR,
1681  "Decoder not connected to a source\n");
1682  return AVERROR(EINVAL);
1683  }
1684 
1685  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1686  SchDecOutput *o = &dec->outputs[j];
1687 
1688  if (!o->nb_dst) {
1689  av_log(dec, AV_LOG_ERROR,
1690  "Decoder output %u not connected to any sink\n", j);
1691  return AVERROR(EINVAL);
1692  }
1693 
1694  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1695  if (!o->dst_finished)
1696  return AVERROR(ENOMEM);
1697  }
1698  }
1699 
1700  for (unsigned i = 0; i < sch->nb_enc; i++) {
1701  SchEnc *enc = &sch->enc[i];
1702 
1703  if (!enc->src.type) {
1704  av_log(enc, AV_LOG_ERROR,
1705  "Encoder not connected to a source\n");
1706  return AVERROR(EINVAL);
1707  }
1708  if (!enc->nb_dst) {
1709  av_log(enc, AV_LOG_ERROR,
1710  "Encoder not connected to any sink\n");
1711  return AVERROR(EINVAL);
1712  }
1713 
1714  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1715  if (!enc->dst_finished)
1716  return AVERROR(ENOMEM);
1717  }
1718 
1719  for (unsigned i = 0; i < sch->nb_mux; i++) {
1720  SchMux *mux = &sch->mux[i];
1721 
1722  for (unsigned j = 0; j < mux->nb_streams; j++) {
1723  SchMuxStream *ms = &mux->streams[j];
1724 
1725  if (!ms->src.type) {
1726  av_log(mux, AV_LOG_ERROR,
1727  "Muxer stream #%u not connected to a source\n", j);
1728  return AVERROR(EINVAL);
1729  }
1730  }
1731 
1732  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1733  QUEUE_PACKETS);
1734  if (ret < 0)
1735  return ret;
1736  }
1737 
1738  for (unsigned i = 0; i < sch->nb_filters; i++) {
1739  SchFilterGraph *fg = &sch->filters[i];
1740 
1741  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1742  SchFilterIn *fi = &fg->inputs[j];
1743 
1744  if (!fi->src.type) {
1745  av_log(fg, AV_LOG_ERROR,
1746  "Filtergraph input %u not connected to a source\n", j);
1747  return AVERROR(EINVAL);
1748  }
1749  }
1750 
1751  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1752  SchFilterOut *fo = &fg->outputs[j];
1753 
1754  if (!fo->dst.type) {
1755  av_log(fg, AV_LOG_ERROR,
1756  "Filtergraph %u output %u not connected to a sink\n", i, j);
1757  return AVERROR(EINVAL);
1758  }
1759  }
1760  }
1761 
1762  // Check that the transcoding graph has no cycles.
1763  ret = check_acyclic(sch);
1764  if (ret < 0)
1765  return ret;
1766 
1767  return 0;
1768 }
1769 
1771 {
1772  int ret;
1773 
1774  ret = start_prepare(sch);
1775  if (ret < 0)
1776  return ret;
1777 
1779  sch->state = SCH_STATE_STARTED;
1780 
1781  for (unsigned i = 0; i < sch->nb_mux; i++) {
1782  SchMux *mux = &sch->mux[i];
1783 
1784  if (mux->nb_streams_ready == mux->nb_streams) {
1785  ret = mux_init(sch, mux);
1786  if (ret < 0)
1787  goto fail;
1788  }
1789  }
1790 
1791  for (unsigned i = 0; i < sch->nb_enc; i++) {
1792  SchEnc *enc = &sch->enc[i];
1793 
1794  ret = task_start(&enc->task);
1795  if (ret < 0)
1796  goto fail;
1797  }
1798 
1799  for (unsigned i = 0; i < sch->nb_filters; i++) {
1800  SchFilterGraph *fg = &sch->filters[i];
1801 
1802  ret = task_start(&fg->task);
1803  if (ret < 0)
1804  goto fail;
1805  }
1806 
1807  for (unsigned i = 0; i < sch->nb_dec; i++) {
1808  SchDec *dec = &sch->dec[i];
1809 
1810  ret = task_start(&dec->task);
1811  if (ret < 0)
1812  goto fail;
1813  }
1814 
1815  for (unsigned i = 0; i < sch->nb_demux; i++) {
1816  SchDemux *d = &sch->demux[i];
1817 
1818  if (!d->nb_streams)
1819  continue;
1820 
1821  ret = task_start(&d->task);
1822  if (ret < 0)
1823  goto fail;
1824  }
1825 
1829 
1830  return 0;
1831 fail:
1832  sch_stop(sch, NULL);
1833  return ret;
1834 }
1835 
1836 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1837 {
1838  int ret;
1839 
1840  // convert delay to absolute timestamp
1841  timeout_us += av_gettime();
1842 
1844 
1845  if (sch->nb_mux_done < sch->nb_mux) {
1846  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1847  .tv_nsec = (timeout_us % 1000000) * 1000 };
1848  pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1849  }
1850 
1851  // abort transcoding if any task failed
1852  ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1853 
1855 
1856  *transcode_ts = atomic_load(&sch->last_dts);
1857 
1858  return ret;
1859 }
1860 
1861 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1862 {
1863  int ret;
1864 
1865  ret = enc->open_cb(enc->task.func_arg, frame);
1866  if (ret < 0)
1867  return ret;
1868 
1869  // ret>0 signals audio frame size, which means sync queue must
1870  // have been enabled during encoder creation
1871  if (ret > 0) {
1872  SchSyncQueue *sq;
1873 
1874  av_assert0(enc->sq_idx[0] >= 0);
1875  sq = &sch->sq_enc[enc->sq_idx[0]];
1876 
1877  pthread_mutex_lock(&sq->lock);
1878 
1879  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1880 
1881  pthread_mutex_unlock(&sq->lock);
1882  }
1883 
1884  return 0;
1885 }
1886 
1888 {
1889  int ret;
1890 
1891  if (!frame) {
1892  tq_send_finish(enc->queue, 0);
1893  return 0;
1894  }
1895 
1896  if (enc->in_finished)
1897  return AVERROR_EOF;
1898 
1899  ret = tq_send(enc->queue, 0, frame);
1900  if (ret < 0)
1901  enc->in_finished = 1;
1902 
1903  return ret;
1904 }
1905 
1906 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1907 {
1908  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1909  int ret = 0;
1910 
1911  // inform the scheduling code that no more input will arrive along this path;
1912  // this is necessary because the sync queue may not send an EOF downstream
1913  // until other streams finish
1914  // TODO: consider a cleaner way of passing this information through
1915  // the pipeline
1916  if (!frame) {
1917  for (unsigned i = 0; i < enc->nb_dst; i++) {
1918  SchMux *mux;
1919  SchMuxStream *ms;
1920 
1921  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1922  continue;
1923 
1924  mux = &sch->mux[enc->dst[i].idx];
1925  ms = &mux->streams[enc->dst[i].idx_stream];
1926 
1928 
1929  ms->source_finished = 1;
1931 
1933  }
1934  }
1935 
1936  pthread_mutex_lock(&sq->lock);
1937 
1938  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1939  if (ret < 0)
1940  goto finish;
1941 
1942  while (1) {
1943  SchEnc *enc;
1944 
1945  // TODO: the SQ API should be extended to allow returning EOF
1946  // for individual streams
1947  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1948  if (ret < 0) {
1949  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1950  break;
1951  }
1952 
1953  enc = &sch->enc[sq->enc_idx[ret]];
1954  ret = send_to_enc_thread(sch, enc, sq->frame);
1955  if (ret < 0) {
1956  av_frame_unref(sq->frame);
1957  if (ret != AVERROR_EOF)
1958  break;
1959 
1960  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1961  continue;
1962  }
1963  }
1964 
1965  if (ret < 0) {
1966  // close all encoders fed from this sync queue
1967  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1968  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1969 
1970  // if the sync queue error is EOF and closing the encoder
1971  // produces a more serious error, make sure to pick the latter
1972  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1973  }
1974  }
1975 
1976 finish:
1977  pthread_mutex_unlock(&sq->lock);
1978 
1979  return ret;
1980 }
1981 
1982 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1983 {
1984  if (enc->open_cb && frame && !enc->opened) {
1985  int ret = enc_open(sch, enc, frame);
1986  if (ret < 0)
1987  return ret;
1988  enc->opened = 1;
1989 
1990  // discard empty frames that only carry encoder init parameters
1991  if (!frame->buf[0]) {
1993  return 0;
1994  }
1995  }
1996 
1997  return (enc->sq_idx[0] >= 0) ?
1998  send_to_enc_sq (sch, enc, frame) :
1999  send_to_enc_thread(sch, enc, frame);
2000 }
2001 
2003 {
2004  PreMuxQueue *q = &ms->pre_mux_queue;
2005  AVPacket *tmp_pkt = NULL;
2006  int ret;
2007 
2008  if (!av_fifo_can_write(q->fifo)) {
2009  size_t packets = av_fifo_can_read(q->fifo);
2010  size_t pkt_size = pkt ? pkt->size : 0;
2011  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
2012  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
2013  size_t new_size = FFMIN(2 * packets, max_packets);
2014 
2015  if (new_size <= packets) {
2016  av_log(mux, AV_LOG_ERROR,
2017  "Too many packets buffered for output stream.\n");
2018  return AVERROR_BUFFER_TOO_SMALL;
2019  }
2020  ret = av_fifo_grow2(q->fifo, new_size - packets);
2021  if (ret < 0)
2022  return ret;
2023  }
2024 
2025  if (pkt) {
2026  tmp_pkt = av_packet_alloc();
2027  if (!tmp_pkt)
2028  return AVERROR(ENOMEM);
2029 
2030  av_packet_move_ref(tmp_pkt, pkt);
2031  q->data_size += tmp_pkt->size;
2032  }
2033  av_fifo_write(q->fifo, &tmp_pkt, 1);
2034 
2035  return 0;
2036 }
2037 
2038 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
2039  AVPacket *pkt)
2040 {
2041  SchMuxStream *ms = &mux->streams[stream_idx];
2042  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
2045 
2046  // queue the packet if the muxer cannot be started yet
2047  if (!atomic_load(&mux->mux_started)) {
2048  int queued = 0;
2049 
2050  // the muxer could have started between the above atomic check and
2051  // locking the mutex, then this block falls through to normal send path
2053 
2054  if (!atomic_load(&mux->mux_started)) {
2055  int ret = mux_queue_packet(mux, ms, pkt);
2056  queued = ret < 0 ? ret : 1;
2057  }
2058 
2060 
2061  if (queued < 0)
2062  return queued;
2063  else if (queued)
2064  goto update_schedule;
2065  }
2066 
2067  if (pkt) {
2068  int ret;
2069 
2070  if (ms->init_eof)
2071  return AVERROR_EOF;
2072 
2073  ret = tq_send(mux->queue, stream_idx, pkt);
2074  if (ret < 0)
2075  return ret;
2076  } else
2077  tq_send_finish(mux->queue, stream_idx);
2078 
2079 update_schedule:
2080  // TODO: use atomics to check whether this changes trailing dts
2081  // to avoid locking unnecessarily
2082  if (dts != AV_NOPTS_VALUE || !pkt) {
2084 
2085  if (pkt) ms->last_dts = dts;
2086  else ms->source_finished = 1;
2087 
2089 
2091  }
2092 
2093  return 0;
2094 }
2095 
2096 static int
2098  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
2099 {
2100  int ret;
2101 
2102  if (*dst_finished)
2103  return AVERROR_EOF;
2104 
2105  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
2108  pkt = NULL;
2109  }
2110 
2111  if (!pkt)
2112  goto finish;
2113 
2114  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2115  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2116  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2117  if (ret == AVERROR_EOF)
2118  goto finish;
2119 
2120  return ret;
2121 
2122 finish:
2123  if (dst.type == SCH_NODE_TYPE_MUX)
2124  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2125  else
2126  tq_send_finish(sch->dec[dst.idx].queue, 0);
2127 
2128  *dst_finished = 1;
2129  return AVERROR_EOF;
2130 }
2131 
2133  AVPacket *pkt, unsigned flags)
2134 {
2135  unsigned nb_done = 0;
2136 
2137  for (unsigned i = 0; i < ds->nb_dst; i++) {
2138  AVPacket *to_send = pkt;
2139  uint8_t *finished = &ds->dst_finished[i];
2140 
2141  int ret;
2142 
2143  // sending a packet consumes it, so make a temporary reference if needed
2144  if (pkt && i < ds->nb_dst - 1) {
2145  to_send = d->send_pkt;
2146 
2147  ret = av_packet_ref(to_send, pkt);
2148  if (ret < 0)
2149  return ret;
2150  }
2151 
2152  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
2153  if (to_send)
2154  av_packet_unref(to_send);
2155  if (ret == AVERROR_EOF)
2156  nb_done++;
2157  else if (ret < 0)
2158  return ret;
2159  }
2160 
2161  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
2162 }
2163 
2165 {
2166  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
2167 
2168  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2169 
2170  for (unsigned i = 0; i < d->nb_streams; i++) {
2171  SchDemuxStream *ds = &d->streams[i];
2172 
2173  for (unsigned j = 0; j < ds->nb_dst; j++) {
2174  const SchedulerNode *dst = &ds->dst[j];
2175  SchDec *dec;
2176  int ret;
2177 
2178  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2179  continue;
2180 
2181  dec = &sch->dec[dst->idx];
2182 
2183  ret = tq_send(dec->queue, 0, pkt);
2184  if (ret < 0)
2185  return ret;
2186 
2187  if (dec->queue_end_ts) {
2188  Timestamp ts;
2190  if (ret < 0)
2191  return ret;
2192 
2193  if (max_end_ts.ts == AV_NOPTS_VALUE ||
2194  (ts.ts != AV_NOPTS_VALUE &&
2195  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2196  max_end_ts = ts;
2197 
2198  }
2199  }
2200  }
2201 
2202  pkt->pts = max_end_ts.ts;
2203  pkt->time_base = max_end_ts.tb;
2204 
2205  return 0;
2206 }
2207 
2208 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2209  unsigned flags)
2210 {
2211  SchDemux *d;
2212  int terminate;
2213 
2214  av_assert0(demux_idx < sch->nb_demux);
2215  d = &sch->demux[demux_idx];
2216 
2217  terminate = waiter_wait(sch, &d->waiter);
2218  if (terminate)
2219  return AVERROR_EXIT;
2220 
2221  // flush the downstreams after seek
2222  if (pkt->stream_index == -1)
2223  return demux_flush(sch, d, pkt);
2224 
2226 
2227  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2228 }
2229 
2230 static int demux_done(Scheduler *sch, unsigned demux_idx)
2231 {
2232  SchDemux *d = &sch->demux[demux_idx];
2233  int ret = 0;
2234 
2235  for (unsigned i = 0; i < d->nb_streams; i++) {
2236  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2237  if (err != AVERROR_EOF)
2238  ret = err_merge(ret, err);
2239  }
2240 
2242 
2243  d->task_exited = 1;
2244 
2246 
2248 
2249  return ret;
2250 }
2251 
2252 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2253 {
2254  SchMux *mux;
2255  int ret, stream_idx;
2256 
2257  av_assert0(mux_idx < sch->nb_mux);
2258  mux = &sch->mux[mux_idx];
2259 
2260  ret = tq_receive(mux->queue, &stream_idx, pkt, 0);
2261  pkt->stream_index = stream_idx;
2262  return ret;
2263 }
2264 
2265 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2266 {
2267  SchMux *mux;
2268 
2269  av_assert0(mux_idx < sch->nb_mux);
2270  mux = &sch->mux[mux_idx];
2271 
2272  av_assert0(stream_idx < mux->nb_streams);
2273  tq_receive_finish(mux->queue, stream_idx);
2274 
2276  mux->streams[stream_idx].source_finished = 1;
2277 
2279 
2281 }
2282 
2283 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2284  const AVPacket *pkt)
2285 {
2286  SchMux *mux;
2287  SchMuxStream *ms;
2288 
2289  av_assert0(mux_idx < sch->nb_mux);
2290  mux = &sch->mux[mux_idx];
2291 
2292  av_assert0(stream_idx < mux->nb_streams);
2293  ms = &mux->streams[stream_idx];
2294 
2295  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2296  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2297  int ret;
2298 
2300  if (ret < 0)
2301  return ret;
2302 
2303  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2304  }
2305 
2306  return 0;
2307 }
2308 
2309 static int mux_done(Scheduler *sch, unsigned mux_idx)
2310 {
2311  SchMux *mux = &sch->mux[mux_idx];
2312 
2314 
2315  for (unsigned i = 0; i < mux->nb_streams; i++) {
2316  tq_receive_finish(mux->queue, i);
2317  mux->streams[i].source_finished = 1;
2318  }
2319 
2321 
2323 
2325 
2326  av_assert0(sch->nb_mux_done < sch->nb_mux);
2327  sch->nb_mux_done++;
2328 
2330 
2332 
2333  return 0;
2334 }
2335 
2336 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2337 {
2338  SchDec *dec;
2339  int ret, dummy;
2340 
2341  av_assert0(dec_idx < sch->nb_dec);
2342  dec = &sch->dec[dec_idx];
2343 
2344 retry:
2345  // Pull a packet from the overflow FIFO while unchoked or expecting EOF ts
2347  (!atomic_load(&dec->waiter.choked) || dec->expect_end_ts))
2348  return av_container_fifo_read(dec->overflow, pkt, 0);
2349 
2350  // the decoder should have given us post-flush end timestamp in pkt
2351  if (dec->expect_end_ts) {
2352  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2354  if (ret < 0)
2355  return ret;
2356 
2357  dec->expect_end_ts = 0;
2358  }
2359 
2360  ret = tq_receive(dec->queue, &dummy, pkt, 0);
2361  av_assert0(dummy <= 0);
2362 
2363  // drain packets from overflow queue before returning EOF
2365  int terminate = waiter_wait(sch, &dec->waiter);
2366  if (terminate)
2367  return ret;
2368  return av_container_fifo_read(dec->overflow, pkt, 0);
2369  } else if (ret < 0)
2370  return ret;
2371 
2372  // got a flush packet, on the next call to this function the decoder
2373  // should give us post-flush end timestamp (after draining overflow fifo)
2374  if (!pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2375  dec->expect_end_ts = 1;
2376 
2377  // we got a packet, but we're currently choked or have existing overflow
2378  // packets; so push it to the FIFO first
2381  if (ret < 0)
2382  return ret;
2383  goto retry;
2384  }
2385 
2386  return ret;
2387 }
2388 
2390  unsigned in_idx, AVFrame *frame)
2391 {
2392  if (frame)
2393  return tq_send(fg->queue, in_idx, frame);
2394 
2396 
2397  if (!fg->inputs[in_idx].send_finished) {
2398  fg->inputs[in_idx].send_finished = 1;
2399  tq_send_finish(fg->queue, in_idx);
2400 
2401  // close the control stream when all actual inputs are done
2402  if (++fg->nb_inputs_finished_send == fg->nb_inputs)
2403  tq_send_finish(fg->queue, fg->nb_inputs);
2404 
2406  }
2407 
2409  return 0;
2410 }
2411 
2413  uint8_t *dst_finished, AVFrame *frame)
2414 {
2415  int ret;
2416 
2417  if (*dst_finished)
2418  return AVERROR_EOF;
2419 
2420  if (!frame)
2421  goto finish;
2422 
2423  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2424  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2425  send_to_enc(sch, &sch->enc[dst.idx], frame);
2426  if (ret == AVERROR_EOF)
2427  goto finish;
2428 
2429  return ret;
2430 
2431 finish:
2432  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2433  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2434  else
2435  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2436 
2437  *dst_finished = 1;
2438 
2439  return AVERROR_EOF;
2440 }
2441 
2442 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2443  unsigned out_idx, AVFrame *frame)
2444 {
2445  SchDec *dec;
2446  SchDecOutput *o;
2447  int ret;
2448  unsigned nb_done = 0;
2449 
2450  av_assert0(dec_idx < sch->nb_dec);
2451  dec = &sch->dec[dec_idx];
2452 
2453  av_assert0(out_idx < dec->nb_outputs);
2454  o = &dec->outputs[out_idx];
2455 
2456  for (unsigned i = 0; i < o->nb_dst; i++) {
2457  uint8_t *finished = &o->dst_finished[i];
2458  AVFrame *to_send = frame;
2459 
2460  // sending a frame consumes it, so make a temporary reference if needed
2461  if (i < o->nb_dst - 1) {
2462  to_send = dec->send_frame;
2463 
2464  // frame may sometimes contain props only,
2465  // e.g. to signal EOF timestamp
2466  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2467  av_frame_copy_props(to_send, frame);
2468  if (ret < 0)
2469  return ret;
2470  }
2471 
2472  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2473  if (ret < 0) {
2474  av_frame_unref(to_send);
2475  if (ret == AVERROR_EOF) {
2476  nb_done++;
2477  continue;
2478  }
2479  return ret;
2480  }
2481  }
2482 
2483  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2484 }
2485 
2486 static int dec_done(Scheduler *sch, unsigned dec_idx)
2487 {
2488  SchDec *dec = &sch->dec[dec_idx];
2489  int ret = 0;
2490 
2491  tq_receive_finish(dec->queue, 0);
2492 
2493  // make sure our source does not get stuck waiting for end timestamps
2494  // that will never arrive
2495  if (dec->queue_end_ts)
2497 
2498  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2499  SchDecOutput *o = &dec->outputs[i];
2500 
2501  for (unsigned j = 0; j < o->nb_dst; j++) {
2502  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2503  if (err < 0 && err != AVERROR_EOF)
2504  ret = err_merge(ret, err);
2505  }
2506  }
2507 
2508  return ret;
2509 }
2510 
2511 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2512 {
2513  SchEnc *enc;
2514  int ret, dummy;
2515 
2516  av_assert0(enc_idx < sch->nb_enc);
2517  enc = &sch->enc[enc_idx];
2518 
2519  ret = tq_receive(enc->queue, &dummy, frame, 0);
2520  av_assert0(dummy <= 0);
2521 
2522  return ret;
2523 }
2524 
2526  uint8_t *dst_finished, AVPacket *pkt)
2527 {
2528  int ret;
2529 
2530  if (*dst_finished)
2531  return AVERROR_EOF;
2532 
2533  if (!pkt)
2534  goto finish;
2535 
2536  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2537  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2538  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2539  if (ret == AVERROR_EOF)
2540  goto finish;
2541 
2542  return ret;
2543 
2544 finish:
2545  if (dst.type == SCH_NODE_TYPE_MUX)
2546  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2547  else
2548  tq_send_finish(sch->dec[dst.idx].queue, 0);
2549 
2550  *dst_finished = 1;
2551 
2552  return AVERROR_EOF;
2553 }
2554 
2555 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2556 {
2557  SchEnc *enc;
2558  int ret;
2559 
2560  av_assert0(enc_idx < sch->nb_enc);
2561  enc = &sch->enc[enc_idx];
2562 
2563  for (unsigned i = 0; i < enc->nb_dst; i++) {
2564  uint8_t *finished = &enc->dst_finished[i];
2565  AVPacket *to_send = pkt;
2566 
2567  // sending a packet consumes it, so make a temporary reference if needed
2568  if (i < enc->nb_dst - 1) {
2569  to_send = enc->send_pkt;
2570 
2571  ret = av_packet_ref(to_send, pkt);
2572  if (ret < 0)
2573  return ret;
2574  }
2575 
2576  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2577  if (ret < 0) {
2578  av_packet_unref(to_send);
2579  if (ret == AVERROR_EOF)
2580  continue;
2581  return ret;
2582  }
2583  }
2584 
2585  return 0;
2586 }
2587 
2588 static int enc_done(Scheduler *sch, unsigned enc_idx)
2589 {
2590  SchEnc *enc = &sch->enc[enc_idx];
2591  int ret = 0;
2592 
2593  tq_receive_finish(enc->queue, 0);
2594 
2595  for (unsigned i = 0; i < enc->nb_dst; i++) {
2596  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2597  if (err < 0 && err != AVERROR_EOF)
2598  ret = err_merge(ret, err);
2599  }
2600 
2601  return ret;
2602 }
2603 
2604 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2605  unsigned *in_idx, AVFrame *frame)
2606 {
2607  SchFilterGraph *fg;
2608  int ret, idx;
2609 
2610  av_assert0(fg_idx < sch->nb_filters);
2611  fg = &sch->filters[fg_idx];
2612 
2613  av_assert0(*in_idx <= fg->nb_inputs);
2614 
2615  // update scheduling to account for desired input stream, if it changed
2616  //
2617  // this check needs no locking because only the filtering thread
2618  // updates this value
2619  if (*in_idx != fg->best_input) {
2621 
2622  fg->best_input = *in_idx;
2624 
2626  }
2627 
2628  if (*in_idx == fg->nb_inputs) {
2629  // drain incoming frames before waiting, to avoid blocking downstream
2631  if (ret >= 0) {
2632  av_assert0(idx >= 0);
2633  *in_idx = idx;
2634  return 0;
2635  }
2636 
2637  int terminate = waiter_wait(sch, &fg->waiter);
2638  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2639  }
2640 
2641  while (1) {
2642  ret = tq_receive(fg->queue, &idx, frame, 0);
2643  if (idx < 0)
2644  return AVERROR_EOF;
2645  else if (ret >= 0) {
2646  *in_idx = idx;
2647  return 0;
2648  }
2649 
2650  // disregard EOFs for specific streams - they should always be
2651  // preceded by an EOF frame
2652  }
2653 }
2654 
2655 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2656 {
2657  SchFilterGraph *fg;
2658  SchFilterIn *fi;
2659 
2660  av_assert0(fg_idx < sch->nb_filters);
2661  fg = &sch->filters[fg_idx];
2662 
2663  av_assert0(in_idx < fg->nb_inputs);
2664  fi = &fg->inputs[in_idx];
2665 
2667 
2668  if (!fi->receive_finished) {
2669  fi->receive_finished = 1;
2670  tq_receive_finish(fg->queue, in_idx);
2671 
2672  // close the control stream when all actual inputs are done
2673  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2674  tq_receive_finish(fg->queue, fg->nb_inputs);
2675 
2677  }
2678 
2680 }
2681 
2682 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2683 {
2684  SchFilterGraph *fg;
2686  int ret;
2687 
2688  av_assert0(fg_idx < sch->nb_filters);
2689  fg = &sch->filters[fg_idx];
2690 
2691  av_assert0(out_idx < fg->nb_outputs);
2692  dst = fg->outputs[out_idx].dst;
2693 
2694  if (dst.type == SCH_NODE_TYPE_ENC) {
2695  ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
2696  if (ret == AVERROR_EOF)
2697  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2698  } else {
2699  ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2700  if (ret == AVERROR_EOF)
2701  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2702  }
2703  return ret;
2704 }
2705 
/**
 * Cleanup routine for filtergraph fg_idx (selected by task_cleanup() for
 * SCH_NODE_TYPE_FILTER_IN nodes).
 *
 * Finishes the receiving side of every queue stream, sends a NULL frame to
 * the destination of every output and sets fg->task_exited.
 *
 * @return 0, or an error merged with err_merge() from the NULL sends;
 *         AVERROR_EOF from a destination is not treated as an error
 *
 * NOTE(review): listing numbers 2724, 2728 and 2730 are absent, so statements
 * around the task_exited assignment appear to be missing from this
 * extraction - confirm against the upstream file.
 */
2706 static int filter_done(Scheduler *sch, unsigned fg_idx)
2707 {
2708  SchFilterGraph *fg = &sch->filters[fg_idx];
2709  int ret = 0;
2710 
 // "<=" is intentional: index nb_inputs is also finished, in addition to
 // the nb_inputs actual inputs
2711  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2712  tq_receive_finish(fg->queue, i);
2713 
 // send a NULL frame to whatever each output is connected to
2714  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2715  SchedulerNode dst = fg->outputs[i].dst;
2716  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2717  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2718  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2719 
2720  if (err < 0 && err != AVERROR_EOF)
2721  ret = err_merge(ret, err);
2722  }
2723 
2725 
2726  fg->task_exited = 1;
2727 
2729 
2731 
2732  return ret;
2733 }
2734 
/**
 * Send frame to filtergraph fg_idx on queue stream index fg->nb_inputs, i.e.
 * the slot one past the graph's actual inputs (called the "control stream"
 * in a comment in sch_filter_receive_finish()).
 *
 * fg_idx is validated with av_assert0().
 *
 * @return whatever send_to_filter() returns
 */
2735 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2736 {
2737  SchFilterGraph *fg;
2738 
2739  av_assert0(fg_idx < sch->nb_filters);
2740  fg = &sch->filters[fg_idx];
2741 
2742  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2743 }
2744 
/**
 * Set best_input of filtergraph fg_idx to fg->nb_inputs, a value that does
 * not index any of the graph's actual inputs (valid indices are
 * 0..nb_inputs-1).
 *
 * fg_idx is validated with av_assert0().
 *
 * NOTE(review): listing numbers 2751, 2753 and 2754 are absent, so the
 * statements surrounding the assignment are missing from this extraction -
 * confirm the complete body against the upstream file.
 */
2745 void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
2746 {
2747  SchFilterGraph *fg;
2748  av_assert0(fg_idx < sch->nb_filters);
2749  fg = &sch->filters[fg_idx];
2750 
2752  fg->best_input = fg->nb_inputs;
2755 }
2756 
/**
 * Run the cleanup routine matching the type of node.
 *
 * Dispatches on node.type to demux_done(), mux_done(), dec_done(),
 * enc_done() or filter_done(), passing node.idx as the component index.
 * Any other node type hits av_unreachable().
 *
 * @return the return value of the selected *_done() function
 */
2757 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2758 {
2759  switch (node.type) {
2760  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2761  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2762  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2763  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2764  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2765  default: av_unreachable("Invalid node type?");
2766  }
2767 }
2768 
/**
 * Run a task's function, then its cleanup, and report the combined result.
 *
 * arg is the SchTask to run. task->func is called with task->func_arg; a
 * negative result is logged. task_cleanup() then runs for task->node
 * regardless of that result, and both codes are merged with err_merge().
 *
 * @return the final code converted to a pointer through intptr_t;
 *         task_stop() converts a joined thread's result back the same way
 *
 * NOTE(review): listing numbers 2788, 2790, 2791 and 2794 are absent. In
 * particular the opening line of the call whose arguments sit on listing
 * lines 2795-2796 is missing - restore from the upstream file.
 */
2769 static void *task_wrapper(void *arg)
2770 {
2771  SchTask *task = arg;
2772  Scheduler *sch = task->parent;
2773  int ret;
2774  int err = 0;
2775 
2776  ret = task->func(task->func_arg);
2777  if (ret < 0)
2778  av_log(task->func_arg, AV_LOG_ERROR,
2779  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2780 
 // cleanup runs whether or not the task function failed
2781  err = task_cleanup(sch, task->node);
2782  ret = err_merge(ret, err);
2783 
2784  // EOF is considered normal termination
2785  if (ret == AVERROR_EOF)
2786  ret = 0;
2787  if (ret < 0) {
2789  sch->task_failed = 1;
2792  }
2793 
 // continuation of a call whose first line (listing 2794) is not present
2795  "Terminating thread with return code %d (%s)\n", ret,
2796  ret < 0 ? av_err2str(ret) : "success");
2797 
2798  return (void*)(intptr_t)ret;
2799 }
2800 
/**
 * Stop one task and return its result code.
 *
 * Three cases, in order:
 *  - task->parent is unset: nothing is done, 0 is returned;
 *  - the task's thread is not running: task_cleanup() is called directly for
 *    task->node and its result returned;
 *  - otherwise the thread is joined (a join failure trips av_assert0),
 *    thread_running is cleared and the thread's return value, converted back
 *    from a pointer through intptr_t, is returned.
 */
2801 static int task_stop(Scheduler *sch, SchTask *task)
2802 {
2803  int ret;
2804  void *thread_ret;
2805 
2806  if (!task->parent)
2807  return 0;
2808 
2809  if (!task->thread_running)
2810  return task_cleanup(sch, task->node);
2811 
2812  ret = pthread_join(task->thread, &thread_ret);
2813  av_assert0(ret == 0);
2814 
2815  task->thread_running = 0;
2816 
2817  return (intptr_t)thread_ret;
2818 }
2819 
/**
 * Stop all scheduler tasks and collect their results.
 *
 * Does nothing and returns 0 unless sch->state is SCH_STATE_STARTED.
 * Otherwise it sets the atomic terminate flag, updates the waiters of
 * filtergraphs, demuxers and decoders, then calls task_stop() on every
 * demuxer, decoder, filtergraph, encoder and muxer in that order, and
 * finally switches the state to SCH_STATE_STOPPED.
 *
 * @param finish_ts if non-NULL, receives progressing_dts(sch, 1)
 * @return 0, or an error merged with err_merge() from the task_stop() calls
 *
 * NOTE(review): listing numbers 2831 and 2844 are absent. The comment on
 * listing lines 2829-2830 describes excluding other threads from
 * schedule_update_locked, but no such statement is visible - presumably a
 * lock/unlock pair was lost in extraction; confirm against upstream.
 */
2820 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2821 {
2822  int ret = 0, err;
2823 
2824  if (sch->state != SCH_STATE_STARTED)
2825  return 0;
2826 
2827  atomic_store(&sch->terminate, 1);
2828 
2829  // Ensure no other thread is currently in schedule_update_locked while
2830  // we are choking all demuxers
2832 
 // type 0 walks the filtergraphs, type 1 walks the demuxers
2833  for (unsigned type = 0; type < 2; type++)
2834  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2835  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2836  waiter_set(w, 1);
2837  if (type)
2838  choke_demux(sch, i, 0); // unfreeze to allow draining
2839  }
2840 
2841  for (unsigned i = 0; i < sch->nb_dec; i++)
2842  waiter_set(&sch->dec[i].waiter, 0); // unfreeze to allow draining
2843 
2845 
 // stop every task; a failure in one does not prevent stopping the rest
2846  for (unsigned i = 0; i < sch->nb_demux; i++) {
2847  SchDemux *d = &sch->demux[i];
2848 
2849  err = task_stop(sch, &d->task);
2850  ret = err_merge(ret, err);
2851  }
2852 
2853  for (unsigned i = 0; i < sch->nb_dec; i++) {
2854  SchDec *dec = &sch->dec[i];
2855 
2856  err = task_stop(sch, &dec->task);
2857  ret = err_merge(ret, err);
2858  }
2859 
2860  for (unsigned i = 0; i < sch->nb_filters; i++) {
2861  SchFilterGraph *fg = &sch->filters[i];
2862 
2863  err = task_stop(sch, &fg->task);
2864  ret = err_merge(ret, err);
2865  }
2866 
2867  for (unsigned i = 0; i < sch->nb_enc; i++) {
2868  SchEnc *enc = &sch->enc[i];
2869 
2870  err = task_stop(sch, &enc->task);
2871  ret = err_merge(ret, err);
2872  }
2873 
2874  for (unsigned i = 0; i < sch->nb_mux; i++) {
2875  SchMux *mux = &sch->mux[i];
2876 
2877  err = task_stop(sch, &mux->task);
2878  ret = err_merge(ret, err);
2879  }
2880 
 // the second argument is progressing_dts()'s count_finished parameter
2881  if (finish_ts)
2882  *finish_ts = progressing_dts(sch, 1);
2883 
2884  sch->state = SCH_STATE_STOPPED;
2885 
2886  return ret;
2887 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:302
flags
const SwsFlags flags[]
Definition: swscale.c:72
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:66
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:53
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:433
THREAD_QUEUE_FLAG_NO_BLOCK
@ THREAD_QUEUE_FLAG_NO_BLOCK
Definition: thread_queue.h:32
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1151
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:77
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:354
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
Scheduler::finish_lock
pthread_mutex_t finish_lock
Definition: ffmpeg_sched.c:292
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2682
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:89
av_container_fifo_write
int av_container_fifo_write(AVContainerFifo *cf, void *obj, unsigned flags)
Write the contents of obj to the FIFO.
Definition: container_fifo.c:162
THREAD_QUEUE_PACKETS
@ THREAD_QUEUE_PACKETS
Definition: thread_queue.h:26
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:299
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:290
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:272
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2265
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:106
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:71
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:2164
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:303
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:237
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:628
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:152
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data, int flags)
Read the next item from the queue.
Definition: thread_queue.c:197
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:46
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:726
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:190
SchFilterGraph::nb_inputs_finished_send
unsigned nb_inputs_finished_send
Definition: ffmpeg_sched.c:255
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:263
int64_t
long long int64_t
Definition: coverity.c:34
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:68
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2309
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:300
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:64
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
container_fifo.h
av_fifo_peek
int av_fifo_peek(const AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1572
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:459
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2757
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:595
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:222
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:165
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:2038
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:287
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, enum ThreadQueueType type)
Allocate a queue for sending data between threads.
Definition: thread_queue.c:72
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2588
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:226
unchoke_downstream
static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
Definition: ffmpeg_sched.c:1333
nb_streams
static unsigned int nb_streams
Definition: ffprobe.c:352
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:613
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:60
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:49
SchMux
Definition: ffmpeg_sched.c:217
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:279
SchFilterOut
Definition: ffmpeg_sched.c:246
progressing_dts
static int64_t progressing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:465
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:273
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:181
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:211
dummy
static int dummy
Definition: ffplay.c:3751
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:262
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:316
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:171
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1267
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1887
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2801
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:243
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2820
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:124
fail
#define fail()
Definition: checkasm.h:225
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:247
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:86
SchEnc
Definition: ffmpeg_sched.c:114
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:92
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:694
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:218
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:275
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:586
AVERROR_BUFFER_TOO_SMALL
#define AVERROR_BUFFER_TOO_SMALL
Buffer too small.
Definition: error.h:53
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:297
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:875
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:52
SchFilterGraph
Definition: ffmpeg_sched.c:250
avassert.h
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:210
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:502
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:311
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:220
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:195
SchDecOutput
Definition: ffmpeg_sched.c:75
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:670
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:344
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:259
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:96
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1861
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:623
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2412
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:431
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:199
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:118
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:753
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:196
UNCHOKE_ALL
@ UNCHOKE_ALL
Definition: ffmpeg_sched.c:1326
filters
#define filters(fmt, type, inverse, clp, inverset, clip, one, clip_fn, packed)
Definition: af_crystalizer.c:55
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:42
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2442
ctx
static AVFormatContext * ctx
Definition: movenc.c:49
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:213
av_container_fifo_read
int av_container_fifo_read(AVContainerFifo *cf, void *obj, unsigned flags)
Read the next available object from the FIFO into obj.
Definition: container_fifo.c:122
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
AVContainerFifo
AVContainerFifo is a FIFO for "containers" - dynamically allocated reusable structs (e....
Definition: container_fifo.c:27
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2706
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:251
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2511
AVThreadMessageQueue
Definition: threadmessage.c:30
av_mallocz
#define av_mallocz(s)
Definition: tableprint_vlc.h:31
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:657
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:115
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:2132
arg
const char * arg
Definition: jacosubdec.c:65
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:201
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:598
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:233
Scheduler::finish_cond
pthread_cond_t finish_cond
Definition: ffmpeg_sched.c:293
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:281
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:578
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:54
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:371
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:76
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:308
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1906
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:142
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:599
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:254
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:284
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1446
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:243
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:837
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:110
av_unreachable
#define av_unreachable(msg)
Asserts that are used as compiler optimization hints depending upon ASSERT_LEVEL and NBDEBUG.
Definition: avassert.h:116
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:274
choke_demux
static void choke_demux(const Scheduler *sch, int demux_id, int choked)
Definition: ffmpeg_sched.c:1416
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1547
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2486
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:264
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:162
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
UNCHOKE_DECODE
@ UNCHOKE_DECODE
Definition: ffmpeg_sched.c:1324
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:119
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:786
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:56
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:55
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags)
Definition: ffmpeg_sched.c:1370
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations send normally to other types.
Definition: ffmpeg_sched.h:340
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:869
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:50
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:441
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:490
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:72
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2555
error.h
Scheduler
Definition: ffmpeg_sched.c:278
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:221
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:108
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:198
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:82
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:640
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:149
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:167
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:158
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:256
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:117
RESET_WAITER
#define RESET_WAITER(field)
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:551
AVPacket::size
int size
Definition: packet.h:596
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:111
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:269
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:278
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:87
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:185
i
#define i(width, name, range_min, range_max)
Definition: cbs_h264.c:63
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:261
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
sch_filter_choke_inputs
void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
Called by filtergraph tasks to choke all filter inputs, preventing them from receiving more frames un...
Definition: ffmpeg_sched.c:2745
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:54
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:333
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:192
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:671
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:247
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:780
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:253
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:314
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:69
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:594
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2525
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:664
SchFilterIn
Definition: ffmpeg_sched.c:240
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2604
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:285
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:63
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:143
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:618
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:282
UNCHOKE_ONCE
#define UNCHOKE_ONCE(field)
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:396
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:157
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:174
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:409
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:305
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:588
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2230
packet.h
sch_remove_filtergraph
void sch_remove_filtergraph(Scheduler *sch, int idx)
Definition: ffmpeg_sched.c:485
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:61
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:268
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:32
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:288
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:312
SchDec
Definition: ffmpeg_sched.c:81
UPDATE_WAITER
#define UPDATE_WAITER(field)
av_assert1
#define av_assert1(cond)
assert() equivalent, that does not lie in speed critical code.
Definition: avassert.h:58
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:257
QueueType
QueueType
Definition: ffmpeg_sched.c:48
THREAD_QUEUE_FRAMES
@ THREAD_QUEUE_FRAMES
Definition: thread_queue.h:25
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:87
SchDec::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:90
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:496
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:258
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1545
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:831
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:64
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1215
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2389
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:156
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:973
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1982
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2735
SchDemuxStream
Definition: ffmpeg_sched.c:155
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2336
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:81
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:265
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:204
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:2002
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:224
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:720
av_container_fifo_free
void av_container_fifo_free(AVContainerFifo **pcf)
Free an AVContainerFifo and everything in it.
Definition: container_fifo.c:101
ThreadQueue
Definition: thread_queue.c:40
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2769
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:226
SchDec::overflow
AVContainerFifo * overflow
Definition: ffmpeg_sched.c:102
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:90
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2208
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1546
SchDemux
Definition: ffmpeg_sched.c:161
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:296
UNCHOKE_DEMUX
@ UNCHOKE_DEMUX
Definition: ffmpeg_sched.c:1322
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:95
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:2097
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:84
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:597
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:536
av_container_fifo_can_read
size_t av_container_fifo_can_read(const AVContainerFifo *cf)
Definition: container_fifo.c:185
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:234
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:168
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:325
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:107
trailing_dts
static int64_t trailing_dts(const Scheduler *sch)
Definition: ffmpeg_sched.c:443
Scheduler::task_failed
unsigned task_failed
Definition: ffmpeg_sched.c:291
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:66
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:942
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
av_strdup
#define av_strdup(s)
Definition: ops_asmgen.c:47
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1654
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1292
w
uint8_t w
Definition: llvidencdsp.c:39
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2655
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g. for multiview video).
Definition: ffmpeg_sched.c:765
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:117
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1836
AVPacket
This structure stores compressed data.
Definition: packet.h:572
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
src_filtergraph
static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1551
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:105
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:235
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:65
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:99
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:377
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1770
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2283
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:76
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:147
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:654
pkt
static AVPacket * pkt
Definition: demux_decode.c:55
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:145
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:241
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1251
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1618
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:164
PreMuxQueue
Definition: ffmpeg_sched.c:177
UNCHOKE_FILTER
@ UNCHOKE_FILTER
Definition: ffmpeg_sched.c:1323
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:309
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:242
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:265
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:639
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:607
tq_choke
void tq_choke(ThreadQueue *tq, int choked)
Prevent further reads from the thread queue until it is unchoked.
Definition: thread_queue.c:258
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2252
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:917
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:78
av_container_fifo_alloc_avpacket
AVContainerFifo * av_container_fifo_alloc_avpacket(unsigned flags)
Allocate an AVContainerFifo instance for AVPacket.
Definition: packet.c:770
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:120
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:227
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:306