FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDecOutput {
76  uint8_t *dst_finished;
77  unsigned nb_dst;
78 } SchDecOutput;
79 
80 typedef struct SchDec {
81  const AVClass *class;
82 
84 
86  unsigned nb_outputs;
87 
89  // Queue for receiving input packets, one stream.
91 
92  // Queue for sending post-flush end timestamps back to the source
95 
96  // temporary storage used by sch_dec_send()
98 } SchDec;
99 
100 typedef struct SchSyncQueue {
104 
105  unsigned *enc_idx;
106  unsigned nb_enc_idx;
107 } SchSyncQueue;
108 
109 typedef struct SchEnc {
110  const AVClass *class;
111 
114  uint8_t *dst_finished;
115  unsigned nb_dst;
116 
117  // [0] - index of the sync queue in Scheduler.sq_enc,
118  // [1] - index of this encoder in the sq
119  int sq_idx[2];
120 
121  /* Opening encoders is somewhat nontrivial due to their interaction with
122  * sync queues, which are (among other things) responsible for maintaining
123  * constant audio frame size, when it is required by the encoder.
124  *
125  * Opening the encoder requires stream parameters, obtained from the first
126  * frame. However, that frame cannot be properly chunked by the sync queue
127  * without knowing the required frame size, which is only available after
128  * opening the encoder.
129  *
130  * This apparent circular dependency is resolved in the following way:
131  * - the caller creating the encoder gives us a callback which opens the
132  * encoder and returns the required frame size (if any)
133  * - when the first frame is sent to the encoder, the sending thread
134  * - calls this callback, opening the encoder
135  * - passes the returned frame size to the sync queue
136  */
137  int (*open_cb)(void *opaque, const AVFrame *frame);
138  int opened;
139 
141  // Queue for receiving input frames, one stream.
143  // tq_send() to queue returned EOF
145 
146  // temporary storage used by sch_enc_send()
148 } SchEnc;
149 
150 typedef struct SchDemuxStream {
152  uint8_t *dst_finished;
153  unsigned nb_dst;
155 
156 typedef struct SchDemux {
157  const AVClass *class;
158 
160  unsigned nb_streams;
161 
164 
165  // temporary storage used by sch_demux_send()
167 
168  // protected by schedule_lock
170 } SchDemux;
171 
172 typedef struct PreMuxQueue {
173  /**
174  * Queue for buffering the packets before the muxer task can be started.
175  */
177  /**
178  * Maximum number of packets in fifo.
179  */
181  /*
182  * The size of the AVPackets' buffers in queue.
183  * Updated when a packet is either pushed or pulled from the queue.
184  */
185  size_t data_size;
186  /* Threshold after which max_packets will be in effect */
188 } PreMuxQueue;
189 
190 typedef struct SchMuxStream {
192 
193  unsigned *sub_heartbeat_dst;
195 
197 
198  // an EOF was generated while flushing the pre-mux queue
199  int init_eof;
200 
201  ////////////////////////////////////////////////////////////
202  // The following are protected by Scheduler.schedule_lock //
203 
204  /* dts+duration of the last packet sent to this stream
205  in AV_TIME_BASE_Q */
207  // this stream no longer accepts input
209  ////////////////////////////////////////////////////////////
210 } SchMuxStream;
211 
212 typedef struct SchMux {
213  const AVClass *class;
214 
216  unsigned nb_streams;
218 
219  int (*init)(void *arg);
220 
222  /**
223  * Set to 1 after starting the muxer task and flushing the
224  * pre-muxing queues.
225  * Set either before any tasks have started, or with
226  * Scheduler.mux_ready_lock held.
227  */
230  unsigned queue_size;
231 
233 } SchMux;
234 
235 typedef struct SchFilterIn {
239 } SchFilterIn;
240 
241 typedef struct SchFilterOut {
243 } SchFilterOut;
244 
245 typedef struct SchFilterGraph {
246  const AVClass *class;
247 
249  unsigned nb_inputs;
252 
254  unsigned nb_outputs;
255 
257  // input queue, nb_inputs+1 streams
258  // last stream is control
261 
262  // protected by schedule_lock
263  unsigned best_input;
266 
271 };
272 
273 struct Scheduler {
274  const AVClass *class;
275 
277  unsigned nb_demux;
278 
280  unsigned nb_mux;
281 
282  unsigned nb_mux_ready;
284 
285  unsigned nb_mux_done;
286  unsigned task_failed;
289 
290 
292  unsigned nb_dec;
293 
295  unsigned nb_enc;
296 
298  unsigned nb_sq_enc;
299 
301  unsigned nb_filters;
302 
304  int sdp_auto;
305 
308 
310 
312 };
313 
314 /**
315  * Wait until this task is allowed to proceed.
316  *
317  * @retval 0 the caller should proceed
318  * @retval 1 the caller should terminate
319  */
320 static int waiter_wait(Scheduler *sch, SchWaiter *w)
321 {
322  int terminate;
323 
324  if (!atomic_load(&w->choked))
325  return 0;
326 
327  pthread_mutex_lock(&w->lock);
328 
329  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
330  pthread_cond_wait(&w->cond, &w->lock);
331 
332  terminate = atomic_load(&sch->terminate);
333 
334  pthread_mutex_unlock(&w->lock);
335 
336  return terminate;
337 }
338 
339 static void waiter_set(SchWaiter *w, int choked)
340 {
341  pthread_mutex_lock(&w->lock);
342 
343  atomic_store(&w->choked, choked);
344  pthread_cond_signal(&w->cond);
345 
346  pthread_mutex_unlock(&w->lock);
347 }
348 
349 static int waiter_init(SchWaiter *w)
350 {
351  int ret;
352 
353  atomic_init(&w->choked, 0);
354 
355  ret = pthread_mutex_init(&w->lock, NULL);
356  if (ret)
357  return AVERROR(ret);
358 
359  ret = pthread_cond_init(&w->cond, NULL);
360  if (ret)
361  return AVERROR(ret);
362 
363  return 0;
364 }
365 
366 static void waiter_uninit(SchWaiter *w)
367 {
368  pthread_mutex_destroy(&w->lock);
369  pthread_cond_destroy(&w->cond);
370 }
371 
372 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
373  enum QueueType type)
374 {
375  ThreadQueue *tq;
376 
377  if (queue_size <= 0) {
378  if (type == QUEUE_FRAMES)
379  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
380  else
382  }
383 
384  if (type == QUEUE_FRAMES) {
385  // This queue length is used in the decoder code to ensure that
386  // there are enough entries in fixed-size frame pools to account
387  // for frames held in queues inside the ffmpeg utility. If this
388  // can ever dynamically change then the corresponding decode
389  // code needs to be updated as well.
391  }
392 
393  tq = tq_alloc(nb_streams, queue_size,
395  if (!tq)
396  return AVERROR(ENOMEM);
397 
398  *ptq = tq;
399  return 0;
400 }
401 
402 static void *task_wrapper(void *arg);
403 
404 static int task_start(SchTask *task)
405 {
406  int ret;
407 
408  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
409 
410  av_assert0(!task->thread_running);
411 
412  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
413  if (ret) {
414  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
415  strerror(ret));
416  return AVERROR(ret);
417  }
418 
419  task->thread_running = 1;
420  return 0;
421 }
422 
423 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
424  SchThreadFunc func, void *func_arg)
425 {
426  task->parent = sch;
427 
428  task->node.type = type;
429  task->node.idx = idx;
430 
431  task->func = func;
432  task->func_arg = func_arg;
433 }
434 
435 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
436 {
437  int64_t min_dts = INT64_MAX;
438 
439  for (unsigned i = 0; i < sch->nb_mux; i++) {
440  const SchMux *mux = &sch->mux[i];
441 
442  for (unsigned j = 0; j < mux->nb_streams; j++) {
443  const SchMuxStream *ms = &mux->streams[j];
444 
445  if (ms->source_finished && !count_finished)
446  continue;
447  if (ms->last_dts == AV_NOPTS_VALUE)
448  return AV_NOPTS_VALUE;
449 
450  min_dts = FFMIN(min_dts, ms->last_dts);
451  }
452  }
453 
454  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
455 }
456 
457 void sch_free(Scheduler **psch)
458 {
459  Scheduler *sch = *psch;
460 
461  if (!sch)
462  return;
463 
464  sch_stop(sch, NULL);
465 
466  for (unsigned i = 0; i < sch->nb_demux; i++) {
467  SchDemux *d = &sch->demux[i];
468 
469  for (unsigned j = 0; j < d->nb_streams; j++) {
470  SchDemuxStream *ds = &d->streams[j];
471  av_freep(&ds->dst);
472  av_freep(&ds->dst_finished);
473  }
474  av_freep(&d->streams);
475 
477 
478  waiter_uninit(&d->waiter);
479  }
480  av_freep(&sch->demux);
481 
482  for (unsigned i = 0; i < sch->nb_mux; i++) {
483  SchMux *mux = &sch->mux[i];
484 
485  for (unsigned j = 0; j < mux->nb_streams; j++) {
486  SchMuxStream *ms = &mux->streams[j];
487 
488  if (ms->pre_mux_queue.fifo) {
489  AVPacket *pkt;
490  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
493  }
494 
496  }
497  av_freep(&mux->streams);
498 
500 
501  tq_free(&mux->queue);
502  }
503  av_freep(&sch->mux);
504 
505  for (unsigned i = 0; i < sch->nb_dec; i++) {
506  SchDec *dec = &sch->dec[i];
507 
508  tq_free(&dec->queue);
509 
511 
512  for (unsigned j = 0; j < dec->nb_outputs; j++) {
513  SchDecOutput *o = &dec->outputs[j];
514 
515  av_freep(&o->dst);
516  av_freep(&o->dst_finished);
517  }
518 
519  av_freep(&dec->outputs);
520 
521  av_frame_free(&dec->send_frame);
522  }
523  av_freep(&sch->dec);
524 
525  for (unsigned i = 0; i < sch->nb_enc; i++) {
526  SchEnc *enc = &sch->enc[i];
527 
528  tq_free(&enc->queue);
529 
530  av_packet_free(&enc->send_pkt);
531 
532  av_freep(&enc->dst);
533  av_freep(&enc->dst_finished);
534  }
535  av_freep(&sch->enc);
536 
537  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
538  SchSyncQueue *sq = &sch->sq_enc[i];
539  sq_free(&sq->sq);
540  av_frame_free(&sq->frame);
542  av_freep(&sq->enc_idx);
543  }
544  av_freep(&sch->sq_enc);
545 
546  for (unsigned i = 0; i < sch->nb_filters; i++) {
547  SchFilterGraph *fg = &sch->filters[i];
548 
549  tq_free(&fg->queue);
550 
551  av_freep(&fg->inputs);
552  av_freep(&fg->outputs);
553 
554  waiter_uninit(&fg->waiter);
555  }
556  av_freep(&sch->filters);
557 
558  av_freep(&sch->sdp_filename);
559 
561 
563 
566 
567  av_freep(psch);
568 }
569 
570 static const AVClass scheduler_class = {
571  .class_name = "Scheduler",
572  .version = LIBAVUTIL_VERSION_INT,
573 };
574 
576 {
577  Scheduler *sch;
578  int ret;
579 
580  sch = av_mallocz(sizeof(*sch));
581  if (!sch)
582  return NULL;
583 
584  sch->class = &scheduler_class;
585  sch->sdp_auto = 1;
586 
588  if (ret)
589  goto fail;
590 
592  if (ret)
593  goto fail;
594 
596  if (ret)
597  goto fail;
598 
600  if (ret)
601  goto fail;
602 
603  return sch;
604 fail:
605  sch_free(&sch);
606  return NULL;
607 }
608 
609 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
610 {
611  av_freep(&sch->sdp_filename);
612  sch->sdp_filename = av_strdup(sdp_filename);
613  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
614 }
615 
616 static const AVClass sch_mux_class = {
617  .class_name = "SchMux",
618  .version = LIBAVUTIL_VERSION_INT,
619  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
620 };
621 
622 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
623  void *arg, int sdp_auto, unsigned thread_queue_size)
624 {
625  const unsigned idx = sch->nb_mux;
626 
627  SchMux *mux;
628  int ret;
629 
630  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
631  if (ret < 0)
632  return ret;
633 
634  mux = &sch->mux[idx];
635  mux->class = &sch_mux_class;
636  mux->init = init;
637  mux->queue_size = thread_queue_size;
638 
639  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
640 
641  sch->sdp_auto &= sdp_auto;
642 
643  return idx;
644 }
645 
646 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
647 {
648  SchMux *mux;
649  SchMuxStream *ms;
650  unsigned stream_idx;
651  int ret;
652 
653  av_assert0(mux_idx < sch->nb_mux);
654  mux = &sch->mux[mux_idx];
655 
656  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
657  if (ret < 0)
658  return ret;
659  stream_idx = mux->nb_streams - 1;
660 
661  ms = &mux->streams[stream_idx];
662 
663  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
664  if (!ms->pre_mux_queue.fifo)
665  return AVERROR(ENOMEM);
666 
667  ms->last_dts = AV_NOPTS_VALUE;
668 
669  return stream_idx;
670 }
671 
672 static const AVClass sch_demux_class = {
673  .class_name = "SchDemux",
674  .version = LIBAVUTIL_VERSION_INT,
675  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
676 };
677 
679 {
680  const unsigned idx = sch->nb_demux;
681 
682  SchDemux *d;
683  int ret;
684 
685  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
686  if (ret < 0)
687  return ret;
688 
689  d = &sch->demux[idx];
690 
691  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
692 
693  d->class = &sch_demux_class;
694  d->send_pkt = av_packet_alloc();
695  if (!d->send_pkt)
696  return AVERROR(ENOMEM);
697 
698  ret = waiter_init(&d->waiter);
699  if (ret < 0)
700  return ret;
701 
702  return idx;
703 }
704 
705 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
706 {
707  SchDemux *d;
708  int ret;
709 
710  av_assert0(demux_idx < sch->nb_demux);
711  d = &sch->demux[demux_idx];
712 
713  ret = GROW_ARRAY(d->streams, d->nb_streams);
714  return ret < 0 ? ret : d->nb_streams - 1;
715 }
716 
717 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
718 {
719  SchDec *dec;
720  int ret;
721 
722  av_assert0(dec_idx < sch->nb_dec);
723  dec = &sch->dec[dec_idx];
724 
725  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
726  if (ret < 0)
727  return ret;
728 
729  return dec->nb_outputs - 1;
730 }
731 
732 static const AVClass sch_dec_class = {
733  .class_name = "SchDec",
734  .version = LIBAVUTIL_VERSION_INT,
735  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
736 };
737 
738 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
739 {
740  const unsigned idx = sch->nb_dec;
741 
742  SchDec *dec;
743  int ret;
744 
745  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
746  if (ret < 0)
747  return ret;
748 
749  dec = &sch->dec[idx];
750 
751  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
752 
753  dec->class = &sch_dec_class;
754  dec->send_frame = av_frame_alloc();
755  if (!dec->send_frame)
756  return AVERROR(ENOMEM);
757 
758  ret = sch_add_dec_output(sch, idx);
759  if (ret < 0)
760  return ret;
761 
762  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
763  if (ret < 0)
764  return ret;
765 
766  if (send_end_ts) {
768  if (ret < 0)
769  return ret;
770  }
771 
772  return idx;
773 }
774 
775 static const AVClass sch_enc_class = {
776  .class_name = "SchEnc",
777  .version = LIBAVUTIL_VERSION_INT,
778  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
779 };
780 
782  int (*open_cb)(void *opaque, const AVFrame *frame))
783 {
784  const unsigned idx = sch->nb_enc;
785 
786  SchEnc *enc;
787  int ret;
788 
789  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
790  if (ret < 0)
791  return ret;
792 
793  enc = &sch->enc[idx];
794 
795  enc->class = &sch_enc_class;
796  enc->open_cb = open_cb;
797  enc->sq_idx[0] = -1;
798  enc->sq_idx[1] = -1;
799 
800  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
801 
802  enc->send_pkt = av_packet_alloc();
803  if (!enc->send_pkt)
804  return AVERROR(ENOMEM);
805 
806  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
807  if (ret < 0)
808  return ret;
809 
810  return idx;
811 }
812 
813 static const AVClass sch_fg_class = {
814  .class_name = "SchFilterGraph",
815  .version = LIBAVUTIL_VERSION_INT,
816  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
817 };
818 
819 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
820  SchThreadFunc func, void *ctx)
821 {
822  const unsigned idx = sch->nb_filters;
823 
824  SchFilterGraph *fg;
825  int ret;
826 
827  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
828  if (ret < 0)
829  return ret;
830  fg = &sch->filters[idx];
831 
832  fg->class = &sch_fg_class;
833 
834  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
835 
836  if (nb_inputs) {
837  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
838  if (!fg->inputs)
839  return AVERROR(ENOMEM);
840  fg->nb_inputs = nb_inputs;
841  }
842 
843  if (nb_outputs) {
844  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
845  if (!fg->outputs)
846  return AVERROR(ENOMEM);
847  fg->nb_outputs = nb_outputs;
848  }
849 
850  ret = waiter_init(&fg->waiter);
851  if (ret < 0)
852  return ret;
853 
854  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
855  if (ret < 0)
856  return ret;
857 
858  return idx;
859 }
860 
861 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
862 {
863  SchSyncQueue *sq;
864  int ret;
865 
866  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
867  if (ret < 0)
868  return ret;
869  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
870 
871  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
872  if (!sq->sq)
873  return AVERROR(ENOMEM);
874 
875  sq->frame = av_frame_alloc();
876  if (!sq->frame)
877  return AVERROR(ENOMEM);
878 
879  ret = pthread_mutex_init(&sq->lock, NULL);
880  if (ret)
881  return AVERROR(ret);
882 
883  return sq - sch->sq_enc;
884 }
885 
886 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
887  int limiting, uint64_t max_frames)
888 {
889  SchSyncQueue *sq;
890  SchEnc *enc;
891  int ret;
892 
893  av_assert0(sq_idx < sch->nb_sq_enc);
894  sq = &sch->sq_enc[sq_idx];
895 
896  av_assert0(enc_idx < sch->nb_enc);
897  enc = &sch->enc[enc_idx];
898 
899  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
900  if (ret < 0)
901  return ret;
902  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
903 
904  ret = sq_add_stream(sq->sq, limiting);
905  if (ret < 0)
906  return ret;
907 
908  enc->sq_idx[0] = sq_idx;
909  enc->sq_idx[1] = ret;
910 
911  if (max_frames != INT64_MAX)
912  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
913 
914  return 0;
915 }
916 
918 {
919  int ret;
920 
921  switch (src.type) {
922  case SCH_NODE_TYPE_DEMUX: {
923  SchDemuxStream *ds;
924 
925  av_assert0(src.idx < sch->nb_demux &&
926  src.idx_stream < sch->demux[src.idx].nb_streams);
927  ds = &sch->demux[src.idx].streams[src.idx_stream];
928 
929  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
930  if (ret < 0)
931  return ret;
932 
933  ds->dst[ds->nb_dst - 1] = dst;
934 
935  // demuxed packets go to decoding or streamcopy
936  switch (dst.type) {
937  case SCH_NODE_TYPE_DEC: {
938  SchDec *dec;
939 
940  av_assert0(dst.idx < sch->nb_dec);
941  dec = &sch->dec[dst.idx];
942 
943  av_assert0(!dec->src.type);
944  dec->src = src;
945  break;
946  }
947  case SCH_NODE_TYPE_MUX: {
948  SchMuxStream *ms;
949 
950  av_assert0(dst.idx < sch->nb_mux &&
951  dst.idx_stream < sch->mux[dst.idx].nb_streams);
952  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
953 
954  av_assert0(!ms->src.type);
955  ms->src = src;
956 
957  break;
958  }
959  default: av_assert0(0);
960  }
961 
962  break;
963  }
964  case SCH_NODE_TYPE_DEC: {
965  SchDec *dec;
966  SchDecOutput *o;
967 
968  av_assert0(src.idx < sch->nb_dec);
969  dec = &sch->dec[src.idx];
970 
971  av_assert0(src.idx_stream < dec->nb_outputs);
972  o = &dec->outputs[src.idx_stream];
973 
974  ret = GROW_ARRAY(o->dst, o->nb_dst);
975  if (ret < 0)
976  return ret;
977 
978  o->dst[o->nb_dst - 1] = dst;
979 
980  // decoded frames go to filters or encoding
981  switch (dst.type) {
983  SchFilterIn *fi;
984 
985  av_assert0(dst.idx < sch->nb_filters &&
986  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
987  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
988 
989  av_assert0(!fi->src.type);
990  fi->src = src;
991  break;
992  }
993  case SCH_NODE_TYPE_ENC: {
994  SchEnc *enc;
995 
996  av_assert0(dst.idx < sch->nb_enc);
997  enc = &sch->enc[dst.idx];
998 
999  av_assert0(!enc->src.type);
1000  enc->src = src;
1001  break;
1002  }
1003  default: av_assert0(0);
1004  }
1005 
1006  break;
1007  }
1008  case SCH_NODE_TYPE_FILTER_OUT: {
1009  SchFilterOut *fo;
1010 
1011  av_assert0(src.idx < sch->nb_filters &&
1012  src.idx_stream < sch->filters[src.idx].nb_outputs);
1013  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1014 
1015  av_assert0(!fo->dst.type);
1016  fo->dst = dst;
1017 
1018  // filtered frames go to encoding or another filtergraph
1019  switch (dst.type) {
1020  case SCH_NODE_TYPE_ENC: {
1021  SchEnc *enc;
1022 
1023  av_assert0(dst.idx < sch->nb_enc);
1024  enc = &sch->enc[dst.idx];
1025 
1026  av_assert0(!enc->src.type);
1027  enc->src = src;
1028  break;
1029  }
1030  case SCH_NODE_TYPE_FILTER_IN: {
1031  SchFilterIn *fi;
1032 
1033  av_assert0(dst.idx < sch->nb_filters &&
1034  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1035  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1036 
1037  av_assert0(!fi->src.type);
1038  fi->src = src;
1039  break;
1040  }
1041  default: av_assert0(0);
1042  }
1043 
1044 
1045  break;
1046  }
1047  case SCH_NODE_TYPE_ENC: {
1048  SchEnc *enc;
1049 
1050  av_assert0(src.idx < sch->nb_enc);
1051  enc = &sch->enc[src.idx];
1052 
1053  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1054  if (ret < 0)
1055  return ret;
1056 
1057  enc->dst[enc->nb_dst - 1] = dst;
1058 
1059  // encoding packets go to muxing or decoding
1060  switch (dst.type) {
1061  case SCH_NODE_TYPE_MUX: {
1062  SchMuxStream *ms;
1063 
1064  av_assert0(dst.idx < sch->nb_mux &&
1065  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1066  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1067 
1068  av_assert0(!ms->src.type);
1069  ms->src = src;
1070 
1071  break;
1072  }
1073  case SCH_NODE_TYPE_DEC: {
1074  SchDec *dec;
1075 
1076  av_assert0(dst.idx < sch->nb_dec);
1077  dec = &sch->dec[dst.idx];
1078 
1079  av_assert0(!dec->src.type);
1080  dec->src = src;
1081 
1082  break;
1083  }
1084  default: av_assert0(0);
1085  }
1086 
1087  break;
1088  }
1089  default: av_assert0(0);
1090  }
1091 
1092  return 0;
1093 }
1094 
1095 static int mux_task_start(SchMux *mux)
1096 {
1097  int ret = 0;
1098 
1099  ret = task_start(&mux->task);
1100  if (ret < 0)
1101  return ret;
1102 
1103  /* flush the pre-muxing queues */
1104  while (1) {
1105  int min_stream = -1;
1106  Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1107 
1108  AVPacket *pkt;
1109 
1110  // find the stream with the earliest dts or EOF in pre-muxing queue
1111  for (unsigned i = 0; i < mux->nb_streams; i++) {
1112  SchMuxStream *ms = &mux->streams[i];
1113 
1114  if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1115  continue;
1116 
1117  if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1118  min_stream = i;
1119  break;
1120  }
1121 
1122  if (min_ts.ts == AV_NOPTS_VALUE ||
1123  av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1124  min_stream = i;
1125  min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1126  }
1127  }
1128 
1129  if (min_stream >= 0) {
1130  SchMuxStream *ms = &mux->streams[min_stream];
1131 
1132  ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1133  av_assert0(ret >= 0);
1134 
1135  if (pkt) {
1136  if (!ms->init_eof)
1137  ret = tq_send(mux->queue, min_stream, pkt);
1138  av_packet_free(&pkt);
1139  if (ret == AVERROR_EOF)
1140  ms->init_eof = 1;
1141  else if (ret < 0)
1142  return ret;
1143  } else
1144  tq_send_finish(mux->queue, min_stream);
1145 
1146  continue;
1147  }
1148 
1149  break;
1150  }
1151 
1152  atomic_store(&mux->mux_started, 1);
1153 
1154  return 0;
1155 }
1156 
1157 int print_sdp(const char *filename);
1158 
1159 static int mux_init(Scheduler *sch, SchMux *mux)
1160 {
1161  int ret;
1162 
1163  ret = mux->init(mux->task.func_arg);
1164  if (ret < 0)
1165  return ret;
1166 
1167  sch->nb_mux_ready++;
1168 
1169  if (sch->sdp_filename || sch->sdp_auto) {
1170  if (sch->nb_mux_ready < sch->nb_mux)
1171  return 0;
1172 
1173  ret = print_sdp(sch->sdp_filename);
1174  if (ret < 0) {
1175  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1176  return ret;
1177  }
1178 
1179  /* SDP is written only after all the muxers are ready, so now we
1180  * start ALL the threads */
1181  for (unsigned i = 0; i < sch->nb_mux; i++) {
1182  ret = mux_task_start(&sch->mux[i]);
1183  if (ret < 0)
1184  return ret;
1185  }
1186  } else {
1187  ret = mux_task_start(mux);
1188  if (ret < 0)
1189  return ret;
1190  }
1191 
1192  return 0;
1193 }
1194 
1195 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1196  size_t data_threshold, int max_packets)
1197 {
1198  SchMux *mux;
1199  SchMuxStream *ms;
1200 
1201  av_assert0(mux_idx < sch->nb_mux);
1202  mux = &sch->mux[mux_idx];
1203 
1204  av_assert0(stream_idx < mux->nb_streams);
1205  ms = &mux->streams[stream_idx];
1206 
1207  ms->pre_mux_queue.max_packets = max_packets;
1208  ms->pre_mux_queue.data_threshold = data_threshold;
1209 }
1210 
1211 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1212 {
1213  SchMux *mux;
1214  int ret = 0;
1215 
1216  av_assert0(mux_idx < sch->nb_mux);
1217  mux = &sch->mux[mux_idx];
1218 
1219  av_assert0(stream_idx < mux->nb_streams);
1220 
1222 
1223  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1224 
1225  // this may be called during initialization - do not start
1226  // threads before sch_start() is called
1227  if (++mux->nb_streams_ready == mux->nb_streams &&
1228  sch->state >= SCH_STATE_STARTED)
1229  ret = mux_init(sch, mux);
1230 
1232 
1233  return ret;
1234 }
1235 
1236 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1237  unsigned dec_idx)
1238 {
1239  SchMux *mux;
1240  SchMuxStream *ms;
1241  int ret = 0;
1242 
1243  av_assert0(mux_idx < sch->nb_mux);
1244  mux = &sch->mux[mux_idx];
1245 
1246  av_assert0(stream_idx < mux->nb_streams);
1247  ms = &mux->streams[stream_idx];
1248 
1250  if (ret < 0)
1251  return ret;
1252 
1253  av_assert0(dec_idx < sch->nb_dec);
1254  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1255 
1256  if (!mux->sub_heartbeat_pkt) {
1258  if (!mux->sub_heartbeat_pkt)
1259  return AVERROR(ENOMEM);
1260  }
1261 
1262  return 0;
1263 }
1264 
1266 {
1267  while (1) {
1268  SchFilterGraph *fg;
1269  switch (src.type) {
1270  case SCH_NODE_TYPE_DEMUX:
1271  // fed directly by a demuxer (i.e. not through a filtergraph)
1272  sch->demux[src.idx].waiter.choked_next = 0;
1273  return;
1274  case SCH_NODE_TYPE_DEC:
1275  src = sch->dec[src.idx].src;
1276  continue;
1277  case SCH_NODE_TYPE_ENC:
1278  src = sch->enc[src.idx].src;
1279  continue;
1281  fg = &sch->filters[src.idx];
1282  // the filtergraph contains internal sources and
1283  // requested to be scheduled directly
1284  if (fg->best_input == fg->nb_inputs) {
1285  fg->waiter.choked_next = 0;
1286  return;
1287  }
1288  src = fg->inputs[fg->best_input].src;
1289  continue;
1290  default:
1291  av_unreachable("Invalid source node type?");
1292  return;
1293  }
1294  }
1295 }
1296 
1297 static void choke_demux(const Scheduler *sch, int demux_id, int choked)
1298 {
1299  av_assert1(demux_id < sch->nb_demux);
1300  SchDemux *demux = &sch->demux[demux_id];
1301 
1302  for (int i = 0; i < demux->nb_streams; i++) {
1303  SchedulerNode *dst = demux->streams[i].dst;
1304  SchFilterGraph *fg;
1305 
1306  switch (dst->type) {
1307  case SCH_NODE_TYPE_DEC:
1308  tq_choke(sch->dec[dst->idx].queue, choked);
1309  break;
1310  case SCH_NODE_TYPE_ENC:
1311  tq_choke(sch->enc[dst->idx].queue, choked);
1312  break;
1313  case SCH_NODE_TYPE_MUX:
1314  break;
1316  fg = &sch->filters[dst->idx];
1317  if (fg->nb_inputs == 1)
1318  tq_choke(fg->queue, choked);
1319  break;
1320  default:
1321  av_unreachable("Invalid destination node type?");
1322  break;
1323  }
1324  }
1325 }
1326 
1328 {
1329  int64_t dts;
1330  int have_unchoked = 0;
1331 
1332  // on termination request all waiters are choked,
1333  // we are not to unchoke them
1334  if (atomic_load(&sch->terminate))
1335  return;
1336 
1337  dts = trailing_dts(sch, 0);
1338 
1339  atomic_store(&sch->last_dts, dts);
1340 
1341  // initialize our internal state
1342  for (unsigned type = 0; type < 2; type++)
1343  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1344  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1345  w->choked_prev = atomic_load(&w->choked);
1346  w->choked_next = 1;
1347  }
1348 
1349  // figure out the sources that are allowed to proceed
1350  for (unsigned i = 0; i < sch->nb_mux; i++) {
1351  SchMux *mux = &sch->mux[i];
1352 
1353  for (unsigned j = 0; j < mux->nb_streams; j++) {
1354  SchMuxStream *ms = &mux->streams[j];
1355 
1356  // unblock sources for output streams that are not finished
1357  // and not too far ahead of the trailing stream
1358  if (ms->source_finished)
1359  continue;
1360  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1361  continue;
1362  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1363  continue;
1364 
1365  // resolve the source to unchoke
1366  unchoke_for_stream(sch, ms->src);
1367  have_unchoked = 1;
1368  }
1369  }
1370 
1371  // also unchoke any sources feeding into closed filter graph inputs, so
1372  // that they can observe the downstream EOF
1373  for (unsigned i = 0; i < sch->nb_filters; i++) {
1374  SchFilterGraph *fg = &sch->filters[i];
1375 
1376  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1377  SchFilterIn *fi = &fg->inputs[j];
1378  if (fi->receive_finished && !fi->send_finished)
1379  unchoke_for_stream(sch, fi->src);
1380  }
1381  }
1382 
1383  // make sure to unchoke at least one source, if still available
1384  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1385  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1386  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1387  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1388  if (!exited) {
1389  w->choked_next = 0;
1390  have_unchoked = 1;
1391  break;
1392  }
1393  }
1394 
1395  for (unsigned type = 0; type < 2; type++) {
1396  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1397  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1398  if (w->choked_prev != w->choked_next) {
1399  waiter_set(w, w->choked_next);
1400  if (!type)
1401  choke_demux(sch, i, w->choked_next);
1402  }
1403  }
1404  }
1405 
1406 }
1407 
1408 enum {
1412 };
1413 
1414 // Finds the filtergraph or muxer upstream of a scheduler node
1416 {
1417  while (1) {
1418  switch (src.type) {
1419  case SCH_NODE_TYPE_DEMUX:
1421  return src;
1422  case SCH_NODE_TYPE_DEC:
1423  src = sch->dec[src.idx].src;
1424  continue;
1425  case SCH_NODE_TYPE_ENC:
1426  src = sch->enc[src.idx].src;
1427  continue;
1428  default:
1429  av_unreachable("Invalid source node type?");
1430  return (SchedulerNode) {0};
1431  }
1432  }
1433 }
1434 
1435 static int
1437  uint8_t *filters_visited, SchedulerNode *filters_stack)
1438 {
1439  unsigned nb_filters_stack = 0;
1440 
1441  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1442 
1443  while (1) {
1444  const SchFilterGraph *fg = &sch->filters[src.idx];
1445 
1446  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1447 
1448  // descend into every input, depth first
1449  if (src.idx_stream < fg->nb_inputs) {
1450  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1451  SchedulerNode node = src_filtergraph(sch, fi->src);
1452 
1453  // connected to demuxer, no cycles possible
1454  if (node.type == SCH_NODE_TYPE_DEMUX)
1455  continue;
1456 
1457  // otherwise connected to another filtergraph
1459 
1460  // found a cycle
1461  if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
1462  return AVERROR(EINVAL);
1463 
1464  // place current position on stack and descend
1465  av_assert0(nb_filters_stack < sch->nb_filters);
1466  filters_stack[nb_filters_stack++] = src;
1467  src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
1468  continue;
1469  }
1470 
1471  filters_visited[src.idx] = CYCLE_NODE_DONE;
1472 
1473  // previous search finished,
1474  if (nb_filters_stack) {
1475  src = filters_stack[--nb_filters_stack];
1476  continue;
1477  }
1478  return 0;
1479  }
1480 }
1481 
1482 static int check_acyclic(Scheduler *sch)
1483 {
1484  uint8_t *filters_visited = NULL;
1485  SchedulerNode *filters_stack = NULL;
1486 
1487  int ret = 0;
1488 
1489  if (!sch->nb_filters)
1490  return 0;
1491 
1492  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1493  if (!filters_visited)
1494  return AVERROR(ENOMEM);
1495 
1496  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1497  if (!filters_stack) {
1498  ret = AVERROR(ENOMEM);
1499  goto fail;
1500  }
1501 
1502  // trace the transcoding graph upstream from every filtegraph
1503  for (unsigned i = 0; i < sch->nb_filters; i++) {
1504  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1505  filters_visited, filters_stack);
1506  if (ret < 0) {
1507  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1508  goto fail;
1509  }
1510  }
1511 
1512 fail:
1513  av_freep(&filters_visited);
1514  av_freep(&filters_stack);
1515  return ret;
1516 }
1517 
1518 static int start_prepare(Scheduler *sch)
1519 {
1520  int ret;
1521 
1522  for (unsigned i = 0; i < sch->nb_demux; i++) {
1523  SchDemux *d = &sch->demux[i];
1524 
1525  for (unsigned j = 0; j < d->nb_streams; j++) {
1526  SchDemuxStream *ds = &d->streams[j];
1527 
1528  if (!ds->nb_dst) {
1529  av_log(d, AV_LOG_ERROR,
1530  "Demuxer stream %u not connected to any sink\n", j);
1531  return AVERROR(EINVAL);
1532  }
1533 
1534  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1535  if (!ds->dst_finished)
1536  return AVERROR(ENOMEM);
1537  }
1538  }
1539 
1540  for (unsigned i = 0; i < sch->nb_dec; i++) {
1541  SchDec *dec = &sch->dec[i];
1542 
1543  if (!dec->src.type) {
1544  av_log(dec, AV_LOG_ERROR,
1545  "Decoder not connected to a source\n");
1546  return AVERROR(EINVAL);
1547  }
1548 
1549  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1550  SchDecOutput *o = &dec->outputs[j];
1551 
1552  if (!o->nb_dst) {
1553  av_log(dec, AV_LOG_ERROR,
1554  "Decoder output %u not connected to any sink\n", j);
1555  return AVERROR(EINVAL);
1556  }
1557 
1558  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1559  if (!o->dst_finished)
1560  return AVERROR(ENOMEM);
1561  }
1562  }
1563 
1564  for (unsigned i = 0; i < sch->nb_enc; i++) {
1565  SchEnc *enc = &sch->enc[i];
1566 
1567  if (!enc->src.type) {
1568  av_log(enc, AV_LOG_ERROR,
1569  "Encoder not connected to a source\n");
1570  return AVERROR(EINVAL);
1571  }
1572  if (!enc->nb_dst) {
1573  av_log(enc, AV_LOG_ERROR,
1574  "Encoder not connected to any sink\n");
1575  return AVERROR(EINVAL);
1576  }
1577 
1578  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1579  if (!enc->dst_finished)
1580  return AVERROR(ENOMEM);
1581  }
1582 
1583  for (unsigned i = 0; i < sch->nb_mux; i++) {
1584  SchMux *mux = &sch->mux[i];
1585 
1586  for (unsigned j = 0; j < mux->nb_streams; j++) {
1587  SchMuxStream *ms = &mux->streams[j];
1588 
1589  if (!ms->src.type) {
1590  av_log(mux, AV_LOG_ERROR,
1591  "Muxer stream #%u not connected to a source\n", j);
1592  return AVERROR(EINVAL);
1593  }
1594  }
1595 
1596  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1597  QUEUE_PACKETS);
1598  if (ret < 0)
1599  return ret;
1600  }
1601 
1602  for (unsigned i = 0; i < sch->nb_filters; i++) {
1603  SchFilterGraph *fg = &sch->filters[i];
1604 
1605  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1606  SchFilterIn *fi = &fg->inputs[j];
1607 
1608  if (!fi->src.type) {
1609  av_log(fg, AV_LOG_ERROR,
1610  "Filtergraph input %u not connected to a source\n", j);
1611  return AVERROR(EINVAL);
1612  }
1613  }
1614 
1615  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1616  SchFilterOut *fo = &fg->outputs[j];
1617 
1618  if (!fo->dst.type) {
1619  av_log(fg, AV_LOG_ERROR,
1620  "Filtergraph %u output %u not connected to a sink\n", i, j);
1621  return AVERROR(EINVAL);
1622  }
1623  }
1624  }
1625 
1626  // Check that the transcoding graph has no cycles.
1627  ret = check_acyclic(sch);
1628  if (ret < 0)
1629  return ret;
1630 
1631  return 0;
1632 }
1633 
1635 {
1636  int ret;
1637 
1638  ret = start_prepare(sch);
1639  if (ret < 0)
1640  return ret;
1641 
1643  sch->state = SCH_STATE_STARTED;
1644 
1645  for (unsigned i = 0; i < sch->nb_mux; i++) {
1646  SchMux *mux = &sch->mux[i];
1647 
1648  if (mux->nb_streams_ready == mux->nb_streams) {
1649  ret = mux_init(sch, mux);
1650  if (ret < 0)
1651  goto fail;
1652  }
1653  }
1654 
1655  for (unsigned i = 0; i < sch->nb_enc; i++) {
1656  SchEnc *enc = &sch->enc[i];
1657 
1658  ret = task_start(&enc->task);
1659  if (ret < 0)
1660  goto fail;
1661  }
1662 
1663  for (unsigned i = 0; i < sch->nb_filters; i++) {
1664  SchFilterGraph *fg = &sch->filters[i];
1665 
1666  ret = task_start(&fg->task);
1667  if (ret < 0)
1668  goto fail;
1669  }
1670 
1671  for (unsigned i = 0; i < sch->nb_dec; i++) {
1672  SchDec *dec = &sch->dec[i];
1673 
1674  ret = task_start(&dec->task);
1675  if (ret < 0)
1676  goto fail;
1677  }
1678 
1679  for (unsigned i = 0; i < sch->nb_demux; i++) {
1680  SchDemux *d = &sch->demux[i];
1681 
1682  if (!d->nb_streams)
1683  continue;
1684 
1685  ret = task_start(&d->task);
1686  if (ret < 0)
1687  goto fail;
1688  }
1689 
1693 
1694  return 0;
1695 fail:
1696  sch_stop(sch, NULL);
1697  return ret;
1698 }
1699 
1700 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1701 {
1702  int ret;
1703 
1704  // convert delay to absolute timestamp
1705  timeout_us += av_gettime();
1706 
1708 
1709  if (sch->nb_mux_done < sch->nb_mux) {
1710  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1711  .tv_nsec = (timeout_us % 1000000) * 1000 };
1712  pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1713  }
1714 
1715  // abort transcoding if any task failed
1716  ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1717 
1719 
1720  *transcode_ts = atomic_load(&sch->last_dts);
1721 
1722  return ret;
1723 }
1724 
1725 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1726 {
1727  int ret;
1728 
1729  ret = enc->open_cb(enc->task.func_arg, frame);
1730  if (ret < 0)
1731  return ret;
1732 
1733  // ret>0 signals audio frame size, which means sync queue must
1734  // have been enabled during encoder creation
1735  if (ret > 0) {
1736  SchSyncQueue *sq;
1737 
1738  av_assert0(enc->sq_idx[0] >= 0);
1739  sq = &sch->sq_enc[enc->sq_idx[0]];
1740 
1741  pthread_mutex_lock(&sq->lock);
1742 
1743  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1744 
1745  pthread_mutex_unlock(&sq->lock);
1746  }
1747 
1748  return 0;
1749 }
1750 
1752 {
1753  int ret;
1754 
1755  if (!frame) {
1756  tq_send_finish(enc->queue, 0);
1757  return 0;
1758  }
1759 
1760  if (enc->in_finished)
1761  return AVERROR_EOF;
1762 
1763  ret = tq_send(enc->queue, 0, frame);
1764  if (ret < 0)
1765  enc->in_finished = 1;
1766 
1767  return ret;
1768 }
1769 
1770 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1771 {
1772  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1773  int ret = 0;
1774 
1775  // inform the scheduling code that no more input will arrive along this path;
1776  // this is necessary because the sync queue may not send an EOF downstream
1777  // until other streams finish
1778  // TODO: consider a cleaner way of passing this information through
1779  // the pipeline
1780  if (!frame) {
1781  for (unsigned i = 0; i < enc->nb_dst; i++) {
1782  SchMux *mux;
1783  SchMuxStream *ms;
1784 
1785  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1786  continue;
1787 
1788  mux = &sch->mux[enc->dst[i].idx];
1789  ms = &mux->streams[enc->dst[i].idx_stream];
1790 
1792 
1793  ms->source_finished = 1;
1795 
1797  }
1798  }
1799 
1800  pthread_mutex_lock(&sq->lock);
1801 
1802  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1803  if (ret < 0)
1804  goto finish;
1805 
1806  while (1) {
1807  SchEnc *enc;
1808 
1809  // TODO: the SQ API should be extended to allow returning EOF
1810  // for individual streams
1811  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1812  if (ret < 0) {
1813  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1814  break;
1815  }
1816 
1817  enc = &sch->enc[sq->enc_idx[ret]];
1818  ret = send_to_enc_thread(sch, enc, sq->frame);
1819  if (ret < 0) {
1820  av_frame_unref(sq->frame);
1821  if (ret != AVERROR_EOF)
1822  break;
1823 
1824  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1825  continue;
1826  }
1827  }
1828 
1829  if (ret < 0) {
1830  // close all encoders fed from this sync queue
1831  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1832  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1833 
1834  // if the sync queue error is EOF and closing the encoder
1835  // produces a more serious error, make sure to pick the latter
1836  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1837  }
1838  }
1839 
1840 finish:
1841  pthread_mutex_unlock(&sq->lock);
1842 
1843  return ret;
1844 }
1845 
1846 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1847 {
1848  if (enc->open_cb && frame && !enc->opened) {
1849  int ret = enc_open(sch, enc, frame);
1850  if (ret < 0)
1851  return ret;
1852  enc->opened = 1;
1853 
1854  // discard empty frames that only carry encoder init parameters
1855  if (!frame->buf[0]) {
1857  return 0;
1858  }
1859  }
1860 
1861  return (enc->sq_idx[0] >= 0) ?
1862  send_to_enc_sq (sch, enc, frame) :
1863  send_to_enc_thread(sch, enc, frame);
1864 }
1865 
1867 {
1868  PreMuxQueue *q = &ms->pre_mux_queue;
1869  AVPacket *tmp_pkt = NULL;
1870  int ret;
1871 
1872  if (!av_fifo_can_write(q->fifo)) {
1873  size_t packets = av_fifo_can_read(q->fifo);
1874  size_t pkt_size = pkt ? pkt->size : 0;
1875  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1876  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1877  size_t new_size = FFMIN(2 * packets, max_packets);
1878 
1879  if (new_size <= packets) {
1880  av_log(mux, AV_LOG_ERROR,
1881  "Too many packets buffered for output stream.\n");
1882  return AVERROR_BUFFER_TOO_SMALL;
1883  }
1884  ret = av_fifo_grow2(q->fifo, new_size - packets);
1885  if (ret < 0)
1886  return ret;
1887  }
1888 
1889  if (pkt) {
1890  tmp_pkt = av_packet_alloc();
1891  if (!tmp_pkt)
1892  return AVERROR(ENOMEM);
1893 
1894  av_packet_move_ref(tmp_pkt, pkt);
1895  q->data_size += tmp_pkt->size;
1896  }
1897  av_fifo_write(q->fifo, &tmp_pkt, 1);
1898 
1899  return 0;
1900 }
1901 
1902 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1903  AVPacket *pkt)
1904 {
1905  SchMuxStream *ms = &mux->streams[stream_idx];
1906  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1909 
1910  // queue the packet if the muxer cannot be started yet
1911  if (!atomic_load(&mux->mux_started)) {
1912  int queued = 0;
1913 
1914  // the muxer could have started between the above atomic check and
1915  // locking the mutex, then this block falls through to normal send path
1917 
1918  if (!atomic_load(&mux->mux_started)) {
1919  int ret = mux_queue_packet(mux, ms, pkt);
1920  queued = ret < 0 ? ret : 1;
1921  }
1922 
1924 
1925  if (queued < 0)
1926  return queued;
1927  else if (queued)
1928  goto update_schedule;
1929  }
1930 
1931  if (pkt) {
1932  int ret;
1933 
1934  if (ms->init_eof)
1935  return AVERROR_EOF;
1936 
1937  ret = tq_send(mux->queue, stream_idx, pkt);
1938  if (ret < 0)
1939  return ret;
1940  } else
1941  tq_send_finish(mux->queue, stream_idx);
1942 
1943 update_schedule:
1944  // TODO: use atomics to check whether this changes trailing dts
1945  // to avoid locking unnecessarily
1946  if (dts != AV_NOPTS_VALUE || !pkt) {
1948 
1949  if (pkt) ms->last_dts = dts;
1950  else ms->source_finished = 1;
1951 
1953 
1955  }
1956 
1957  return 0;
1958 }
1959 
1960 static int
1962  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1963 {
1964  int ret;
1965 
1966  if (*dst_finished)
1967  return AVERROR_EOF;
1968 
1969  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1972  pkt = NULL;
1973  }
1974 
1975  if (!pkt)
1976  goto finish;
1977 
1978  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1979  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1980  tq_send(sch->dec[dst.idx].queue, 0, pkt);
1981  if (ret == AVERROR_EOF)
1982  goto finish;
1983 
1984  return ret;
1985 
1986 finish:
1987  if (dst.type == SCH_NODE_TYPE_MUX)
1988  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1989  else
1990  tq_send_finish(sch->dec[dst.idx].queue, 0);
1991 
1992  *dst_finished = 1;
1993  return AVERROR_EOF;
1994 }
1995 
1997  AVPacket *pkt, unsigned flags)
1998 {
1999  unsigned nb_done = 0;
2000 
2001  for (unsigned i = 0; i < ds->nb_dst; i++) {
2002  AVPacket *to_send = pkt;
2003  uint8_t *finished = &ds->dst_finished[i];
2004 
2005  int ret;
2006 
2007  // sending a packet consumes it, so make a temporary reference if needed
2008  if (pkt && i < ds->nb_dst - 1) {
2009  to_send = d->send_pkt;
2010 
2011  ret = av_packet_ref(to_send, pkt);
2012  if (ret < 0)
2013  return ret;
2014  }
2015 
2016  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
2017  if (to_send)
2018  av_packet_unref(to_send);
2019  if (ret == AVERROR_EOF)
2020  nb_done++;
2021  else if (ret < 0)
2022  return ret;
2023  }
2024 
2025  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
2026 }
2027 
2029 {
2030  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
2031 
2032  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2033 
2034  for (unsigned i = 0; i < d->nb_streams; i++) {
2035  SchDemuxStream *ds = &d->streams[i];
2036 
2037  for (unsigned j = 0; j < ds->nb_dst; j++) {
2038  const SchedulerNode *dst = &ds->dst[j];
2039  SchDec *dec;
2040  int ret;
2041 
2042  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2043  continue;
2044 
2045  dec = &sch->dec[dst->idx];
2046 
2047  ret = tq_send(dec->queue, 0, pkt);
2048  if (ret < 0)
2049  return ret;
2050 
2051  if (dec->queue_end_ts) {
2052  Timestamp ts;
2054  if (ret < 0)
2055  return ret;
2056 
2057  if (max_end_ts.ts == AV_NOPTS_VALUE ||
2058  (ts.ts != AV_NOPTS_VALUE &&
2059  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2060  max_end_ts = ts;
2061 
2062  }
2063  }
2064  }
2065 
2066  pkt->pts = max_end_ts.ts;
2067  pkt->time_base = max_end_ts.tb;
2068 
2069  return 0;
2070 }
2071 
2072 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2073  unsigned flags)
2074 {
2075  SchDemux *d;
2076  int terminate;
2077 
2078  av_assert0(demux_idx < sch->nb_demux);
2079  d = &sch->demux[demux_idx];
2080 
2081  terminate = waiter_wait(sch, &d->waiter);
2082  if (terminate)
2083  return AVERROR_EXIT;
2084 
2085  // flush the downstreams after seek
2086  if (pkt->stream_index == -1)
2087  return demux_flush(sch, d, pkt);
2088 
2090 
2091  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2092 }
2093 
2094 static int demux_done(Scheduler *sch, unsigned demux_idx)
2095 {
2096  SchDemux *d = &sch->demux[demux_idx];
2097  int ret = 0;
2098 
2099  for (unsigned i = 0; i < d->nb_streams; i++) {
2100  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2101  if (err != AVERROR_EOF)
2102  ret = err_merge(ret, err);
2103  }
2104 
2106 
2107  d->task_exited = 1;
2108 
2110 
2112 
2113  return ret;
2114 }
2115 
2116 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2117 {
2118  SchMux *mux;
2119  int ret, stream_idx;
2120 
2121  av_assert0(mux_idx < sch->nb_mux);
2122  mux = &sch->mux[mux_idx];
2123 
2124  ret = tq_receive(mux->queue, &stream_idx, pkt);
2125  pkt->stream_index = stream_idx;
2126  return ret;
2127 }
2128 
2129 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2130 {
2131  SchMux *mux;
2132 
2133  av_assert0(mux_idx < sch->nb_mux);
2134  mux = &sch->mux[mux_idx];
2135 
2136  av_assert0(stream_idx < mux->nb_streams);
2137  tq_receive_finish(mux->queue, stream_idx);
2138 
2140  mux->streams[stream_idx].source_finished = 1;
2141 
2143 
2145 }
2146 
2147 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2148  const AVPacket *pkt)
2149 {
2150  SchMux *mux;
2151  SchMuxStream *ms;
2152 
2153  av_assert0(mux_idx < sch->nb_mux);
2154  mux = &sch->mux[mux_idx];
2155 
2156  av_assert0(stream_idx < mux->nb_streams);
2157  ms = &mux->streams[stream_idx];
2158 
2159  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2160  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2161  int ret;
2162 
2164  if (ret < 0)
2165  return ret;
2166 
2167  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2168  }
2169 
2170  return 0;
2171 }
2172 
2173 static int mux_done(Scheduler *sch, unsigned mux_idx)
2174 {
2175  SchMux *mux = &sch->mux[mux_idx];
2176 
2178 
2179  for (unsigned i = 0; i < mux->nb_streams; i++) {
2180  tq_receive_finish(mux->queue, i);
2181  mux->streams[i].source_finished = 1;
2182  }
2183 
2185 
2187 
2189 
2190  av_assert0(sch->nb_mux_done < sch->nb_mux);
2191  sch->nb_mux_done++;
2192 
2194 
2196 
2197  return 0;
2198 }
2199 
2200 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2201 {
2202  SchDec *dec;
2203  int ret, dummy;
2204 
2205  av_assert0(dec_idx < sch->nb_dec);
2206  dec = &sch->dec[dec_idx];
2207 
2208  // the decoder should have given us post-flush end timestamp in pkt
2209  if (dec->expect_end_ts) {
2210  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2212  if (ret < 0)
2213  return ret;
2214 
2215  dec->expect_end_ts = 0;
2216  }
2217 
2218  ret = tq_receive(dec->queue, &dummy, pkt);
2219  av_assert0(dummy <= 0);
2220 
2221  // got a flush packet, on the next call to this function the decoder
2222  // will give us post-flush end timestamp
2223  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2224  dec->expect_end_ts = 1;
2225 
2226  return ret;
2227 }
2228 
2230  unsigned in_idx, AVFrame *frame)
2231 {
2232  if (frame)
2233  return tq_send(fg->queue, in_idx, frame);
2234 
2235  if (!fg->inputs[in_idx].send_finished) {
2236  fg->inputs[in_idx].send_finished = 1;
2237  tq_send_finish(fg->queue, in_idx);
2238 
2239  // close the control stream when all actual inputs are done
2240  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2241  tq_send_finish(fg->queue, fg->nb_inputs);
2242  }
2243  return 0;
2244 }
2245 
2247  uint8_t *dst_finished, AVFrame *frame)
2248 {
2249  int ret;
2250 
2251  if (*dst_finished)
2252  return AVERROR_EOF;
2253 
2254  if (!frame)
2255  goto finish;
2256 
2257  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2258  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2259  send_to_enc(sch, &sch->enc[dst.idx], frame);
2260  if (ret == AVERROR_EOF)
2261  goto finish;
2262 
2263  return ret;
2264 
2265 finish:
2266  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2267  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2268  else
2269  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2270 
2271  *dst_finished = 1;
2272 
2273  return AVERROR_EOF;
2274 }
2275 
2276 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2277  unsigned out_idx, AVFrame *frame)
2278 {
2279  SchDec *dec;
2280  SchDecOutput *o;
2281  int ret;
2282  unsigned nb_done = 0;
2283 
2284  av_assert0(dec_idx < sch->nb_dec);
2285  dec = &sch->dec[dec_idx];
2286 
2287  av_assert0(out_idx < dec->nb_outputs);
2288  o = &dec->outputs[out_idx];
2289 
2290  for (unsigned i = 0; i < o->nb_dst; i++) {
2291  uint8_t *finished = &o->dst_finished[i];
2292  AVFrame *to_send = frame;
2293 
2294  // sending a frame consumes it, so make a temporary reference if needed
2295  if (i < o->nb_dst - 1) {
2296  to_send = dec->send_frame;
2297 
2298  // frame may sometimes contain props only,
2299  // e.g. to signal EOF timestamp
2300  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2301  av_frame_copy_props(to_send, frame);
2302  if (ret < 0)
2303  return ret;
2304  }
2305 
2306  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2307  if (ret < 0) {
2308  av_frame_unref(to_send);
2309  if (ret == AVERROR_EOF) {
2310  nb_done++;
2311  continue;
2312  }
2313  return ret;
2314  }
2315  }
2316 
2317  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2318 }
2319 
2320 static int dec_done(Scheduler *sch, unsigned dec_idx)
2321 {
2322  SchDec *dec = &sch->dec[dec_idx];
2323  int ret = 0;
2324 
2325  tq_receive_finish(dec->queue, 0);
2326 
2327  // make sure our source does not get stuck waiting for end timestamps
2328  // that will never arrive
2329  if (dec->queue_end_ts)
2331 
2332  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2333  SchDecOutput *o = &dec->outputs[i];
2334 
2335  for (unsigned j = 0; j < o->nb_dst; j++) {
2336  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2337  if (err < 0 && err != AVERROR_EOF)
2338  ret = err_merge(ret, err);
2339  }
2340  }
2341 
2342  return ret;
2343 }
2344 
2345 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2346 {
2347  SchEnc *enc;
2348  int ret, dummy;
2349 
2350  av_assert0(enc_idx < sch->nb_enc);
2351  enc = &sch->enc[enc_idx];
2352 
2353  ret = tq_receive(enc->queue, &dummy, frame);
2354  av_assert0(dummy <= 0);
2355 
2356  return ret;
2357 }
2358 
2360  uint8_t *dst_finished, AVPacket *pkt)
2361 {
2362  int ret;
2363 
2364  if (*dst_finished)
2365  return AVERROR_EOF;
2366 
2367  if (!pkt)
2368  goto finish;
2369 
2370  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2371  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2372  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2373  if (ret == AVERROR_EOF)
2374  goto finish;
2375 
2376  return ret;
2377 
2378 finish:
2379  if (dst.type == SCH_NODE_TYPE_MUX)
2380  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2381  else
2382  tq_send_finish(sch->dec[dst.idx].queue, 0);
2383 
2384  *dst_finished = 1;
2385 
2386  return AVERROR_EOF;
2387 }
2388 
2389 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2390 {
2391  SchEnc *enc;
2392  int ret;
2393 
2394  av_assert0(enc_idx < sch->nb_enc);
2395  enc = &sch->enc[enc_idx];
2396 
2397  for (unsigned i = 0; i < enc->nb_dst; i++) {
2398  uint8_t *finished = &enc->dst_finished[i];
2399  AVPacket *to_send = pkt;
2400 
2401  // sending a packet consumes it, so make a temporary reference if needed
2402  if (i < enc->nb_dst - 1) {
2403  to_send = enc->send_pkt;
2404 
2405  ret = av_packet_ref(to_send, pkt);
2406  if (ret < 0)
2407  return ret;
2408  }
2409 
2410  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2411  if (ret < 0) {
2412  av_packet_unref(to_send);
2413  if (ret == AVERROR_EOF)
2414  continue;
2415  return ret;
2416  }
2417  }
2418 
2419  return 0;
2420 }
2421 
2422 static int enc_done(Scheduler *sch, unsigned enc_idx)
2423 {
2424  SchEnc *enc = &sch->enc[enc_idx];
2425  int ret = 0;
2426 
2427  tq_receive_finish(enc->queue, 0);
2428 
2429  for (unsigned i = 0; i < enc->nb_dst; i++) {
2430  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2431  if (err < 0 && err != AVERROR_EOF)
2432  ret = err_merge(ret, err);
2433  }
2434 
2435  return ret;
2436 }
2437 
2438 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2439  unsigned *in_idx, AVFrame *frame)
2440 {
2441  SchFilterGraph *fg;
2442 
2443  av_assert0(fg_idx < sch->nb_filters);
2444  fg = &sch->filters[fg_idx];
2445 
2446  av_assert0(*in_idx <= fg->nb_inputs);
2447 
2448  // update scheduling to account for desired input stream, if it changed
2449  //
2450  // this check needs no locking because only the filtering thread
2451  // updates this value
2452  if (*in_idx != fg->best_input) {
2454 
2455  fg->best_input = *in_idx;
2457 
2459  }
2460 
2461  if (*in_idx == fg->nb_inputs) {
2462  int terminate = waiter_wait(sch, &fg->waiter);
2463  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2464  }
2465 
2466  while (1) {
2467  int ret, idx;
2468 
2469  ret = tq_receive(fg->queue, &idx, frame);
2470  if (idx < 0)
2471  return AVERROR_EOF;
2472  else if (ret >= 0) {
2473  *in_idx = idx;
2474  return 0;
2475  }
2476 
2477  // disregard EOFs for specific streams - they should always be
2478  // preceded by an EOF frame
2479  }
2480 }
2481 
2482 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2483 {
2484  SchFilterGraph *fg;
2485  SchFilterIn *fi;
2486 
2487  av_assert0(fg_idx < sch->nb_filters);
2488  fg = &sch->filters[fg_idx];
2489 
2490  av_assert0(in_idx < fg->nb_inputs);
2491  fi = &fg->inputs[in_idx];
2492 
2494 
2495  if (!fi->receive_finished) {
2496  fi->receive_finished = 1;
2497  tq_receive_finish(fg->queue, in_idx);
2498 
2499  // close the control stream when all actual inputs are done
2500  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2501  tq_receive_finish(fg->queue, fg->nb_inputs);
2502 
2504  }
2505 
2507 }
2508 
2509 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2510 {
2511  SchFilterGraph *fg;
2513  int ret;
2514 
2515  av_assert0(fg_idx < sch->nb_filters);
2516  fg = &sch->filters[fg_idx];
2517 
2518  av_assert0(out_idx < fg->nb_outputs);
2519  dst = fg->outputs[out_idx].dst;
2520 
2521  if (dst.type == SCH_NODE_TYPE_ENC) {
2522  ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
2523  if (ret == AVERROR_EOF)
2524  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2525  } else {
2526  ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2527  if (ret == AVERROR_EOF)
2528  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2529  }
2530  return ret;
2531 }
2532 
2533 static int filter_done(Scheduler *sch, unsigned fg_idx)
2534 {
2535  SchFilterGraph *fg = &sch->filters[fg_idx];
2536  int ret = 0;
2537 
2538  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2539  tq_receive_finish(fg->queue, i);
2540 
2541  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2542  SchedulerNode dst = fg->outputs[i].dst;
2543  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2544  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2545  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2546 
2547  if (err < 0 && err != AVERROR_EOF)
2548  ret = err_merge(ret, err);
2549  }
2550 
2552 
2553  fg->task_exited = 1;
2554 
2556 
2558 
2559  return ret;
2560 }
2561 
2562 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2563 {
2564  SchFilterGraph *fg;
2565 
2566  av_assert0(fg_idx < sch->nb_filters);
2567  fg = &sch->filters[fg_idx];
2568 
2569  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2570 }
2571 
2572 void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
2573 {
2574  SchFilterGraph *fg;
2575  av_assert0(fg_idx < sch->nb_filters);
2576  fg = &sch->filters[fg_idx];
2577 
2579  fg->best_input = fg->nb_inputs;
2582 }
2583 
2584 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2585 {
2586  switch (node.type) {
2587  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2588  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2589  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2590  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2591  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2592  default: av_assert0(0);
2593  }
2594 }
2595 
2596 static void *task_wrapper(void *arg)
2597 {
2598  SchTask *task = arg;
2599  Scheduler *sch = task->parent;
2600  int ret;
2601  int err = 0;
2602 
2603  ret = task->func(task->func_arg);
2604  if (ret < 0)
2605  av_log(task->func_arg, AV_LOG_ERROR,
2606  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2607 
2608  err = task_cleanup(sch, task->node);
2609  ret = err_merge(ret, err);
2610 
2611  // EOF is considered normal termination
2612  if (ret == AVERROR_EOF)
2613  ret = 0;
2614  if (ret < 0) {
2616  sch->task_failed = 1;
2619  }
2620 
2622  "Terminating thread with return code %d (%s)\n", ret,
2623  ret < 0 ? av_err2str(ret) : "success");
2624 
2625  return (void*)(intptr_t)ret;
2626 }
2627 
2628 static int task_stop(Scheduler *sch, SchTask *task)
2629 {
2630  int ret;
2631  void *thread_ret;
2632 
2633  if (!task->thread_running)
2634  return task_cleanup(sch, task->node);
2635 
2636  ret = pthread_join(task->thread, &thread_ret);
2637  av_assert0(ret == 0);
2638 
2639  task->thread_running = 0;
2640 
2641  return (intptr_t)thread_ret;
2642 }
2643 
2644 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2645 {
2646  int ret = 0, err;
2647 
2648  if (sch->state != SCH_STATE_STARTED)
2649  return 0;
2650 
2651  atomic_store(&sch->terminate, 1);
2652 
2653  for (unsigned type = 0; type < 2; type++)
2654  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2655  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2656  waiter_set(w, 1);
2657  if (type)
2658  choke_demux(sch, i, 0); // unfreeze to allow draining
2659  }
2660 
2661  for (unsigned i = 0; i < sch->nb_demux; i++) {
2662  SchDemux *d = &sch->demux[i];
2663 
2664  err = task_stop(sch, &d->task);
2665  ret = err_merge(ret, err);
2666  }
2667 
2668  for (unsigned i = 0; i < sch->nb_dec; i++) {
2669  SchDec *dec = &sch->dec[i];
2670 
2671  err = task_stop(sch, &dec->task);
2672  ret = err_merge(ret, err);
2673  }
2674 
2675  for (unsigned i = 0; i < sch->nb_filters; i++) {
2676  SchFilterGraph *fg = &sch->filters[i];
2677 
2678  err = task_stop(sch, &fg->task);
2679  ret = err_merge(ret, err);
2680  }
2681 
2682  for (unsigned i = 0; i < sch->nb_enc; i++) {
2683  SchEnc *enc = &sch->enc[i];
2684 
2685  err = task_stop(sch, &enc->task);
2686  ret = err_merge(ret, err);
2687  }
2688 
2689  for (unsigned i = 0; i < sch->nb_mux; i++) {
2690  SchMux *mux = &sch->mux[i];
2691 
2692  err = task_stop(sch, &mux->task);
2693  ret = err_merge(ret, err);
2694  }
2695 
2696  if (finish_ts)
2697  *finish_ts = trailing_dts(sch, 1);
2698 
2699  sch->state = SCH_STATE_STOPPED;
2700 
2701  return ret;
2702 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:297
flags
const SwsFlags flags[]
Definition: swscale.c:61
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:68
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:433
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1095
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:76
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:349
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
Scheduler::finish_lock
pthread_mutex_t finish_lock
Definition: ffmpeg_sched.c:287
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2509
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:88
THREAD_QUEUE_PACKETS
@ THREAD_QUEUE_PACKETS
Definition: thread_queue.h:26
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:294
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:285
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:267
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2129
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:101
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:137
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:2028
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:298
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:232
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:628
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:147
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:678
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:185
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:263
int64_t
long long int64_t
Definition: coverity.c:34
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1410
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2173
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:295
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:64
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
av_fifo_peek
int av_fifo_peek(const AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1436
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:427
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2584
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:558
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:217
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:160
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1902
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:282
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, enum ThreadQueueType type)
Allocate a queue for sending data between threads.
Definition: thread_queue.c:72
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2422
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:226
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:576
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:212
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:274
SchFilterOut
Definition: ffmpeg_sched.c:241
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:268
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:176
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:206
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:75
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:260
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:311
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:166
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1211
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1751
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2628
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:238
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2644
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:119
fail
#define fail()
Definition: checkasm.h:205
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:242
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:85
SchEnc
Definition: ffmpeg_sched.c:109
dummy
int dummy
Definition: motion.c:66
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:90
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:646
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:213
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:250
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:270
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:586
AVERROR_BUFFER_TOO_SMALL
#define AVERROR_BUFFER_TOO_SMALL
Buffer too small.
Definition: error.h:53
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:292
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:819
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:52
SchFilterGraph
Definition: ffmpeg_sched.c:245
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:60
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:210
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:457
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:306
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:215
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:190
SchDecOutput
Definition: ffmpeg_sched.c:74
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:622
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:339
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:254
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:94
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1725
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:575
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2246
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:423
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:194
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:113
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:705
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:191
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:41
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2276
ctx
AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:340
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:208
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2533
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:246
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2345
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:609
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:110
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1996
arg
const char * arg
Definition: jacosubdec.c:67
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:196
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:598
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:228
Scheduler::finish_cond
pthread_cond_t finish_cond
Definition: ffmpeg_sched.c:288
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:276
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:541
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:54
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:366
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:76
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:303
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1770
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:137
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:599
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:249
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:279
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1327
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:243
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:781
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:105
av_unreachable
#define av_unreachable(msg)
Asserts that are used as compiler optimization hints depending upon ASSERT_LEVEL and NDEBUG.
Definition: avassert.h:108
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:269
choke_demux
static void choke_demux(const Scheduler *sch, int demux_id, int choked)
Definition: ffmpeg_sched.c:1297
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2320
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:259
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:157
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:114
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:738
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1409
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations send normally to other types.
Definition: ffmpeg_sched.h:338
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:813
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:441
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:490
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2389
error.h
Scheduler
Definition: ffmpeg_sched.c:273
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:216
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:103
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:193
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:81
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:640
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:144
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:162
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:153
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:251
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:117
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:368
AVPacket::size
int size
Definition: packet.h:559
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:106
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:264
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:278
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:87
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:180
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:256
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
sch_filter_choke_inputs
void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
Called by filtergraph tasks to choke all filter inputs, preventing them from receiving more frames un...
Definition: ffmpeg_sched.c:2572
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:333
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:187
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:671
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:247
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:732
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:248
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:309
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:557
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2359
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1411
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:616
SchFilterIn
Definition: ffmpeg_sched.c:235
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2438
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:280
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:64
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:138
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:570
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:277
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:396
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:152
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:169
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:404
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:300
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:551
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2094
packet.h
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:263
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:32
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:283
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:307
SchDec
Definition: ffmpeg_sched.c:80
av_assert1
#define av_assert1(cond)
assert() equivalent, that does not lie in speed critical code.
Definition: avassert.h:57
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:255
QueueType
QueueType
Definition: ffmpeg_sched.c:47
THREAD_QUEUE_FRAMES
@ THREAD_QUEUE_FRAMES
Definition: thread_queue.h:25
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:86
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:496
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:435
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:253
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:775
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1159
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2229
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:197
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:151
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:917
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1846
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2562
SchDemuxStream
Definition: ffmpeg_sched.c:150
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2200
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:81
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:265
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:199
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1866
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:219
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:672
ThreadQueue
Definition: thread_queue.c:40
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2596
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:221
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:90
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2072
SchDemux
Definition: ffmpeg_sched.c:156
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:291
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:93
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1961
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:83
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:560
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:532
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:229
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:163
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:320
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:102
Scheduler::task_failed
unsigned task_failed
Definition: ffmpeg_sched.c:286
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:886
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1518
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1236
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2482
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g. for multiview video).
Definition: ffmpeg_sched.c:717
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:112
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1700
AVPacket
This structure stores compressed data.
Definition: packet.h:535
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
src_filtergraph
static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1415
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:100
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:230
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:97
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:372
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1634
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2147
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:75
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:142
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:654
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:140
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:236
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1195
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1482
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:159
PreMuxQueue
Definition: ffmpeg_sched.c:172
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:304
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:237
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:260
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:602
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1265
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:570
tq_choke
void tq_choke(ThreadQueue *tq, int choked)
Prevent further reads from the thread queue until it is unchoked.
Definition: thread_queue.c:258
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2116
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:861
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:115
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:227
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:301