FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDecOutput {
76  uint8_t *dst_finished;
77  unsigned nb_dst;
78 } SchDecOutput;
79 
80 typedef struct SchDec {
81  const AVClass *class;
82 
84 
86  unsigned nb_outputs;
87 
89  // Queue for receiving input packets, one stream.
91 
92  // Queue for sending post-flush end timestamps back to the source
95 
96  // temporary storage used by sch_dec_send()
98 } SchDec;
99 
100 typedef struct SchSyncQueue {
104 
105  unsigned *enc_idx;
106  unsigned nb_enc_idx;
107 } SchSyncQueue;
108 
109 typedef struct SchEnc {
110  const AVClass *class;
111 
114  uint8_t *dst_finished;
115  unsigned nb_dst;
116 
117  // [0] - index of the sync queue in Scheduler.sq_enc,
118  // [1] - index of this encoder in the sq
119  int sq_idx[2];
120 
121  /* Opening encoders is somewhat nontrivial due to their interaction with
122  * sync queues, which are (among other things) responsible for maintaining
123  * constant audio frame size, when it is required by the encoder.
124  *
125  * Opening the encoder requires stream parameters, obtained from the first
126  * frame. However, that frame cannot be properly chunked by the sync queue
127  * without knowing the required frame size, which is only available after
128  * opening the encoder.
129  *
130  * This apparent circular dependency is resolved in the following way:
131  * - the caller creating the encoder gives us a callback which opens the
132  * encoder and returns the required frame size (if any)
133  * - when the first frame is sent to the encoder, the sending thread
134  * - calls this callback, opening the encoder
135  * - passes the returned frame size to the sync queue
136  */
137  int (*open_cb)(void *opaque, const AVFrame *frame);
138  int opened;
139 
141  // Queue for receiving input frames, one stream.
143  // tq_send() to queue returned EOF
145 
146  // temporary storage used by sch_enc_send()
148 } SchEnc;
149 
150 typedef struct SchDemuxStream {
152  uint8_t *dst_finished;
153  unsigned nb_dst;
155 
156 typedef struct SchDemux {
157  const AVClass *class;
158 
160  unsigned nb_streams;
161 
164 
165  // temporary storage used by sch_demux_send()
167 
168  // protected by schedule_lock
170 } SchDemux;
171 
172 typedef struct PreMuxQueue {
173  /**
174  * Queue for buffering the packets before the muxer task can be started.
175  */
177  /**
178  * Maximum number of packets in fifo.
179  */
181  /*
182  * The size of the AVPackets' buffers in queue.
183  * Updated when a packet is either pushed or pulled from the queue.
184  */
185  size_t data_size;
186  /* Threshold after which max_packets will be in effect */
188 } PreMuxQueue;
189 
190 typedef struct SchMuxStream {
193 
194  unsigned *sub_heartbeat_dst;
196 
198 
199  // an EOF was generated while flushing the pre-mux queue
200  int init_eof;
201 
202  ////////////////////////////////////////////////////////////
203  // The following are protected by Scheduler.schedule_lock //
204 
205  /* dts+duration of the last packet sent to this stream
206  in AV_TIME_BASE_Q */
208  // this stream no longer accepts input
210  ////////////////////////////////////////////////////////////
211 } SchMuxStream;
212 
213 typedef struct SchMux {
214  const AVClass *class;
215 
217  unsigned nb_streams;
219 
220  int (*init)(void *arg);
221 
223  /**
224  * Set to 1 after starting the muxer task and flushing the
225  * pre-muxing queues.
226  * Set either before any tasks have started, or with
227  * Scheduler.mux_ready_lock held.
228  */
231  unsigned queue_size;
232 
234 } SchMux;
235 
236 typedef struct SchFilterIn {
241 } SchFilterIn;
242 
243 typedef struct SchFilterOut {
245 } SchFilterOut;
246 
247 typedef struct SchFilterGraph {
248  const AVClass *class;
249 
251  unsigned nb_inputs;
254 
256  unsigned nb_outputs;
257 
259  // input queue, nb_inputs+1 streams
260  // last stream is control
263 
264  // protected by schedule_lock
265  unsigned best_input;
268 
273 };
274 
275 struct Scheduler {
276  const AVClass *class;
277 
279  unsigned nb_demux;
280 
282  unsigned nb_mux;
283 
284  unsigned nb_mux_ready;
286 
287  unsigned nb_mux_done;
288  unsigned task_failed;
291 
292 
294  unsigned nb_dec;
295 
297  unsigned nb_enc;
298 
300  unsigned nb_sq_enc;
301 
303  unsigned nb_filters;
304 
306  int sdp_auto;
307 
310 
312 
314 };
315 
316 /**
317  * Wait until this task is allowed to proceed.
318  *
319  * @retval 0 the caller should proceed
320  * @retval 1 the caller should terminate
321  */
322 static int waiter_wait(Scheduler *sch, SchWaiter *w)
323 {
324  int terminate;
325 
326  if (!atomic_load(&w->choked))
327  return 0;
328 
329  pthread_mutex_lock(&w->lock);
330 
331  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332  pthread_cond_wait(&w->cond, &w->lock);
333 
334  terminate = atomic_load(&sch->terminate);
335 
336  pthread_mutex_unlock(&w->lock);
337 
338  return terminate;
339 }
340 
341 static void waiter_set(SchWaiter *w, int choked)
342 {
343  pthread_mutex_lock(&w->lock);
344 
345  atomic_store(&w->choked, choked);
346  pthread_cond_signal(&w->cond);
347 
348  pthread_mutex_unlock(&w->lock);
349 }
350 
351 static int waiter_init(SchWaiter *w)
352 {
353  int ret;
354 
355  atomic_init(&w->choked, 0);
356 
357  ret = pthread_mutex_init(&w->lock, NULL);
358  if (ret)
359  return AVERROR(ret);
360 
361  ret = pthread_cond_init(&w->cond, NULL);
362  if (ret)
363  return AVERROR(ret);
364 
365  return 0;
366 }
367 
368 static void waiter_uninit(SchWaiter *w)
369 {
370  pthread_mutex_destroy(&w->lock);
371  pthread_cond_destroy(&w->cond);
372 }
373 
374 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375  enum QueueType type)
376 {
377  ThreadQueue *tq;
378 
379  if (queue_size <= 0) {
380  if (type == QUEUE_FRAMES)
381  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
382  else
384  }
385 
386  if (type == QUEUE_FRAMES) {
387  // This queue length is used in the decoder code to ensure that
388  // there are enough entries in fixed-size frame pools to account
389  // for frames held in queues inside the ffmpeg utility. If this
390  // can ever dynamically change then the corresponding decode
391  // code needs to be updated as well.
393  }
394 
395  tq = tq_alloc(nb_streams, queue_size,
397  if (!tq)
398  return AVERROR(ENOMEM);
399 
400  *ptq = tq;
401  return 0;
402 }
403 
404 static void *task_wrapper(void *arg);
405 
406 static int task_start(SchTask *task)
407 {
408  int ret;
409 
410  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
411 
412  av_assert0(!task->thread_running);
413 
414  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
415  if (ret) {
416  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
417  strerror(ret));
418  return AVERROR(ret);
419  }
420 
421  task->thread_running = 1;
422  return 0;
423 }
424 
425 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
426  SchThreadFunc func, void *func_arg)
427 {
428  task->parent = sch;
429 
430  task->node.type = type;
431  task->node.idx = idx;
432 
433  task->func = func;
434  task->func_arg = func_arg;
435 }
436 
437 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
438 {
439  int64_t min_dts = INT64_MAX;
440 
441  for (unsigned i = 0; i < sch->nb_mux; i++) {
442  const SchMux *mux = &sch->mux[i];
443 
444  for (unsigned j = 0; j < mux->nb_streams; j++) {
445  const SchMuxStream *ms = &mux->streams[j];
446 
447  if (ms->source_finished && !count_finished)
448  continue;
449  if (ms->last_dts == AV_NOPTS_VALUE)
450  return AV_NOPTS_VALUE;
451 
452  min_dts = FFMIN(min_dts, ms->last_dts);
453  }
454  }
455 
456  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
457 }
458 
459 void sch_free(Scheduler **psch)
460 {
461  Scheduler *sch = *psch;
462 
463  if (!sch)
464  return;
465 
466  sch_stop(sch, NULL);
467 
468  for (unsigned i = 0; i < sch->nb_demux; i++) {
469  SchDemux *d = &sch->demux[i];
470 
471  for (unsigned j = 0; j < d->nb_streams; j++) {
472  SchDemuxStream *ds = &d->streams[j];
473  av_freep(&ds->dst);
474  av_freep(&ds->dst_finished);
475  }
476  av_freep(&d->streams);
477 
479 
480  waiter_uninit(&d->waiter);
481  }
482  av_freep(&sch->demux);
483 
484  for (unsigned i = 0; i < sch->nb_mux; i++) {
485  SchMux *mux = &sch->mux[i];
486 
487  for (unsigned j = 0; j < mux->nb_streams; j++) {
488  SchMuxStream *ms = &mux->streams[j];
489 
490  if (ms->pre_mux_queue.fifo) {
491  AVPacket *pkt;
492  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
495  }
496 
498  }
499  av_freep(&mux->streams);
500 
502 
503  tq_free(&mux->queue);
504  }
505  av_freep(&sch->mux);
506 
507  for (unsigned i = 0; i < sch->nb_dec; i++) {
508  SchDec *dec = &sch->dec[i];
509 
510  tq_free(&dec->queue);
511 
513 
514  for (unsigned j = 0; j < dec->nb_outputs; j++) {
515  SchDecOutput *o = &dec->outputs[j];
516 
517  av_freep(&o->dst);
518  av_freep(&o->dst_finished);
519  }
520 
521  av_freep(&dec->outputs);
522 
523  av_frame_free(&dec->send_frame);
524  }
525  av_freep(&sch->dec);
526 
527  for (unsigned i = 0; i < sch->nb_enc; i++) {
528  SchEnc *enc = &sch->enc[i];
529 
530  tq_free(&enc->queue);
531 
532  av_packet_free(&enc->send_pkt);
533 
534  av_freep(&enc->dst);
535  av_freep(&enc->dst_finished);
536  }
537  av_freep(&sch->enc);
538 
539  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
540  SchSyncQueue *sq = &sch->sq_enc[i];
541  sq_free(&sq->sq);
542  av_frame_free(&sq->frame);
544  av_freep(&sq->enc_idx);
545  }
546  av_freep(&sch->sq_enc);
547 
548  for (unsigned i = 0; i < sch->nb_filters; i++) {
549  SchFilterGraph *fg = &sch->filters[i];
550 
551  tq_free(&fg->queue);
552 
553  av_freep(&fg->inputs);
554  av_freep(&fg->outputs);
555 
556  waiter_uninit(&fg->waiter);
557  }
558  av_freep(&sch->filters);
559 
560  av_freep(&sch->sdp_filename);
561 
563 
565 
568 
569  av_freep(psch);
570 }
571 
572 static const AVClass scheduler_class = {
573  .class_name = "Scheduler",
574  .version = LIBAVUTIL_VERSION_INT,
575 };
576 
578 {
579  Scheduler *sch;
580  int ret;
581 
582  sch = av_mallocz(sizeof(*sch));
583  if (!sch)
584  return NULL;
585 
586  sch->class = &scheduler_class;
587  sch->sdp_auto = 1;
588 
590  if (ret)
591  goto fail;
592 
594  if (ret)
595  goto fail;
596 
598  if (ret)
599  goto fail;
600 
602  if (ret)
603  goto fail;
604 
605  return sch;
606 fail:
607  sch_free(&sch);
608  return NULL;
609 }
610 
611 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
612 {
613  av_freep(&sch->sdp_filename);
614  sch->sdp_filename = av_strdup(sdp_filename);
615  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
616 }
617 
618 static const AVClass sch_mux_class = {
619  .class_name = "SchMux",
620  .version = LIBAVUTIL_VERSION_INT,
621  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
622 };
623 
624 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
625  void *arg, int sdp_auto, unsigned thread_queue_size)
626 {
627  const unsigned idx = sch->nb_mux;
628 
629  SchMux *mux;
630  int ret;
631 
632  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
633  if (ret < 0)
634  return ret;
635 
636  mux = &sch->mux[idx];
637  mux->class = &sch_mux_class;
638  mux->init = init;
639  mux->queue_size = thread_queue_size;
640 
641  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
642 
643  sch->sdp_auto &= sdp_auto;
644 
645  return idx;
646 }
647 
648 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
649 {
650  SchMux *mux;
651  SchMuxStream *ms;
652  unsigned stream_idx;
653  int ret;
654 
655  av_assert0(mux_idx < sch->nb_mux);
656  mux = &sch->mux[mux_idx];
657 
658  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
659  if (ret < 0)
660  return ret;
661  stream_idx = mux->nb_streams - 1;
662 
663  ms = &mux->streams[stream_idx];
664 
665  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
666  if (!ms->pre_mux_queue.fifo)
667  return AVERROR(ENOMEM);
668 
669  ms->last_dts = AV_NOPTS_VALUE;
670 
671  return stream_idx;
672 }
673 
674 static const AVClass sch_demux_class = {
675  .class_name = "SchDemux",
676  .version = LIBAVUTIL_VERSION_INT,
677  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
678 };
679 
681 {
682  const unsigned idx = sch->nb_demux;
683 
684  SchDemux *d;
685  int ret;
686 
687  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
688  if (ret < 0)
689  return ret;
690 
691  d = &sch->demux[idx];
692 
693  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
694 
695  d->class = &sch_demux_class;
696  d->send_pkt = av_packet_alloc();
697  if (!d->send_pkt)
698  return AVERROR(ENOMEM);
699 
700  ret = waiter_init(&d->waiter);
701  if (ret < 0)
702  return ret;
703 
704  return idx;
705 }
706 
707 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
708 {
709  SchDemux *d;
710  int ret;
711 
712  av_assert0(demux_idx < sch->nb_demux);
713  d = &sch->demux[demux_idx];
714 
715  ret = GROW_ARRAY(d->streams, d->nb_streams);
716  return ret < 0 ? ret : d->nb_streams - 1;
717 }
718 
719 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
720 {
721  SchDec *dec;
722  int ret;
723 
724  av_assert0(dec_idx < sch->nb_dec);
725  dec = &sch->dec[dec_idx];
726 
727  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
728  if (ret < 0)
729  return ret;
730 
731  return dec->nb_outputs - 1;
732 }
733 
734 static const AVClass sch_dec_class = {
735  .class_name = "SchDec",
736  .version = LIBAVUTIL_VERSION_INT,
737  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
738 };
739 
740 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
741 {
742  const unsigned idx = sch->nb_dec;
743 
744  SchDec *dec;
745  int ret;
746 
747  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
748  if (ret < 0)
749  return ret;
750 
751  dec = &sch->dec[idx];
752 
753  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
754 
755  dec->class = &sch_dec_class;
756  dec->send_frame = av_frame_alloc();
757  if (!dec->send_frame)
758  return AVERROR(ENOMEM);
759 
760  ret = sch_add_dec_output(sch, idx);
761  if (ret < 0)
762  return ret;
763 
764  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
765  if (ret < 0)
766  return ret;
767 
768  if (send_end_ts) {
770  if (ret < 0)
771  return ret;
772  }
773 
774  return idx;
775 }
776 
777 static const AVClass sch_enc_class = {
778  .class_name = "SchEnc",
779  .version = LIBAVUTIL_VERSION_INT,
780  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
781 };
782 
784  int (*open_cb)(void *opaque, const AVFrame *frame))
785 {
786  const unsigned idx = sch->nb_enc;
787 
788  SchEnc *enc;
789  int ret;
790 
791  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
792  if (ret < 0)
793  return ret;
794 
795  enc = &sch->enc[idx];
796 
797  enc->class = &sch_enc_class;
798  enc->open_cb = open_cb;
799  enc->sq_idx[0] = -1;
800  enc->sq_idx[1] = -1;
801 
802  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
803 
804  enc->send_pkt = av_packet_alloc();
805  if (!enc->send_pkt)
806  return AVERROR(ENOMEM);
807 
808  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
809  if (ret < 0)
810  return ret;
811 
812  return idx;
813 }
814 
815 static const AVClass sch_fg_class = {
816  .class_name = "SchFilterGraph",
817  .version = LIBAVUTIL_VERSION_INT,
818  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
819 };
820 
821 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
822  SchThreadFunc func, void *ctx)
823 {
824  const unsigned idx = sch->nb_filters;
825 
826  SchFilterGraph *fg;
827  int ret;
828 
829  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
830  if (ret < 0)
831  return ret;
832  fg = &sch->filters[idx];
833 
834  fg->class = &sch_fg_class;
835 
836  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
837 
838  if (nb_inputs) {
839  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
840  if (!fg->inputs)
841  return AVERROR(ENOMEM);
842  fg->nb_inputs = nb_inputs;
843  }
844 
845  if (nb_outputs) {
846  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
847  if (!fg->outputs)
848  return AVERROR(ENOMEM);
849  fg->nb_outputs = nb_outputs;
850  }
851 
852  ret = waiter_init(&fg->waiter);
853  if (ret < 0)
854  return ret;
855 
856  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
857  if (ret < 0)
858  return ret;
859 
860  return idx;
861 }
862 
863 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
864 {
865  SchSyncQueue *sq;
866  int ret;
867 
868  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
869  if (ret < 0)
870  return ret;
871  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
872 
873  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
874  if (!sq->sq)
875  return AVERROR(ENOMEM);
876 
877  sq->frame = av_frame_alloc();
878  if (!sq->frame)
879  return AVERROR(ENOMEM);
880 
881  ret = pthread_mutex_init(&sq->lock, NULL);
882  if (ret)
883  return AVERROR(ret);
884 
885  return sq - sch->sq_enc;
886 }
887 
888 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
889  int limiting, uint64_t max_frames)
890 {
891  SchSyncQueue *sq;
892  SchEnc *enc;
893  int ret;
894 
895  av_assert0(sq_idx < sch->nb_sq_enc);
896  sq = &sch->sq_enc[sq_idx];
897 
898  av_assert0(enc_idx < sch->nb_enc);
899  enc = &sch->enc[enc_idx];
900 
901  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
902  if (ret < 0)
903  return ret;
904  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
905 
906  ret = sq_add_stream(sq->sq, limiting);
907  if (ret < 0)
908  return ret;
909 
910  enc->sq_idx[0] = sq_idx;
911  enc->sq_idx[1] = ret;
912 
913  if (max_frames != INT64_MAX)
914  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
915 
916  return 0;
917 }
918 
920 {
921  int ret;
922 
923  switch (src.type) {
924  case SCH_NODE_TYPE_DEMUX: {
925  SchDemuxStream *ds;
926 
927  av_assert0(src.idx < sch->nb_demux &&
928  src.idx_stream < sch->demux[src.idx].nb_streams);
929  ds = &sch->demux[src.idx].streams[src.idx_stream];
930 
931  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
932  if (ret < 0)
933  return ret;
934 
935  ds->dst[ds->nb_dst - 1] = dst;
936 
937  // demuxed packets go to decoding or streamcopy
938  switch (dst.type) {
939  case SCH_NODE_TYPE_DEC: {
940  SchDec *dec;
941 
942  av_assert0(dst.idx < sch->nb_dec);
943  dec = &sch->dec[dst.idx];
944 
945  av_assert0(!dec->src.type);
946  dec->src = src;
947  break;
948  }
949  case SCH_NODE_TYPE_MUX: {
950  SchMuxStream *ms;
951 
952  av_assert0(dst.idx < sch->nb_mux &&
953  dst.idx_stream < sch->mux[dst.idx].nb_streams);
954  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
955 
956  av_assert0(!ms->src.type);
957  ms->src = src;
958 
959  break;
960  }
961  default: av_assert0(0);
962  }
963 
964  break;
965  }
966  case SCH_NODE_TYPE_DEC: {
967  SchDec *dec;
968  SchDecOutput *o;
969 
970  av_assert0(src.idx < sch->nb_dec);
971  dec = &sch->dec[src.idx];
972 
973  av_assert0(src.idx_stream < dec->nb_outputs);
974  o = &dec->outputs[src.idx_stream];
975 
976  ret = GROW_ARRAY(o->dst, o->nb_dst);
977  if (ret < 0)
978  return ret;
979 
980  o->dst[o->nb_dst - 1] = dst;
981 
982  // decoded frames go to filters or encoding
983  switch (dst.type) {
985  SchFilterIn *fi;
986 
987  av_assert0(dst.idx < sch->nb_filters &&
988  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
989  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
990 
991  av_assert0(!fi->src.type);
992  fi->src = src;
993  break;
994  }
995  case SCH_NODE_TYPE_ENC: {
996  SchEnc *enc;
997 
998  av_assert0(dst.idx < sch->nb_enc);
999  enc = &sch->enc[dst.idx];
1000 
1001  av_assert0(!enc->src.type);
1002  enc->src = src;
1003  break;
1004  }
1005  default: av_assert0(0);
1006  }
1007 
1008  break;
1009  }
1010  case SCH_NODE_TYPE_FILTER_OUT: {
1011  SchFilterOut *fo;
1012 
1013  av_assert0(src.idx < sch->nb_filters &&
1014  src.idx_stream < sch->filters[src.idx].nb_outputs);
1015  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1016 
1017  av_assert0(!fo->dst.type);
1018  fo->dst = dst;
1019 
1020  // filtered frames go to encoding or another filtergraph
1021  switch (dst.type) {
1022  case SCH_NODE_TYPE_ENC: {
1023  SchEnc *enc;
1024 
1025  av_assert0(dst.idx < sch->nb_enc);
1026  enc = &sch->enc[dst.idx];
1027 
1028  av_assert0(!enc->src.type);
1029  enc->src = src;
1030  break;
1031  }
1032  case SCH_NODE_TYPE_FILTER_IN: {
1033  SchFilterIn *fi;
1034 
1035  av_assert0(dst.idx < sch->nb_filters &&
1036  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1037  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1038 
1039  av_assert0(!fi->src.type);
1040  fi->src = src;
1041  break;
1042  }
1043  default: av_assert0(0);
1044  }
1045 
1046 
1047  break;
1048  }
1049  case SCH_NODE_TYPE_ENC: {
1050  SchEnc *enc;
1051 
1052  av_assert0(src.idx < sch->nb_enc);
1053  enc = &sch->enc[src.idx];
1054 
1055  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1056  if (ret < 0)
1057  return ret;
1058 
1059  enc->dst[enc->nb_dst - 1] = dst;
1060 
1061  // encoding packets go to muxing or decoding
1062  switch (dst.type) {
1063  case SCH_NODE_TYPE_MUX: {
1064  SchMuxStream *ms;
1065 
1066  av_assert0(dst.idx < sch->nb_mux &&
1067  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1068  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1069 
1070  av_assert0(!ms->src.type);
1071  ms->src = src;
1072 
1073  break;
1074  }
1075  case SCH_NODE_TYPE_DEC: {
1076  SchDec *dec;
1077 
1078  av_assert0(dst.idx < sch->nb_dec);
1079  dec = &sch->dec[dst.idx];
1080 
1081  av_assert0(!dec->src.type);
1082  dec->src = src;
1083 
1084  break;
1085  }
1086  default: av_assert0(0);
1087  }
1088 
1089  break;
1090  }
1091  default: av_assert0(0);
1092  }
1093 
1094  return 0;
1095 }
1096 
1097 static int mux_task_start(SchMux *mux)
1098 {
1099  int ret = 0;
1100 
1101  ret = task_start(&mux->task);
1102  if (ret < 0)
1103  return ret;
1104 
1105  /* flush the pre-muxing queues */
1106  while (1) {
1107  int min_stream = -1;
1108  Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1109 
1110  AVPacket *pkt;
1111 
1112  // find the stream with the earliest dts or EOF in pre-muxing queue
1113  for (unsigned i = 0; i < mux->nb_streams; i++) {
1114  SchMuxStream *ms = &mux->streams[i];
1115 
1116  if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1117  continue;
1118 
1119  if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1120  min_stream = i;
1121  break;
1122  }
1123 
1124  if (min_ts.ts == AV_NOPTS_VALUE ||
1125  av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1126  min_stream = i;
1127  min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1128  }
1129  }
1130 
1131  if (min_stream >= 0) {
1132  SchMuxStream *ms = &mux->streams[min_stream];
1133 
1134  ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1135  av_assert0(ret >= 0);
1136 
1137  if (pkt) {
1138  if (!ms->init_eof)
1139  ret = tq_send(mux->queue, min_stream, pkt);
1140  av_packet_free(&pkt);
1141  if (ret == AVERROR_EOF)
1142  ms->init_eof = 1;
1143  else if (ret < 0)
1144  return ret;
1145  } else
1146  tq_send_finish(mux->queue, min_stream);
1147 
1148  continue;
1149  }
1150 
1151  break;
1152  }
1153 
1154  atomic_store(&mux->mux_started, 1);
1155 
1156  return 0;
1157 }
1158 
1159 int print_sdp(const char *filename);
1160 
1161 static int mux_init(Scheduler *sch, SchMux *mux)
1162 {
1163  int ret;
1164 
1165  ret = mux->init(mux->task.func_arg);
1166  if (ret < 0)
1167  return ret;
1168 
1169  sch->nb_mux_ready++;
1170 
1171  if (sch->sdp_filename || sch->sdp_auto) {
1172  if (sch->nb_mux_ready < sch->nb_mux)
1173  return 0;
1174 
1175  ret = print_sdp(sch->sdp_filename);
1176  if (ret < 0) {
1177  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1178  return ret;
1179  }
1180 
1181  /* SDP is written only after all the muxers are ready, so now we
1182  * start ALL the threads */
1183  for (unsigned i = 0; i < sch->nb_mux; i++) {
1184  ret = mux_task_start(&sch->mux[i]);
1185  if (ret < 0)
1186  return ret;
1187  }
1188  } else {
1189  ret = mux_task_start(mux);
1190  if (ret < 0)
1191  return ret;
1192  }
1193 
1194  return 0;
1195 }
1196 
1197 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1198  size_t data_threshold, int max_packets)
1199 {
1200  SchMux *mux;
1201  SchMuxStream *ms;
1202 
1203  av_assert0(mux_idx < sch->nb_mux);
1204  mux = &sch->mux[mux_idx];
1205 
1206  av_assert0(stream_idx < mux->nb_streams);
1207  ms = &mux->streams[stream_idx];
1208 
1209  ms->pre_mux_queue.max_packets = max_packets;
1210  ms->pre_mux_queue.data_threshold = data_threshold;
1211 }
1212 
1213 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1214 {
1215  SchMux *mux;
1216  int ret = 0;
1217 
1218  av_assert0(mux_idx < sch->nb_mux);
1219  mux = &sch->mux[mux_idx];
1220 
1221  av_assert0(stream_idx < mux->nb_streams);
1222 
1224 
1225  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1226 
1227  // this may be called during initialization - do not start
1228  // threads before sch_start() is called
1229  if (++mux->nb_streams_ready == mux->nb_streams &&
1230  sch->state >= SCH_STATE_STARTED)
1231  ret = mux_init(sch, mux);
1232 
1234 
1235  return ret;
1236 }
1237 
1238 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1239  unsigned dec_idx)
1240 {
1241  SchMux *mux;
1242  SchMuxStream *ms;
1243  int ret = 0;
1244 
1245  av_assert0(mux_idx < sch->nb_mux);
1246  mux = &sch->mux[mux_idx];
1247 
1248  av_assert0(stream_idx < mux->nb_streams);
1249  ms = &mux->streams[stream_idx];
1250 
1252  if (ret < 0)
1253  return ret;
1254 
1255  av_assert0(dec_idx < sch->nb_dec);
1256  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1257 
1258  if (!mux->sub_heartbeat_pkt) {
1260  if (!mux->sub_heartbeat_pkt)
1261  return AVERROR(ENOMEM);
1262  }
1263 
1264  return 0;
1265 }
1266 
1268 {
1269  while (1) {
1270  SchFilterGraph *fg;
1271 
1272  // fed directly by a demuxer (i.e. not through a filtergraph)
1273  if (src.type == SCH_NODE_TYPE_DEMUX) {
1274  sch->demux[src.idx].waiter.choked_next = 0;
1275  return;
1276  }
1277 
1279  fg = &sch->filters[src.idx];
1280 
1281  // the filtergraph contains internal sources and
1282  // requested to be scheduled directly
1283  if (fg->best_input == fg->nb_inputs) {
1284  fg->waiter.choked_next = 0;
1285  return;
1286  }
1287 
1288  src = fg->inputs[fg->best_input].src_sched;
1289  }
1290 }
1291 
1293 {
1294  int64_t dts;
1295  int have_unchoked = 0;
1296 
1297  // on termination request all waiters are choked,
1298  // we are not to unchoke them
1299  if (atomic_load(&sch->terminate))
1300  return;
1301 
1302  dts = trailing_dts(sch, 0);
1303 
1304  atomic_store(&sch->last_dts, dts);
1305 
1306  // initialize our internal state
1307  for (unsigned type = 0; type < 2; type++)
1308  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1309  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1310  w->choked_prev = atomic_load(&w->choked);
1311  w->choked_next = 1;
1312  }
1313 
1314  // figure out the sources that are allowed to proceed
1315  for (unsigned i = 0; i < sch->nb_mux; i++) {
1316  SchMux *mux = &sch->mux[i];
1317 
1318  for (unsigned j = 0; j < mux->nb_streams; j++) {
1319  SchMuxStream *ms = &mux->streams[j];
1320 
1321  // unblock sources for output streams that are not finished
1322  // and not too far ahead of the trailing stream
1323  if (ms->source_finished)
1324  continue;
1325  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1326  continue;
1327  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1328  continue;
1329 
1330  // resolve the source to unchoke
1331  unchoke_for_stream(sch, ms->src_sched);
1332  have_unchoked = 1;
1333  }
1334  }
1335 
1336  // make sure to unchoke at least one source, if still available
1337  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1338  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1339  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1340  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1341  if (!exited) {
1342  w->choked_next = 0;
1343  have_unchoked = 1;
1344  break;
1345  }
1346  }
1347 
1348 
1349  for (unsigned type = 0; type < 2; type++)
1350  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1351  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1352  if (w->choked_prev != w->choked_next)
1353  waiter_set(w, w->choked_next);
1354  }
1355 
1356 }
1357 
1358 enum {
1362 };
1363 
1364 static int
1366  uint8_t *filters_visited, SchedulerNode *filters_stack)
1367 {
1368  unsigned nb_filters_stack = 0;
1369 
1370  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1371 
1372  while (1) {
1373  const SchFilterGraph *fg = &sch->filters[src.idx];
1374 
1375  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1376 
1377  // descend into every input, depth first
1378  if (src.idx_stream < fg->nb_inputs) {
1379  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1380 
1381  // connected to demuxer, no cycles possible
1382  if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1383  continue;
1384 
1385  // otherwise connected to another filtergraph
1387 
1388  // found a cycle
1389  if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1390  return AVERROR(EINVAL);
1391 
1392  // place current position on stack and descend
1393  av_assert0(nb_filters_stack < sch->nb_filters);
1394  filters_stack[nb_filters_stack++] = src;
1395  src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1396  continue;
1397  }
1398 
1399  filters_visited[src.idx] = CYCLE_NODE_DONE;
1400 
1401  // previous search finished,
1402  if (nb_filters_stack) {
1403  src = filters_stack[--nb_filters_stack];
1404  continue;
1405  }
1406  return 0;
1407  }
1408 }
1409 
1410 static int check_acyclic(Scheduler *sch)
1411 {
1412  uint8_t *filters_visited = NULL;
1413  SchedulerNode *filters_stack = NULL;
1414 
1415  int ret = 0;
1416 
1417  if (!sch->nb_filters)
1418  return 0;
1419 
1420  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1421  if (!filters_visited)
1422  return AVERROR(ENOMEM);
1423 
1424  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1425  if (!filters_stack) {
1426  ret = AVERROR(ENOMEM);
1427  goto fail;
1428  }
1429 
1430  // trace the transcoding graph upstream from every filtegraph
1431  for (unsigned i = 0; i < sch->nb_filters; i++) {
1432  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1433  filters_visited, filters_stack);
1434  if (ret < 0) {
1435  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1436  goto fail;
1437  }
1438  }
1439 
1440 fail:
1441  av_freep(&filters_visited);
1442  av_freep(&filters_stack);
1443  return ret;
1444 }
1445 
1446 static int start_prepare(Scheduler *sch)
1447 {
1448  int ret;
1449 
1450  for (unsigned i = 0; i < sch->nb_demux; i++) {
1451  SchDemux *d = &sch->demux[i];
1452 
1453  for (unsigned j = 0; j < d->nb_streams; j++) {
1454  SchDemuxStream *ds = &d->streams[j];
1455 
1456  if (!ds->nb_dst) {
1457  av_log(d, AV_LOG_ERROR,
1458  "Demuxer stream %u not connected to any sink\n", j);
1459  return AVERROR(EINVAL);
1460  }
1461 
1462  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1463  if (!ds->dst_finished)
1464  return AVERROR(ENOMEM);
1465  }
1466  }
1467 
1468  for (unsigned i = 0; i < sch->nb_dec; i++) {
1469  SchDec *dec = &sch->dec[i];
1470 
1471  if (!dec->src.type) {
1472  av_log(dec, AV_LOG_ERROR,
1473  "Decoder not connected to a source\n");
1474  return AVERROR(EINVAL);
1475  }
1476 
1477  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1478  SchDecOutput *o = &dec->outputs[j];
1479 
1480  if (!o->nb_dst) {
1481  av_log(dec, AV_LOG_ERROR,
1482  "Decoder output %u not connected to any sink\n", j);
1483  return AVERROR(EINVAL);
1484  }
1485 
1486  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1487  if (!o->dst_finished)
1488  return AVERROR(ENOMEM);
1489  }
1490  }
1491 
1492  for (unsigned i = 0; i < sch->nb_enc; i++) {
1493  SchEnc *enc = &sch->enc[i];
1494 
1495  if (!enc->src.type) {
1496  av_log(enc, AV_LOG_ERROR,
1497  "Encoder not connected to a source\n");
1498  return AVERROR(EINVAL);
1499  }
1500  if (!enc->nb_dst) {
1501  av_log(enc, AV_LOG_ERROR,
1502  "Encoder not connected to any sink\n");
1503  return AVERROR(EINVAL);
1504  }
1505 
1506  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1507  if (!enc->dst_finished)
1508  return AVERROR(ENOMEM);
1509  }
1510 
1511  for (unsigned i = 0; i < sch->nb_mux; i++) {
1512  SchMux *mux = &sch->mux[i];
1513 
1514  for (unsigned j = 0; j < mux->nb_streams; j++) {
1515  SchMuxStream *ms = &mux->streams[j];
1516 
1517  switch (ms->src.type) {
1518  case SCH_NODE_TYPE_ENC: {
1519  SchEnc *enc = &sch->enc[ms->src.idx];
1520  if (enc->src.type == SCH_NODE_TYPE_DEC) {
1521  ms->src_sched = sch->dec[enc->src.idx].src;
1523  } else {
1524  ms->src_sched = enc->src;
1526  }
1527  break;
1528  }
1529  case SCH_NODE_TYPE_DEMUX:
1530  ms->src_sched = ms->src;
1531  break;
1532  default:
1533  av_log(mux, AV_LOG_ERROR,
1534  "Muxer stream #%u not connected to a source\n", j);
1535  return AVERROR(EINVAL);
1536  }
1537  }
1538 
1539  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1540  QUEUE_PACKETS);
1541  if (ret < 0)
1542  return ret;
1543  }
1544 
1545  for (unsigned i = 0; i < sch->nb_filters; i++) {
1546  SchFilterGraph *fg = &sch->filters[i];
1547 
1548  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1549  SchFilterIn *fi = &fg->inputs[j];
1550  SchDec *dec;
1551 
1552  if (!fi->src.type) {
1553  av_log(fg, AV_LOG_ERROR,
1554  "Filtergraph input %u not connected to a source\n", j);
1555  return AVERROR(EINVAL);
1556  }
1557 
1558  if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1559  fi->src_sched = fi->src;
1560  else {
1562  dec = &sch->dec[fi->src.idx];
1563 
1564  switch (dec->src.type) {
1565  case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1566  case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1567  default: av_assert0(0);
1568  }
1569  }
1570  }
1571 
1572  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1573  SchFilterOut *fo = &fg->outputs[j];
1574 
1575  if (!fo->dst.type) {
1576  av_log(fg, AV_LOG_ERROR,
1577  "Filtergraph %u output %u not connected to a sink\n", i, j);
1578  return AVERROR(EINVAL);
1579  }
1580  }
1581  }
1582 
1583  // Check that the transcoding graph has no cycles.
1584  ret = check_acyclic(sch);
1585  if (ret < 0)
1586  return ret;
1587 
1588  return 0;
1589 }
1590 
1592 {
1593  int ret;
1594 
1595  ret = start_prepare(sch);
1596  if (ret < 0)
1597  return ret;
1598 
1600  sch->state = SCH_STATE_STARTED;
1601 
1602  for (unsigned i = 0; i < sch->nb_mux; i++) {
1603  SchMux *mux = &sch->mux[i];
1604 
1605  if (mux->nb_streams_ready == mux->nb_streams) {
1606  ret = mux_init(sch, mux);
1607  if (ret < 0)
1608  goto fail;
1609  }
1610  }
1611 
1612  for (unsigned i = 0; i < sch->nb_enc; i++) {
1613  SchEnc *enc = &sch->enc[i];
1614 
1615  ret = task_start(&enc->task);
1616  if (ret < 0)
1617  goto fail;
1618  }
1619 
1620  for (unsigned i = 0; i < sch->nb_filters; i++) {
1621  SchFilterGraph *fg = &sch->filters[i];
1622 
1623  ret = task_start(&fg->task);
1624  if (ret < 0)
1625  goto fail;
1626  }
1627 
1628  for (unsigned i = 0; i < sch->nb_dec; i++) {
1629  SchDec *dec = &sch->dec[i];
1630 
1631  ret = task_start(&dec->task);
1632  if (ret < 0)
1633  goto fail;
1634  }
1635 
1636  for (unsigned i = 0; i < sch->nb_demux; i++) {
1637  SchDemux *d = &sch->demux[i];
1638 
1639  if (!d->nb_streams)
1640  continue;
1641 
1642  ret = task_start(&d->task);
1643  if (ret < 0)
1644  goto fail;
1645  }
1646 
1650 
1651  return 0;
1652 fail:
1653  sch_stop(sch, NULL);
1654  return ret;
1655 }
1656 
1657 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1658 {
1659  int ret;
1660 
1661  // convert delay to absolute timestamp
1662  timeout_us += av_gettime();
1663 
1665 
1666  if (sch->nb_mux_done < sch->nb_mux) {
1667  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1668  .tv_nsec = (timeout_us % 1000000) * 1000 };
1669  pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1670  }
1671 
1672  // abort transcoding if any task failed
1673  ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1674 
1676 
1677  *transcode_ts = atomic_load(&sch->last_dts);
1678 
1679  return ret;
1680 }
1681 
1682 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1683 {
1684  int ret;
1685 
1686  ret = enc->open_cb(enc->task.func_arg, frame);
1687  if (ret < 0)
1688  return ret;
1689 
1690  // ret>0 signals audio frame size, which means sync queue must
1691  // have been enabled during encoder creation
1692  if (ret > 0) {
1693  SchSyncQueue *sq;
1694 
1695  av_assert0(enc->sq_idx[0] >= 0);
1696  sq = &sch->sq_enc[enc->sq_idx[0]];
1697 
1698  pthread_mutex_lock(&sq->lock);
1699 
1700  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1701 
1702  pthread_mutex_unlock(&sq->lock);
1703  }
1704 
1705  return 0;
1706 }
1707 
1709 {
1710  int ret;
1711 
1712  if (!frame) {
1713  tq_send_finish(enc->queue, 0);
1714  return 0;
1715  }
1716 
1717  if (enc->in_finished)
1718  return AVERROR_EOF;
1719 
1720  ret = tq_send(enc->queue, 0, frame);
1721  if (ret < 0)
1722  enc->in_finished = 1;
1723 
1724  return ret;
1725 }
1726 
1727 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1728 {
1729  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1730  int ret = 0;
1731 
1732  // inform the scheduling code that no more input will arrive along this path;
1733  // this is necessary because the sync queue may not send an EOF downstream
1734  // until other streams finish
1735  // TODO: consider a cleaner way of passing this information through
1736  // the pipeline
1737  if (!frame) {
1738  for (unsigned i = 0; i < enc->nb_dst; i++) {
1739  SchMux *mux;
1740  SchMuxStream *ms;
1741 
1742  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1743  continue;
1744 
1745  mux = &sch->mux[enc->dst[i].idx];
1746  ms = &mux->streams[enc->dst[i].idx_stream];
1747 
1749 
1750  ms->source_finished = 1;
1752 
1754  }
1755  }
1756 
1757  pthread_mutex_lock(&sq->lock);
1758 
1759  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1760  if (ret < 0)
1761  goto finish;
1762 
1763  while (1) {
1764  SchEnc *enc;
1765 
1766  // TODO: the SQ API should be extended to allow returning EOF
1767  // for individual streams
1768  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1769  if (ret < 0) {
1770  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1771  break;
1772  }
1773 
1774  enc = &sch->enc[sq->enc_idx[ret]];
1775  ret = send_to_enc_thread(sch, enc, sq->frame);
1776  if (ret < 0) {
1777  av_frame_unref(sq->frame);
1778  if (ret != AVERROR_EOF)
1779  break;
1780 
1781  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1782  continue;
1783  }
1784  }
1785 
1786  if (ret < 0) {
1787  // close all encoders fed from this sync queue
1788  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1789  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1790 
1791  // if the sync queue error is EOF and closing the encoder
1792  // produces a more serious error, make sure to pick the latter
1793  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1794  }
1795  }
1796 
1797 finish:
1798  pthread_mutex_unlock(&sq->lock);
1799 
1800  return ret;
1801 }
1802 
1803 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1804 {
1805  if (enc->open_cb && frame && !enc->opened) {
1806  int ret = enc_open(sch, enc, frame);
1807  if (ret < 0)
1808  return ret;
1809  enc->opened = 1;
1810 
1811  // discard empty frames that only carry encoder init parameters
1812  if (!frame->buf[0]) {
1814  return 0;
1815  }
1816  }
1817 
1818  return (enc->sq_idx[0] >= 0) ?
1819  send_to_enc_sq (sch, enc, frame) :
1820  send_to_enc_thread(sch, enc, frame);
1821 }
1822 
1824 {
1825  PreMuxQueue *q = &ms->pre_mux_queue;
1826  AVPacket *tmp_pkt = NULL;
1827  int ret;
1828 
1829  if (!av_fifo_can_write(q->fifo)) {
1830  size_t packets = av_fifo_can_read(q->fifo);
1831  size_t pkt_size = pkt ? pkt->size : 0;
1832  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1833  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1834  size_t new_size = FFMIN(2 * packets, max_packets);
1835 
1836  if (new_size <= packets) {
1837  av_log(mux, AV_LOG_ERROR,
1838  "Too many packets buffered for output stream.\n");
1839  return AVERROR_BUFFER_TOO_SMALL;
1840  }
1841  ret = av_fifo_grow2(q->fifo, new_size - packets);
1842  if (ret < 0)
1843  return ret;
1844  }
1845 
1846  if (pkt) {
1847  tmp_pkt = av_packet_alloc();
1848  if (!tmp_pkt)
1849  return AVERROR(ENOMEM);
1850 
1851  av_packet_move_ref(tmp_pkt, pkt);
1852  q->data_size += tmp_pkt->size;
1853  }
1854  av_fifo_write(q->fifo, &tmp_pkt, 1);
1855 
1856  return 0;
1857 }
1858 
1859 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1860  AVPacket *pkt)
1861 {
1862  SchMuxStream *ms = &mux->streams[stream_idx];
1863  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1866 
1867  // queue the packet if the muxer cannot be started yet
1868  if (!atomic_load(&mux->mux_started)) {
1869  int queued = 0;
1870 
1871  // the muxer could have started between the above atomic check and
1872  // locking the mutex, then this block falls through to normal send path
1874 
1875  if (!atomic_load(&mux->mux_started)) {
1876  int ret = mux_queue_packet(mux, ms, pkt);
1877  queued = ret < 0 ? ret : 1;
1878  }
1879 
1881 
1882  if (queued < 0)
1883  return queued;
1884  else if (queued)
1885  goto update_schedule;
1886  }
1887 
1888  if (pkt) {
1889  int ret;
1890 
1891  if (ms->init_eof)
1892  return AVERROR_EOF;
1893 
1894  ret = tq_send(mux->queue, stream_idx, pkt);
1895  if (ret < 0)
1896  return ret;
1897  } else
1898  tq_send_finish(mux->queue, stream_idx);
1899 
1900 update_schedule:
1901  // TODO: use atomics to check whether this changes trailing dts
1902  // to avoid locking unnecesarily
1903  if (dts != AV_NOPTS_VALUE || !pkt) {
1905 
1906  if (pkt) ms->last_dts = dts;
1907  else ms->source_finished = 1;
1908 
1910 
1912  }
1913 
1914  return 0;
1915 }
1916 
1917 static int
1919  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1920 {
1921  int ret;
1922 
1923  if (*dst_finished)
1924  return AVERROR_EOF;
1925 
1926  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1929  pkt = NULL;
1930  }
1931 
1932  if (!pkt)
1933  goto finish;
1934 
1935  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1936  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1937  tq_send(sch->dec[dst.idx].queue, 0, pkt);
1938  if (ret == AVERROR_EOF)
1939  goto finish;
1940 
1941  return ret;
1942 
1943 finish:
1944  if (dst.type == SCH_NODE_TYPE_MUX)
1945  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1946  else
1947  tq_send_finish(sch->dec[dst.idx].queue, 0);
1948 
1949  *dst_finished = 1;
1950  return AVERROR_EOF;
1951 }
1952 
1954  AVPacket *pkt, unsigned flags)
1955 {
1956  unsigned nb_done = 0;
1957 
1958  for (unsigned i = 0; i < ds->nb_dst; i++) {
1959  AVPacket *to_send = pkt;
1960  uint8_t *finished = &ds->dst_finished[i];
1961 
1962  int ret;
1963 
1964  // sending a packet consumes it, so make a temporary reference if needed
1965  if (pkt && i < ds->nb_dst - 1) {
1966  to_send = d->send_pkt;
1967 
1968  ret = av_packet_ref(to_send, pkt);
1969  if (ret < 0)
1970  return ret;
1971  }
1972 
1973  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1974  if (to_send)
1975  av_packet_unref(to_send);
1976  if (ret == AVERROR_EOF)
1977  nb_done++;
1978  else if (ret < 0)
1979  return ret;
1980  }
1981 
1982  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1983 }
1984 
1986 {
1987  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1988 
1989  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1990 
1991  for (unsigned i = 0; i < d->nb_streams; i++) {
1992  SchDemuxStream *ds = &d->streams[i];
1993 
1994  for (unsigned j = 0; j < ds->nb_dst; j++) {
1995  const SchedulerNode *dst = &ds->dst[j];
1996  SchDec *dec;
1997  int ret;
1998 
1999  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2000  continue;
2001 
2002  dec = &sch->dec[dst->idx];
2003 
2004  ret = tq_send(dec->queue, 0, pkt);
2005  if (ret < 0)
2006  return ret;
2007 
2008  if (dec->queue_end_ts) {
2009  Timestamp ts;
2011  if (ret < 0)
2012  return ret;
2013 
2014  if (max_end_ts.ts == AV_NOPTS_VALUE ||
2015  (ts.ts != AV_NOPTS_VALUE &&
2016  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2017  max_end_ts = ts;
2018 
2019  }
2020  }
2021  }
2022 
2023  pkt->pts = max_end_ts.ts;
2024  pkt->time_base = max_end_ts.tb;
2025 
2026  return 0;
2027 }
2028 
2029 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2030  unsigned flags)
2031 {
2032  SchDemux *d;
2033  int terminate;
2034 
2035  av_assert0(demux_idx < sch->nb_demux);
2036  d = &sch->demux[demux_idx];
2037 
2038  terminate = waiter_wait(sch, &d->waiter);
2039  if (terminate)
2040  return AVERROR_EXIT;
2041 
2042  // flush the downstreams after seek
2043  if (pkt->stream_index == -1)
2044  return demux_flush(sch, d, pkt);
2045 
2047 
2048  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2049 }
2050 
2051 static int demux_done(Scheduler *sch, unsigned demux_idx)
2052 {
2053  SchDemux *d = &sch->demux[demux_idx];
2054  int ret = 0;
2055 
2056  for (unsigned i = 0; i < d->nb_streams; i++) {
2057  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2058  if (err != AVERROR_EOF)
2059  ret = err_merge(ret, err);
2060  }
2061 
2063 
2064  d->task_exited = 1;
2065 
2067 
2069 
2070  return ret;
2071 }
2072 
2073 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2074 {
2075  SchMux *mux;
2076  int ret, stream_idx;
2077 
2078  av_assert0(mux_idx < sch->nb_mux);
2079  mux = &sch->mux[mux_idx];
2080 
2081  ret = tq_receive(mux->queue, &stream_idx, pkt);
2082  pkt->stream_index = stream_idx;
2083  return ret;
2084 }
2085 
2086 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2087 {
2088  SchMux *mux;
2089 
2090  av_assert0(mux_idx < sch->nb_mux);
2091  mux = &sch->mux[mux_idx];
2092 
2093  av_assert0(stream_idx < mux->nb_streams);
2094  tq_receive_finish(mux->queue, stream_idx);
2095 
2097  mux->streams[stream_idx].source_finished = 1;
2098 
2100 
2102 }
2103 
2104 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2105  const AVPacket *pkt)
2106 {
2107  SchMux *mux;
2108  SchMuxStream *ms;
2109 
2110  av_assert0(mux_idx < sch->nb_mux);
2111  mux = &sch->mux[mux_idx];
2112 
2113  av_assert0(stream_idx < mux->nb_streams);
2114  ms = &mux->streams[stream_idx];
2115 
2116  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2117  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2118  int ret;
2119 
2121  if (ret < 0)
2122  return ret;
2123 
2124  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2125  }
2126 
2127  return 0;
2128 }
2129 
2130 static int mux_done(Scheduler *sch, unsigned mux_idx)
2131 {
2132  SchMux *mux = &sch->mux[mux_idx];
2133 
2135 
2136  for (unsigned i = 0; i < mux->nb_streams; i++) {
2137  tq_receive_finish(mux->queue, i);
2138  mux->streams[i].source_finished = 1;
2139  }
2140 
2142 
2144 
2146 
2147  av_assert0(sch->nb_mux_done < sch->nb_mux);
2148  sch->nb_mux_done++;
2149 
2151 
2153 
2154  return 0;
2155 }
2156 
2157 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2158 {
2159  SchDec *dec;
2160  int ret, dummy;
2161 
2162  av_assert0(dec_idx < sch->nb_dec);
2163  dec = &sch->dec[dec_idx];
2164 
2165  // the decoder should have given us post-flush end timestamp in pkt
2166  if (dec->expect_end_ts) {
2167  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2169  if (ret < 0)
2170  return ret;
2171 
2172  dec->expect_end_ts = 0;
2173  }
2174 
2175  ret = tq_receive(dec->queue, &dummy, pkt);
2176  av_assert0(dummy <= 0);
2177 
2178  // got a flush packet, on the next call to this function the decoder
2179  // will give us post-flush end timestamp
2180  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2181  dec->expect_end_ts = 1;
2182 
2183  return ret;
2184 }
2185 
2187  unsigned in_idx, AVFrame *frame)
2188 {
2189  if (frame)
2190  return tq_send(fg->queue, in_idx, frame);
2191 
2192  if (!fg->inputs[in_idx].send_finished) {
2193  fg->inputs[in_idx].send_finished = 1;
2194  tq_send_finish(fg->queue, in_idx);
2195 
2196  // close the control stream when all actual inputs are done
2197  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2198  tq_send_finish(fg->queue, fg->nb_inputs);
2199  }
2200  return 0;
2201 }
2202 
2204  uint8_t *dst_finished, AVFrame *frame)
2205 {
2206  int ret;
2207 
2208  if (*dst_finished)
2209  return AVERROR_EOF;
2210 
2211  if (!frame)
2212  goto finish;
2213 
2214  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2215  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2216  send_to_enc(sch, &sch->enc[dst.idx], frame);
2217  if (ret == AVERROR_EOF)
2218  goto finish;
2219 
2220  return ret;
2221 
2222 finish:
2223  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2224  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2225  else
2226  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2227 
2228  *dst_finished = 1;
2229 
2230  return AVERROR_EOF;
2231 }
2232 
2233 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2234  unsigned out_idx, AVFrame *frame)
2235 {
2236  SchDec *dec;
2237  SchDecOutput *o;
2238  int ret;
2239  unsigned nb_done = 0;
2240 
2241  av_assert0(dec_idx < sch->nb_dec);
2242  dec = &sch->dec[dec_idx];
2243 
2244  av_assert0(out_idx < dec->nb_outputs);
2245  o = &dec->outputs[out_idx];
2246 
2247  for (unsigned i = 0; i < o->nb_dst; i++) {
2248  uint8_t *finished = &o->dst_finished[i];
2249  AVFrame *to_send = frame;
2250 
2251  // sending a frame consumes it, so make a temporary reference if needed
2252  if (i < o->nb_dst - 1) {
2253  to_send = dec->send_frame;
2254 
2255  // frame may sometimes contain props only,
2256  // e.g. to signal EOF timestamp
2257  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2258  av_frame_copy_props(to_send, frame);
2259  if (ret < 0)
2260  return ret;
2261  }
2262 
2263  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2264  if (ret < 0) {
2265  av_frame_unref(to_send);
2266  if (ret == AVERROR_EOF) {
2267  nb_done++;
2268  continue;
2269  }
2270  return ret;
2271  }
2272  }
2273 
2274  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2275 }
2276 
2277 static int dec_done(Scheduler *sch, unsigned dec_idx)
2278 {
2279  SchDec *dec = &sch->dec[dec_idx];
2280  int ret = 0;
2281 
2282  tq_receive_finish(dec->queue, 0);
2283 
2284  // make sure our source does not get stuck waiting for end timestamps
2285  // that will never arrive
2286  if (dec->queue_end_ts)
2288 
2289  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2290  SchDecOutput *o = &dec->outputs[i];
2291 
2292  for (unsigned j = 0; j < o->nb_dst; j++) {
2293  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2294  if (err < 0 && err != AVERROR_EOF)
2295  ret = err_merge(ret, err);
2296  }
2297  }
2298 
2299  return ret;
2300 }
2301 
2302 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2303 {
2304  SchEnc *enc;
2305  int ret, dummy;
2306 
2307  av_assert0(enc_idx < sch->nb_enc);
2308  enc = &sch->enc[enc_idx];
2309 
2310  ret = tq_receive(enc->queue, &dummy, frame);
2311  av_assert0(dummy <= 0);
2312 
2313  return ret;
2314 }
2315 
2317  uint8_t *dst_finished, AVPacket *pkt)
2318 {
2319  int ret;
2320 
2321  if (*dst_finished)
2322  return AVERROR_EOF;
2323 
2324  if (!pkt)
2325  goto finish;
2326 
2327  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2328  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2329  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2330  if (ret == AVERROR_EOF)
2331  goto finish;
2332 
2333  return ret;
2334 
2335 finish:
2336  if (dst.type == SCH_NODE_TYPE_MUX)
2337  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2338  else
2339  tq_send_finish(sch->dec[dst.idx].queue, 0);
2340 
2341  *dst_finished = 1;
2342 
2343  return AVERROR_EOF;
2344 }
2345 
2346 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2347 {
2348  SchEnc *enc;
2349  int ret;
2350 
2351  av_assert0(enc_idx < sch->nb_enc);
2352  enc = &sch->enc[enc_idx];
2353 
2354  for (unsigned i = 0; i < enc->nb_dst; i++) {
2355  uint8_t *finished = &enc->dst_finished[i];
2356  AVPacket *to_send = pkt;
2357 
2358  // sending a packet consumes it, so make a temporary reference if needed
2359  if (i < enc->nb_dst - 1) {
2360  to_send = enc->send_pkt;
2361 
2362  ret = av_packet_ref(to_send, pkt);
2363  if (ret < 0)
2364  return ret;
2365  }
2366 
2367  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2368  if (ret < 0) {
2369  av_packet_unref(to_send);
2370  if (ret == AVERROR_EOF)
2371  continue;
2372  return ret;
2373  }
2374  }
2375 
2376  return 0;
2377 }
2378 
2379 static int enc_done(Scheduler *sch, unsigned enc_idx)
2380 {
2381  SchEnc *enc = &sch->enc[enc_idx];
2382  int ret = 0;
2383 
2384  tq_receive_finish(enc->queue, 0);
2385 
2386  for (unsigned i = 0; i < enc->nb_dst; i++) {
2387  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2388  if (err < 0 && err != AVERROR_EOF)
2389  ret = err_merge(ret, err);
2390  }
2391 
2392  return ret;
2393 }
2394 
2395 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2396  unsigned *in_idx, AVFrame *frame)
2397 {
2398  SchFilterGraph *fg;
2399 
2400  av_assert0(fg_idx < sch->nb_filters);
2401  fg = &sch->filters[fg_idx];
2402 
2403  av_assert0(*in_idx <= fg->nb_inputs);
2404 
2405  // update scheduling to account for desired input stream, if it changed
2406  //
2407  // this check needs no locking because only the filtering thread
2408  // updates this value
2409  if (*in_idx != fg->best_input) {
2411 
2412  fg->best_input = *in_idx;
2414 
2416  }
2417 
2418  if (*in_idx == fg->nb_inputs) {
2419  int terminate = waiter_wait(sch, &fg->waiter);
2420  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2421  }
2422 
2423  while (1) {
2424  int ret, idx;
2425 
2426  ret = tq_receive(fg->queue, &idx, frame);
2427  if (idx < 0)
2428  return AVERROR_EOF;
2429  else if (ret >= 0) {
2430  *in_idx = idx;
2431  return 0;
2432  }
2433 
2434  // disregard EOFs for specific streams - they should always be
2435  // preceded by an EOF frame
2436  }
2437 }
2438 
2439 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2440 {
2441  SchFilterGraph *fg;
2442  SchFilterIn *fi;
2443 
2444  av_assert0(fg_idx < sch->nb_filters);
2445  fg = &sch->filters[fg_idx];
2446 
2447  av_assert0(in_idx < fg->nb_inputs);
2448  fi = &fg->inputs[in_idx];
2449 
2450  if (!fi->receive_finished) {
2451  fi->receive_finished = 1;
2452  tq_receive_finish(fg->queue, in_idx);
2453 
2454  // close the control stream when all actual inputs are done
2455  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2456  tq_receive_finish(fg->queue, fg->nb_inputs);
2457  }
2458 }
2459 
2460 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2461 {
2462  SchFilterGraph *fg;
2464 
2465  av_assert0(fg_idx < sch->nb_filters);
2466  fg = &sch->filters[fg_idx];
2467 
2468  av_assert0(out_idx < fg->nb_outputs);
2469  dst = fg->outputs[out_idx].dst;
2470 
2471  return (dst.type == SCH_NODE_TYPE_ENC) ?
2472  send_to_enc (sch, &sch->enc[dst.idx], frame) :
2473  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2474 }
2475 
2476 static int filter_done(Scheduler *sch, unsigned fg_idx)
2477 {
2478  SchFilterGraph *fg = &sch->filters[fg_idx];
2479  int ret = 0;
2480 
2481  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2482  tq_receive_finish(fg->queue, i);
2483 
2484  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2485  SchedulerNode dst = fg->outputs[i].dst;
2486  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2487  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2488  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2489 
2490  if (err < 0 && err != AVERROR_EOF)
2491  ret = err_merge(ret, err);
2492  }
2493 
2495 
2496  fg->task_exited = 1;
2497 
2499 
2501 
2502  return ret;
2503 }
2504 
2505 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2506 {
2507  SchFilterGraph *fg;
2508 
2509  av_assert0(fg_idx < sch->nb_filters);
2510  fg = &sch->filters[fg_idx];
2511 
2512  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2513 }
2514 
2515 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2516 {
2517  switch (node.type) {
2518  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2519  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2520  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2521  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2522  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2523  default: av_assert0(0);
2524  }
2525 }
2526 
2527 static void *task_wrapper(void *arg)
2528 {
2529  SchTask *task = arg;
2530  Scheduler *sch = task->parent;
2531  int ret;
2532  int err = 0;
2533 
2534  ret = task->func(task->func_arg);
2535  if (ret < 0)
2536  av_log(task->func_arg, AV_LOG_ERROR,
2537  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2538 
2539  err = task_cleanup(sch, task->node);
2540  ret = err_merge(ret, err);
2541 
2542  // EOF is considered normal termination
2543  if (ret == AVERROR_EOF)
2544  ret = 0;
2545  if (ret < 0) {
2547  sch->task_failed = 1;
2550  }
2551 
2553  "Terminating thread with return code %d (%s)\n", ret,
2554  ret < 0 ? av_err2str(ret) : "success");
2555 
2556  return (void*)(intptr_t)ret;
2557 }
2558 
2559 static int task_stop(Scheduler *sch, SchTask *task)
2560 {
2561  int ret;
2562  void *thread_ret;
2563 
2564  if (!task->thread_running)
2565  return task_cleanup(sch, task->node);
2566 
2567  ret = pthread_join(task->thread, &thread_ret);
2568  av_assert0(ret == 0);
2569 
2570  task->thread_running = 0;
2571 
2572  return (intptr_t)thread_ret;
2573 }
2574 
2575 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2576 {
2577  int ret = 0, err;
2578 
2579  if (sch->state != SCH_STATE_STARTED)
2580  return 0;
2581 
2582  atomic_store(&sch->terminate, 1);
2583 
2584  for (unsigned type = 0; type < 2; type++)
2585  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2586  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2587  waiter_set(w, 1);
2588  }
2589 
2590  for (unsigned i = 0; i < sch->nb_demux; i++) {
2591  SchDemux *d = &sch->demux[i];
2592 
2593  err = task_stop(sch, &d->task);
2594  ret = err_merge(ret, err);
2595  }
2596 
2597  for (unsigned i = 0; i < sch->nb_dec; i++) {
2598  SchDec *dec = &sch->dec[i];
2599 
2600  err = task_stop(sch, &dec->task);
2601  ret = err_merge(ret, err);
2602  }
2603 
2604  for (unsigned i = 0; i < sch->nb_filters; i++) {
2605  SchFilterGraph *fg = &sch->filters[i];
2606 
2607  err = task_stop(sch, &fg->task);
2608  ret = err_merge(ret, err);
2609  }
2610 
2611  for (unsigned i = 0; i < sch->nb_enc; i++) {
2612  SchEnc *enc = &sch->enc[i];
2613 
2614  err = task_stop(sch, &enc->task);
2615  ret = err_merge(ret, err);
2616  }
2617 
2618  for (unsigned i = 0; i < sch->nb_mux; i++) {
2619  SchMux *mux = &sch->mux[i];
2620 
2621  err = task_stop(sch, &mux->task);
2622  ret = err_merge(ret, err);
2623  }
2624 
2625  if (finish_ts)
2626  *finish_ts = trailing_dts(sch, 1);
2627 
2628  sch->state = SCH_STATE_STOPPED;
2629 
2630  return ret;
2631 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:299
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:68
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:430
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1097
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:76
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:351
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
Scheduler::finish_lock
pthread_mutex_t finish_lock
Definition: ffmpeg_sched.c:289
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2460
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:88
THREAD_QUEUE_PACKETS
@ THREAD_QUEUE_PACKETS
Definition: thread_queue.h:26
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:296
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:287
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:269
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2086
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:101
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:137
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:1985
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:300
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:233
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:628
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:147
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:680
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:185
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:264
int64_t
long long int64_t
Definition: coverity.c:34
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1360
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2130
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:297
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:163
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
av_fifo_peek
int av_fifo_peek(const AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1365
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:403
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2515
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:539
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:218
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:160
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1859
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:284
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, enum ThreadQueueType type)
Allocate a queue for sending data between threads.
Definition: thread_queue.c:71
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2379
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:225
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:557
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:213
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:276
SchFilterOut
Definition: ffmpeg_sched.c:243
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:270
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:176
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:207
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:75
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:260
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:313
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:166
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1213
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1708
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2559
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:240
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2575
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:119
fail
#define fail()
Definition: checkasm.h:193
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:244
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:85
SchEnc
Definition: ffmpeg_sched.c:109
dummy
int dummy
Definition: motion.c:66
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:90
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:648
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:214
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:252
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:272
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:586
AVERROR_BUFFER_TOO_SMALL
#define AVERROR_BUFFER_TOO_SMALL
Buffer too small.
Definition: error.h:53
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:294
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:821
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:151
SchFilterGraph
Definition: ffmpeg_sched.c:247
SchMuxStream::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:192
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:60
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:209
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:459
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:308
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:216
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:190
SchDecOutput
Definition: ffmpeg_sched.c:74
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:624
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:341
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:256
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:94
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1682
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:577
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2203
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:425
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:195
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:113
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:707
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:191
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2233
ctx
AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:385
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:209
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2476
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:248
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2302
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:611
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:110
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1953
arg
const char * arg
Definition: jacosubdec.c:67
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:197
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:598
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:229
Scheduler::finish_cond
pthread_cond_t finish_cond
Definition: ffmpeg_sched.c:290
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:278
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:522
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:53
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:368
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:75
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:305
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1727
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:137
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:726
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:251
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:281
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1292
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:238
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:783
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:105
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:271
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2277
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:261
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:157
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:114
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:740
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1359
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations send normally to other types.
Definition: ffmpeg_sched.h:338
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:815
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:438
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:487
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2346
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:82
error.h
Scheduler
Definition: ffmpeg_sched.c:275
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:217
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:103
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:194
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:81
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:640
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:144
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:162
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:153
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:253
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:116
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:368
AVPacket::size
int size
Definition: packet.h:540
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:106
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:266
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:401
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:83
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:180
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:258
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:333
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:187
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:671
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:734
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:250
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:311
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:538
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2316
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1361
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:618
SchFilterIn
Definition: ffmpeg_sched.c:236
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2395
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:282
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:64
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:138
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:572
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:279
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:393
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:152
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:169
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:406
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:302
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:532
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2051
packet.h
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:265
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:31
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:285
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:309
SchDec
Definition: ffmpeg_sched.c:80
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:255
QueueType
QueueType
Definition: ffmpeg_sched.c:47
THREAD_QUEUE_FRAMES
@ THREAD_QUEUE_FRAMES
Definition: thread_queue.h:25
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:86
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:623
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:437
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:255
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:777
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1161
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2186
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:193
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:151
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:919
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1803
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2505
SchDemuxStream
Definition: ffmpeg_sched.c:150
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
SchFilterIn::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:238
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2157
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:80
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:264
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:200
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1823
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:220
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:674
ThreadQueue
Definition: thread_queue.c:40
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2527
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:222
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:90
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2029
SchDemux
Definition: ffmpeg_sched.c:156
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:293
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:93
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1918
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:83
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:541
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:532
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:230
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:163
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:322
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:102
Scheduler::task_failed
unsigned task_failed
Definition: ffmpeg_sched.c:288
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:888
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1446
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1238
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2439
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g.
Definition: ffmpeg_sched.c:719
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:112
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1657
AVPacket
This structure stores compressed data.
Definition: packet.h:516
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:100
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:231
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:97
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:374
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1591
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:482
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2104
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:75
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:142
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:654
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:140
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:237
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1197
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1410
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:159
PreMuxQueue
Definition: ffmpeg_sched.c:172
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:306
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:239
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:262
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:583
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1267
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:551
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2073
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add an pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:863
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:78
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:115
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:223
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:303