FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDecOutput {
76  uint8_t *dst_finished;
77  unsigned nb_dst;
78 } SchDecOutput;
79 
80 typedef struct SchDec {
81  const AVClass *class;
82 
84 
86  unsigned nb_outputs;
87 
89  // Queue for receiving input packets, one stream.
91 
92  // Queue for sending post-flush end timestamps back to the source
95 
96  // temporary storage used by sch_dec_send()
98 } SchDec;
99 
100 typedef struct SchSyncQueue {
104 
105  unsigned *enc_idx;
106  unsigned nb_enc_idx;
107 } SchSyncQueue;
108 
109 typedef struct SchEnc {
110  const AVClass *class;
111 
114  uint8_t *dst_finished;
115  unsigned nb_dst;
116 
117  // [0] - index of the sync queue in Scheduler.sq_enc,
118  // [1] - index of this encoder in the sq
119  int sq_idx[2];
120 
121  /* Opening encoders is somewhat nontrivial due to their interaction with
122  * sync queues, which are (among other things) responsible for maintaining
123  * constant audio frame size, when it is required by the encoder.
124  *
125  * Opening the encoder requires stream parameters, obtained from the first
126  * frame. However, that frame cannot be properly chunked by the sync queue
127  * without knowing the required frame size, which is only available after
128  * opening the encoder.
129  *
130  * This apparent circular dependency is resolved in the following way:
131  * - the caller creating the encoder gives us a callback which opens the
132  * encoder and returns the required frame size (if any)
133  * - when the first frame is sent to the encoder, the sending thread
134  * - calls this callback, opening the encoder
135  * - passes the returned frame size to the sync queue
136  */
137  int (*open_cb)(void *opaque, const AVFrame *frame);
138  int opened;
139 
141  // Queue for receiving input frames, one stream.
143  // tq_send() to queue returned EOF
145 
146  // temporary storage used by sch_enc_send()
148 } SchEnc;
149 
150 typedef struct SchDemuxStream {
152  uint8_t *dst_finished;
153  unsigned nb_dst;
155 
156 typedef struct SchDemux {
157  const AVClass *class;
158 
160  unsigned nb_streams;
161 
164 
165  // temporary storage used by sch_demux_send()
167 
168  // protected by schedule_lock
170 } SchDemux;
171 
172 typedef struct PreMuxQueue {
173  /**
174  * Queue for buffering the packets before the muxer task can be started.
175  */
177  /**
178  * Maximum number of packets in fifo.
179  */
181  /*
182  * The size of the AVPackets' buffers in queue.
183  * Updated when a packet is either pushed or pulled from the queue.
184  */
185  size_t data_size;
186  /* Threshold after which max_packets will be in effect */
188 } PreMuxQueue;
189 
190 typedef struct SchMuxStream {
192 
193  unsigned *sub_heartbeat_dst;
195 
197 
198  // an EOF was generated while flushing the pre-mux queue
199  int init_eof;
200 
201  ////////////////////////////////////////////////////////////
202  // The following are protected by Scheduler.schedule_lock //
203 
204  /* dts+duration of the last packet sent to this stream
205  in AV_TIME_BASE_Q */
207  // this stream no longer accepts input
209  ////////////////////////////////////////////////////////////
210 } SchMuxStream;
211 
212 typedef struct SchMux {
213  const AVClass *class;
214 
216  unsigned nb_streams;
218 
219  int (*init)(void *arg);
220 
222  /**
223  * Set to 1 after starting the muxer task and flushing the
224  * pre-muxing queues.
225  * Set either before any tasks have started, or with
226  * Scheduler.mux_ready_lock held.
227  */
230  unsigned queue_size;
231 
233 } SchMux;
234 
235 typedef struct SchFilterIn {
239 } SchFilterIn;
240 
241 typedef struct SchFilterOut {
243 } SchFilterOut;
244 
245 typedef struct SchFilterGraph {
246  const AVClass *class;
247 
249  unsigned nb_inputs;
252 
254  unsigned nb_outputs;
255 
257  // input queue, nb_inputs+1 streams
258  // last stream is control
261 
262  // protected by schedule_lock
263  unsigned best_input;
266 
271 };
272 
273 struct Scheduler {
274  const AVClass *class;
275 
277  unsigned nb_demux;
278 
280  unsigned nb_mux;
281 
282  unsigned nb_mux_ready;
284 
285  unsigned nb_mux_done;
286  unsigned task_failed;
289 
290 
292  unsigned nb_dec;
293 
295  unsigned nb_enc;
296 
298  unsigned nb_sq_enc;
299 
301  unsigned nb_filters;
302 
304  int sdp_auto;
305 
308 
310 
312 };
313 
314 /**
315  * Wait until this task is allowed to proceed.
316  *
317  * @retval 0 the caller should proceed
318  * @retval 1 the caller should terminate
319  */
320 static int waiter_wait(Scheduler *sch, SchWaiter *w)
321 {
322  int terminate;
323 
324  if (!atomic_load(&w->choked))
325  return 0;
326 
327  pthread_mutex_lock(&w->lock);
328 
329  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
330  pthread_cond_wait(&w->cond, &w->lock);
331 
332  terminate = atomic_load(&sch->terminate);
333 
334  pthread_mutex_unlock(&w->lock);
335 
336  return terminate;
337 }
338 
339 static void waiter_set(SchWaiter *w, int choked)
340 {
341  pthread_mutex_lock(&w->lock);
342 
343  atomic_store(&w->choked, choked);
344  pthread_cond_signal(&w->cond);
345 
346  pthread_mutex_unlock(&w->lock);
347 }
348 
349 static int waiter_init(SchWaiter *w)
350 {
351  int ret;
352 
353  atomic_init(&w->choked, 0);
354 
355  ret = pthread_mutex_init(&w->lock, NULL);
356  if (ret)
357  return AVERROR(ret);
358 
359  ret = pthread_cond_init(&w->cond, NULL);
360  if (ret)
361  return AVERROR(ret);
362 
363  return 0;
364 }
365 
366 static void waiter_uninit(SchWaiter *w)
367 {
368  pthread_mutex_destroy(&w->lock);
369  pthread_cond_destroy(&w->cond);
370 }
371 
372 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
373  enum QueueType type)
374 {
375  ThreadQueue *tq;
376 
377  if (queue_size <= 0) {
378  if (type == QUEUE_FRAMES)
379  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
380  else
382  }
383 
384  if (type == QUEUE_FRAMES) {
385  // This queue length is used in the decoder code to ensure that
386  // there are enough entries in fixed-size frame pools to account
387  // for frames held in queues inside the ffmpeg utility. If this
388  // can ever dynamically change then the corresponding decode
389  // code needs to be updated as well.
391  }
392 
393  tq = tq_alloc(nb_streams, queue_size,
395  if (!tq)
396  return AVERROR(ENOMEM);
397 
398  *ptq = tq;
399  return 0;
400 }
401 
402 static void *task_wrapper(void *arg);
403 
404 static int task_start(SchTask *task)
405 {
406  int ret;
407 
408  if (!task->parent)
409  return 0;
410 
411  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
412 
413  av_assert0(!task->thread_running);
414 
415  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
416  if (ret) {
417  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
418  strerror(ret));
419  return AVERROR(ret);
420  }
421 
422  task->thread_running = 1;
423  return 0;
424 }
425 
426 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
427  SchThreadFunc func, void *func_arg)
428 {
429  task->parent = sch;
430 
431  task->node.type = type;
432  task->node.idx = idx;
433 
434  task->func = func;
435  task->func_arg = func_arg;
436 }
437 
438 static int64_t trailing_dts(const Scheduler *sch)
439 {
440  int64_t min_dts = INT64_MAX;
441 
442  for (unsigned i = 0; i < sch->nb_mux; i++) {
443  const SchMux *mux = &sch->mux[i];
444 
445  for (unsigned j = 0; j < mux->nb_streams; j++) {
446  const SchMuxStream *ms = &mux->streams[j];
447 
448  if (ms->source_finished)
449  continue;
450  if (ms->last_dts == AV_NOPTS_VALUE)
451  return AV_NOPTS_VALUE;
452 
453  min_dts = FFMIN(min_dts, ms->last_dts);
454  }
455  }
456 
457  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
458 }
459 
460 static int64_t progressing_dts(const Scheduler *sch, int count_finished)
461 {
462  int64_t max_dts = INT64_MIN;
463 
464  for (unsigned i = 0; i < sch->nb_mux; i++) {
465  const SchMux *mux = &sch->mux[i];
466 
467  for (unsigned j = 0; j < mux->nb_streams; j++) {
468  const SchMuxStream *ms = &mux->streams[j];
469 
470  if (ms->source_finished && !count_finished)
471  continue;
472  if (ms->last_dts != AV_NOPTS_VALUE)
473  max_dts = FFMAX(max_dts, ms->last_dts);
474  }
475  }
476 
477  return max_dts == INT64_MIN ? AV_NOPTS_VALUE : max_dts;
478 }
479 
481 {
482  SchFilterGraph *fg = &sch->filters[idx];
483 
485  memset(&fg->task, 0, sizeof(fg->task));
486 
487  tq_free(&fg->queue);
488 
489  av_freep(&fg->inputs);
490  fg->nb_inputs = 0;
491  av_freep(&fg->outputs);
492  fg->nb_outputs = 0;
493 
494  fg->task_exited = 1;
495 }
496 
497 void sch_free(Scheduler **psch)
498 {
499  Scheduler *sch = *psch;
500 
501  if (!sch)
502  return;
503 
504  sch_stop(sch, NULL);
505 
506  for (unsigned i = 0; i < sch->nb_demux; i++) {
507  SchDemux *d = &sch->demux[i];
508 
509  for (unsigned j = 0; j < d->nb_streams; j++) {
510  SchDemuxStream *ds = &d->streams[j];
511  av_freep(&ds->dst);
512  av_freep(&ds->dst_finished);
513  }
514  av_freep(&d->streams);
515 
517 
518  waiter_uninit(&d->waiter);
519  }
520  av_freep(&sch->demux);
521 
522  for (unsigned i = 0; i < sch->nb_mux; i++) {
523  SchMux *mux = &sch->mux[i];
524 
525  for (unsigned j = 0; j < mux->nb_streams; j++) {
526  SchMuxStream *ms = &mux->streams[j];
527 
528  if (ms->pre_mux_queue.fifo) {
529  AVPacket *pkt;
530  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
533  }
534 
536  }
537  av_freep(&mux->streams);
538 
540 
541  tq_free(&mux->queue);
542  }
543  av_freep(&sch->mux);
544 
545  for (unsigned i = 0; i < sch->nb_dec; i++) {
546  SchDec *dec = &sch->dec[i];
547 
548  tq_free(&dec->queue);
549 
551 
552  for (unsigned j = 0; j < dec->nb_outputs; j++) {
553  SchDecOutput *o = &dec->outputs[j];
554 
555  av_freep(&o->dst);
556  av_freep(&o->dst_finished);
557  }
558 
559  av_freep(&dec->outputs);
560 
561  av_frame_free(&dec->send_frame);
562  }
563  av_freep(&sch->dec);
564 
565  for (unsigned i = 0; i < sch->nb_enc; i++) {
566  SchEnc *enc = &sch->enc[i];
567 
568  tq_free(&enc->queue);
569 
570  av_packet_free(&enc->send_pkt);
571 
572  av_freep(&enc->dst);
573  av_freep(&enc->dst_finished);
574  }
575  av_freep(&sch->enc);
576 
577  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
578  SchSyncQueue *sq = &sch->sq_enc[i];
579  sq_free(&sq->sq);
580  av_frame_free(&sq->frame);
582  av_freep(&sq->enc_idx);
583  }
584  av_freep(&sch->sq_enc);
585 
586  for (unsigned i = 0; i < sch->nb_filters; i++) {
587  SchFilterGraph *fg = &sch->filters[i];
588 
589  tq_free(&fg->queue);
590 
591  av_freep(&fg->inputs);
592  av_freep(&fg->outputs);
593 
594  waiter_uninit(&fg->waiter);
595  }
596  av_freep(&sch->filters);
597 
598  av_freep(&sch->sdp_filename);
599 
601 
603 
606 
607  av_freep(psch);
608 }
609 
610 static const AVClass scheduler_class = {
611  .class_name = "Scheduler",
612  .version = LIBAVUTIL_VERSION_INT,
613 };
614 
616 {
617  Scheduler *sch;
618  int ret;
619 
620  sch = av_mallocz(sizeof(*sch));
621  if (!sch)
622  return NULL;
623 
624  sch->class = &scheduler_class;
625  sch->sdp_auto = 1;
626 
628  if (ret)
629  goto fail;
630 
632  if (ret)
633  goto fail;
634 
636  if (ret)
637  goto fail;
638 
640  if (ret)
641  goto fail;
642 
643  return sch;
644 fail:
645  sch_free(&sch);
646  return NULL;
647 }
648 
649 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
650 {
651  av_freep(&sch->sdp_filename);
652  sch->sdp_filename = av_strdup(sdp_filename);
653  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
654 }
655 
656 static const AVClass sch_mux_class = {
657  .class_name = "SchMux",
658  .version = LIBAVUTIL_VERSION_INT,
659  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
660 };
661 
662 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
663  void *arg, int sdp_auto, unsigned thread_queue_size)
664 {
665  const unsigned idx = sch->nb_mux;
666 
667  SchMux *mux;
668  int ret;
669 
670  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
671  if (ret < 0)
672  return ret;
673 
674  mux = &sch->mux[idx];
675  mux->class = &sch_mux_class;
676  mux->init = init;
677  mux->queue_size = thread_queue_size;
678 
679  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
680 
681  sch->sdp_auto &= sdp_auto;
682 
683  return idx;
684 }
685 
686 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
687 {
688  SchMux *mux;
689  SchMuxStream *ms;
690  unsigned stream_idx;
691  int ret;
692 
693  av_assert0(mux_idx < sch->nb_mux);
694  mux = &sch->mux[mux_idx];
695 
696  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
697  if (ret < 0)
698  return ret;
699  stream_idx = mux->nb_streams - 1;
700 
701  ms = &mux->streams[stream_idx];
702 
703  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
704  if (!ms->pre_mux_queue.fifo)
705  return AVERROR(ENOMEM);
706 
707  ms->last_dts = AV_NOPTS_VALUE;
708 
709  return stream_idx;
710 }
711 
712 static const AVClass sch_demux_class = {
713  .class_name = "SchDemux",
714  .version = LIBAVUTIL_VERSION_INT,
715  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
716 };
717 
719 {
720  const unsigned idx = sch->nb_demux;
721 
722  SchDemux *d;
723  int ret;
724 
725  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
726  if (ret < 0)
727  return ret;
728 
729  d = &sch->demux[idx];
730 
731  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
732 
733  d->class = &sch_demux_class;
734  d->send_pkt = av_packet_alloc();
735  if (!d->send_pkt)
736  return AVERROR(ENOMEM);
737 
738  ret = waiter_init(&d->waiter);
739  if (ret < 0)
740  return ret;
741 
742  return idx;
743 }
744 
745 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
746 {
747  SchDemux *d;
748  int ret;
749 
750  av_assert0(demux_idx < sch->nb_demux);
751  d = &sch->demux[demux_idx];
752 
753  ret = GROW_ARRAY(d->streams, d->nb_streams);
754  return ret < 0 ? ret : d->nb_streams - 1;
755 }
756 
757 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
758 {
759  SchDec *dec;
760  int ret;
761 
762  av_assert0(dec_idx < sch->nb_dec);
763  dec = &sch->dec[dec_idx];
764 
765  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
766  if (ret < 0)
767  return ret;
768 
769  return dec->nb_outputs - 1;
770 }
771 
772 static const AVClass sch_dec_class = {
773  .class_name = "SchDec",
774  .version = LIBAVUTIL_VERSION_INT,
775  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
776 };
777 
778 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
779 {
780  const unsigned idx = sch->nb_dec;
781 
782  SchDec *dec;
783  int ret;
784 
785  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
786  if (ret < 0)
787  return ret;
788 
789  dec = &sch->dec[idx];
790 
791  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
792 
793  dec->class = &sch_dec_class;
794  dec->send_frame = av_frame_alloc();
795  if (!dec->send_frame)
796  return AVERROR(ENOMEM);
797 
798  ret = sch_add_dec_output(sch, idx);
799  if (ret < 0)
800  return ret;
801 
802  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
803  if (ret < 0)
804  return ret;
805 
806  if (send_end_ts) {
808  if (ret < 0)
809  return ret;
810  }
811 
812  return idx;
813 }
814 
815 static const AVClass sch_enc_class = {
816  .class_name = "SchEnc",
817  .version = LIBAVUTIL_VERSION_INT,
818  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
819 };
820 
822  int (*open_cb)(void *opaque, const AVFrame *frame))
823 {
824  const unsigned idx = sch->nb_enc;
825 
826  SchEnc *enc;
827  int ret;
828 
829  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
830  if (ret < 0)
831  return ret;
832 
833  enc = &sch->enc[idx];
834 
835  enc->class = &sch_enc_class;
836  enc->open_cb = open_cb;
837  enc->sq_idx[0] = -1;
838  enc->sq_idx[1] = -1;
839 
840  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
841 
842  enc->send_pkt = av_packet_alloc();
843  if (!enc->send_pkt)
844  return AVERROR(ENOMEM);
845 
846  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
847  if (ret < 0)
848  return ret;
849 
850  return idx;
851 }
852 
853 static const AVClass sch_fg_class = {
854  .class_name = "SchFilterGraph",
855  .version = LIBAVUTIL_VERSION_INT,
856  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
857 };
858 
859 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
860  SchThreadFunc func, void *ctx)
861 {
862  const unsigned idx = sch->nb_filters;
863 
864  SchFilterGraph *fg;
865  int ret;
866 
867  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
868  if (ret < 0)
869  return ret;
870  fg = &sch->filters[idx];
871 
872  fg->class = &sch_fg_class;
873 
874  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
875 
876  if (nb_inputs) {
877  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
878  if (!fg->inputs)
879  return AVERROR(ENOMEM);
880  fg->nb_inputs = nb_inputs;
881  }
882 
883  if (nb_outputs) {
884  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
885  if (!fg->outputs)
886  return AVERROR(ENOMEM);
887  fg->nb_outputs = nb_outputs;
888  }
889 
890  ret = waiter_init(&fg->waiter);
891  if (ret < 0)
892  return ret;
893 
894  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
895  if (ret < 0)
896  return ret;
897 
898  return idx;
899 }
900 
901 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
902 {
903  SchSyncQueue *sq;
904  int ret;
905 
906  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
907  if (ret < 0)
908  return ret;
909  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
910 
911  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
912  if (!sq->sq)
913  return AVERROR(ENOMEM);
914 
915  sq->frame = av_frame_alloc();
916  if (!sq->frame)
917  return AVERROR(ENOMEM);
918 
919  ret = pthread_mutex_init(&sq->lock, NULL);
920  if (ret)
921  return AVERROR(ret);
922 
923  return sq - sch->sq_enc;
924 }
925 
926 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
927  int limiting, uint64_t max_frames)
928 {
929  SchSyncQueue *sq;
930  SchEnc *enc;
931  int ret;
932 
933  av_assert0(sq_idx < sch->nb_sq_enc);
934  sq = &sch->sq_enc[sq_idx];
935 
936  av_assert0(enc_idx < sch->nb_enc);
937  enc = &sch->enc[enc_idx];
938 
939  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
940  if (ret < 0)
941  return ret;
942  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
943 
944  ret = sq_add_stream(sq->sq, limiting);
945  if (ret < 0)
946  return ret;
947 
948  enc->sq_idx[0] = sq_idx;
949  enc->sq_idx[1] = ret;
950 
951  if (max_frames != INT64_MAX)
952  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
953 
954  return 0;
955 }
956 
958 {
959  int ret;
960 
961  switch (src.type) {
962  case SCH_NODE_TYPE_DEMUX: {
963  SchDemuxStream *ds;
964 
965  av_assert0(src.idx < sch->nb_demux &&
966  src.idx_stream < sch->demux[src.idx].nb_streams);
967  ds = &sch->demux[src.idx].streams[src.idx_stream];
968 
969  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
970  if (ret < 0)
971  return ret;
972 
973  ds->dst[ds->nb_dst - 1] = dst;
974 
975  // demuxed packets go to decoding or streamcopy
976  switch (dst.type) {
977  case SCH_NODE_TYPE_DEC: {
978  SchDec *dec;
979 
980  av_assert0(dst.idx < sch->nb_dec);
981  dec = &sch->dec[dst.idx];
982 
983  av_assert0(!dec->src.type);
984  dec->src = src;
985  break;
986  }
987  case SCH_NODE_TYPE_MUX: {
988  SchMuxStream *ms;
989 
990  av_assert0(dst.idx < sch->nb_mux &&
991  dst.idx_stream < sch->mux[dst.idx].nb_streams);
992  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
993 
994  av_assert0(!ms->src.type);
995  ms->src = src;
996 
997  break;
998  }
999  default: av_assert0(0);
1000  }
1001 
1002  break;
1003  }
1004  case SCH_NODE_TYPE_DEC: {
1005  SchDec *dec;
1006  SchDecOutput *o;
1007 
1008  av_assert0(src.idx < sch->nb_dec);
1009  dec = &sch->dec[src.idx];
1010 
1011  av_assert0(src.idx_stream < dec->nb_outputs);
1012  o = &dec->outputs[src.idx_stream];
1013 
1014  ret = GROW_ARRAY(o->dst, o->nb_dst);
1015  if (ret < 0)
1016  return ret;
1017 
1018  o->dst[o->nb_dst - 1] = dst;
1019 
1020  // decoded frames go to filters or encoding
1021  switch (dst.type) {
1022  case SCH_NODE_TYPE_FILTER_IN: {
1023  SchFilterIn *fi;
1024 
1025  av_assert0(dst.idx < sch->nb_filters &&
1026  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1027  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1028 
1029  av_assert0(!fi->src.type);
1030  fi->src = src;
1031  break;
1032  }
1033  case SCH_NODE_TYPE_ENC: {
1034  SchEnc *enc;
1035 
1036  av_assert0(dst.idx < sch->nb_enc);
1037  enc = &sch->enc[dst.idx];
1038 
1039  av_assert0(!enc->src.type);
1040  enc->src = src;
1041  break;
1042  }
1043  default: av_assert0(0);
1044  }
1045 
1046  break;
1047  }
1048  case SCH_NODE_TYPE_FILTER_OUT: {
1049  SchFilterOut *fo;
1050 
1051  av_assert0(src.idx < sch->nb_filters &&
1052  src.idx_stream < sch->filters[src.idx].nb_outputs);
1053  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1054 
1055  av_assert0(!fo->dst.type);
1056  fo->dst = dst;
1057 
1058  // filtered frames go to encoding or another filtergraph
1059  switch (dst.type) {
1060  case SCH_NODE_TYPE_ENC: {
1061  SchEnc *enc;
1062 
1063  av_assert0(dst.idx < sch->nb_enc);
1064  enc = &sch->enc[dst.idx];
1065 
1066  av_assert0(!enc->src.type);
1067  enc->src = src;
1068  break;
1069  }
1070  case SCH_NODE_TYPE_FILTER_IN: {
1071  SchFilterIn *fi;
1072 
1073  av_assert0(dst.idx < sch->nb_filters &&
1074  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1075  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1076 
1077  av_assert0(!fi->src.type);
1078  fi->src = src;
1079  break;
1080  }
1081  default: av_assert0(0);
1082  }
1083 
1084 
1085  break;
1086  }
1087  case SCH_NODE_TYPE_ENC: {
1088  SchEnc *enc;
1089 
1090  av_assert0(src.idx < sch->nb_enc);
1091  enc = &sch->enc[src.idx];
1092 
1093  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1094  if (ret < 0)
1095  return ret;
1096 
1097  enc->dst[enc->nb_dst - 1] = dst;
1098 
1099  // encoding packets go to muxing or decoding
1100  switch (dst.type) {
1101  case SCH_NODE_TYPE_MUX: {
1102  SchMuxStream *ms;
1103 
1104  av_assert0(dst.idx < sch->nb_mux &&
1105  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1106  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1107 
1108  av_assert0(!ms->src.type);
1109  ms->src = src;
1110 
1111  break;
1112  }
1113  case SCH_NODE_TYPE_DEC: {
1114  SchDec *dec;
1115 
1116  av_assert0(dst.idx < sch->nb_dec);
1117  dec = &sch->dec[dst.idx];
1118 
1119  av_assert0(!dec->src.type);
1120  dec->src = src;
1121 
1122  break;
1123  }
1124  default: av_assert0(0);
1125  }
1126 
1127  break;
1128  }
1129  default: av_assert0(0);
1130  }
1131 
1132  return 0;
1133 }
1134 
1135 static int mux_task_start(SchMux *mux)
1136 {
1137  int ret = 0;
1138 
1139  ret = task_start(&mux->task);
1140  if (ret < 0)
1141  return ret;
1142 
1143  /* flush the pre-muxing queues */
1144  while (1) {
1145  int min_stream = -1;
1146  Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1147 
1148  AVPacket *pkt;
1149 
1150  // find the stream with the earliest dts or EOF in pre-muxing queue
1151  for (unsigned i = 0; i < mux->nb_streams; i++) {
1152  SchMuxStream *ms = &mux->streams[i];
1153 
1154  if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1155  continue;
1156 
1157  if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1158  min_stream = i;
1159  break;
1160  }
1161 
1162  if (min_ts.ts == AV_NOPTS_VALUE ||
1163  av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1164  min_stream = i;
1165  min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1166  }
1167  }
1168 
1169  if (min_stream >= 0) {
1170  SchMuxStream *ms = &mux->streams[min_stream];
1171 
1172  ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1173  av_assert0(ret >= 0);
1174 
1175  if (pkt) {
1176  if (!ms->init_eof)
1177  ret = tq_send(mux->queue, min_stream, pkt);
1178  av_packet_free(&pkt);
1179  if (ret == AVERROR_EOF)
1180  ms->init_eof = 1;
1181  else if (ret < 0)
1182  return ret;
1183  } else
1184  tq_send_finish(mux->queue, min_stream);
1185 
1186  continue;
1187  }
1188 
1189  break;
1190  }
1191 
1192  atomic_store(&mux->mux_started, 1);
1193 
1194  return 0;
1195 }
1196 
1197 int print_sdp(const char *filename);
1198 
1199 static int mux_init(Scheduler *sch, SchMux *mux)
1200 {
1201  int ret;
1202 
1203  ret = mux->init(mux->task.func_arg);
1204  if (ret < 0)
1205  return ret;
1206 
1207  sch->nb_mux_ready++;
1208 
1209  if (sch->sdp_filename || sch->sdp_auto) {
1210  if (sch->nb_mux_ready < sch->nb_mux)
1211  return 0;
1212 
1213  ret = print_sdp(sch->sdp_filename);
1214  if (ret < 0) {
1215  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1216  return ret;
1217  }
1218 
1219  /* SDP is written only after all the muxers are ready, so now we
1220  * start ALL the threads */
1221  for (unsigned i = 0; i < sch->nb_mux; i++) {
1222  ret = mux_task_start(&sch->mux[i]);
1223  if (ret < 0)
1224  return ret;
1225  }
1226  } else {
1227  ret = mux_task_start(mux);
1228  if (ret < 0)
1229  return ret;
1230  }
1231 
1232  return 0;
1233 }
1234 
1235 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1236  size_t data_threshold, int max_packets)
1237 {
1238  SchMux *mux;
1239  SchMuxStream *ms;
1240 
1241  av_assert0(mux_idx < sch->nb_mux);
1242  mux = &sch->mux[mux_idx];
1243 
1244  av_assert0(stream_idx < mux->nb_streams);
1245  ms = &mux->streams[stream_idx];
1246 
1247  ms->pre_mux_queue.max_packets = max_packets;
1248  ms->pre_mux_queue.data_threshold = data_threshold;
1249 }
1250 
1251 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1252 {
1253  SchMux *mux;
1254  int ret = 0;
1255 
1256  av_assert0(mux_idx < sch->nb_mux);
1257  mux = &sch->mux[mux_idx];
1258 
1259  av_assert0(stream_idx < mux->nb_streams);
1260 
1262 
1263  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1264 
1265  // this may be called during initialization - do not start
1266  // threads before sch_start() is called
1267  if (++mux->nb_streams_ready == mux->nb_streams &&
1268  sch->state >= SCH_STATE_STARTED)
1269  ret = mux_init(sch, mux);
1270 
1272 
1273  return ret;
1274 }
1275 
1276 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1277  unsigned dec_idx)
1278 {
1279  SchMux *mux;
1280  SchMuxStream *ms;
1281  int ret = 0;
1282 
1283  av_assert0(mux_idx < sch->nb_mux);
1284  mux = &sch->mux[mux_idx];
1285 
1286  av_assert0(stream_idx < mux->nb_streams);
1287  ms = &mux->streams[stream_idx];
1288 
1290  if (ret < 0)
1291  return ret;
1292 
1293  av_assert0(dec_idx < sch->nb_dec);
1294  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1295 
1296  if (!mux->sub_heartbeat_pkt) {
1298  if (!mux->sub_heartbeat_pkt)
1299  return AVERROR(ENOMEM);
1300  }
1301 
1302  return 0;
1303 }
1304 
1305 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src);
1306 
1307 // Unchoke any filter graphs that are downstream of this node, to prevent it
1308 // from getting stuck trying to push data to a full queue
1310 {
1311  SchFilterGraph *fg;
1312  SchDec *dec;
1313  SchEnc *enc;
1314  switch (dst->type) {
1315  case SCH_NODE_TYPE_DEC:
1316  dec = &sch->dec[dst->idx];
1317  for (int i = 0; i < dec->nb_outputs; i++)
1318  unchoke_downstream(sch, dec->outputs[i].dst);
1319  break;
1320  case SCH_NODE_TYPE_ENC:
1321  enc = &sch->enc[dst->idx];
1322  for (int i = 0; i < enc->nb_dst; i++)
1323  unchoke_downstream(sch, &enc->dst[i]);
1324  break;
1325  case SCH_NODE_TYPE_MUX:
1326  // muxers are never choked
1327  break;
1329  fg = &sch->filters[dst->idx];
1330  if (fg->best_input == fg->nb_inputs) {
1331  fg->waiter.choked_next = 0;
1332  } else {
1333  // ensure that this filter graph is not stuck waiting for
1334  // input from a different upstream demuxer
1335  unchoke_for_stream(sch, fg->inputs[fg->best_input].src);
1336  }
1337  break;
1338  default:
1339  av_unreachable("Invalid destination node type?");
1340  break;
1341  }
1342 }
1343 
1345 {
1346  while (1) {
1347  SchFilterGraph *fg;
1348  SchDemux *demux;
1349  switch (src.type) {
1350  case SCH_NODE_TYPE_DEMUX:
1351  // fed directly by a demuxer (i.e. not through a filtergraph)
1352  demux = &sch->demux[src.idx];
1353  if (demux->waiter.choked_next == 0)
1354  return; // prevent infinite loop
1355  demux->waiter.choked_next = 0;
1356  for (int i = 0; i < demux->nb_streams; i++)
1357  unchoke_downstream(sch, demux->streams[i].dst);
1358  return;
1359  case SCH_NODE_TYPE_DEC:
1360  src = sch->dec[src.idx].src;
1361  continue;
1362  case SCH_NODE_TYPE_ENC:
1363  src = sch->enc[src.idx].src;
1364  continue;
1366  fg = &sch->filters[src.idx];
1367  // the filtergraph contains internal sources and
1368  // requested to be scheduled directly
1369  if (fg->best_input == fg->nb_inputs) {
1370  fg->waiter.choked_next = 0;
1371  return;
1372  }
1373  src = fg->inputs[fg->best_input].src;
1374  continue;
1375  default:
1376  av_unreachable("Invalid source node type?");
1377  return;
1378  }
1379  }
1380 }
1381 
1382 static void choke_demux(const Scheduler *sch, int demux_id, int choked)
1383 {
1384  av_assert1(demux_id < sch->nb_demux);
1385  SchDemux *demux = &sch->demux[demux_id];
1386 
1387  for (int i = 0; i < demux->nb_streams; i++) {
1388  SchedulerNode *dst = demux->streams[i].dst;
1389  SchFilterGraph *fg;
1390 
1391  switch (dst->type) {
1392  case SCH_NODE_TYPE_DEC:
1393  tq_choke(sch->dec[dst->idx].queue, choked);
1394  break;
1395  case SCH_NODE_TYPE_ENC:
1396  tq_choke(sch->enc[dst->idx].queue, choked);
1397  break;
1398  case SCH_NODE_TYPE_MUX:
1399  break;
1401  fg = &sch->filters[dst->idx];
1402  if (fg->nb_inputs == 1)
1403  tq_choke(fg->queue, choked);
1404  break;
1405  default:
1406  av_unreachable("Invalid destination node type?");
1407  break;
1408  }
1409  }
1410 }
1411 
1413 {
1414  int64_t dts;
1415  int have_unchoked = 0;
1416 
1417  // on termination request all waiters are choked,
1418  // we are not to unchoke them
1419  if (atomic_load(&sch->terminate))
1420  return;
1421 
1422  dts = trailing_dts(sch);
1423 
1424  atomic_store(&sch->last_dts, progressing_dts(sch, 0));
1425 
1426  // initialize our internal state
1427  for (unsigned type = 0; type < 2; type++)
1428  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1429  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1430  w->choked_prev = atomic_load(&w->choked);
1431  w->choked_next = 1;
1432  }
1433 
1434  // figure out the sources that are allowed to proceed
1435  for (unsigned i = 0; i < sch->nb_mux; i++) {
1436  SchMux *mux = &sch->mux[i];
1437 
1438  for (unsigned j = 0; j < mux->nb_streams; j++) {
1439  SchMuxStream *ms = &mux->streams[j];
1440 
1441  // unblock sources for output streams that are not finished
1442  // and not too far ahead of the trailing stream
1443  if (ms->source_finished)
1444  continue;
1445  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1446  continue;
1447  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1448  continue;
1449 
1450  // resolve the source to unchoke
1451  unchoke_for_stream(sch, ms->src);
1452  have_unchoked = 1;
1453  }
1454  }
1455 
1456  // also unchoke any sources feeding into closed filter graph inputs, so
1457  // that they can observe the downstream EOF
1458  for (unsigned i = 0; i < sch->nb_filters; i++) {
1459  SchFilterGraph *fg = &sch->filters[i];
1460 
1461  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1462  SchFilterIn *fi = &fg->inputs[j];
1463  if (fi->receive_finished && !fi->send_finished)
1464  unchoke_for_stream(sch, fi->src);
1465  }
1466  }
1467 
1468  // make sure to unchoke at least one source, if still available
1469  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1470  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1471  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1472  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1473  if (!exited) {
1474  w->choked_next = 0;
1475  have_unchoked = 1;
1476  break;
1477  }
1478  }
1479 
1480  for (unsigned type = 0; type < 2; type++) {
1481  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1482  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1483  if (w->choked_prev != w->choked_next) {
1484  waiter_set(w, w->choked_next);
1485  if (!type)
1486  choke_demux(sch, i, w->choked_next);
1487  }
1488  }
1489  }
1490 
1491 }
1492 
1493 enum {
1497 };
1498 
1499 // Finds the filtergraph or muxer upstream of a scheduler node
1501 {
1502  while (1) {
1503  switch (src.type) {
1504  case SCH_NODE_TYPE_DEMUX:
1506  return src;
1507  case SCH_NODE_TYPE_DEC:
1508  src = sch->dec[src.idx].src;
1509  continue;
1510  case SCH_NODE_TYPE_ENC:
1511  src = sch->enc[src.idx].src;
1512  continue;
1513  default:
1514  av_unreachable("Invalid source node type?");
1515  return (SchedulerNode) {0};
1516  }
1517  }
1518 }
1519 
1520 static int
1522  uint8_t *filters_visited, SchedulerNode *filters_stack)
1523 {
1524  unsigned nb_filters_stack = 0;
1525 
1526  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1527 
1528  while (1) {
1529  const SchFilterGraph *fg = &sch->filters[src.idx];
1530 
1531  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1532 
1533  // descend into every input, depth first
1534  if (src.idx_stream < fg->nb_inputs) {
1535  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1536  SchedulerNode node = src_filtergraph(sch, fi->src);
1537 
1538  // connected to demuxer, no cycles possible
1539  if (node.type == SCH_NODE_TYPE_DEMUX)
1540  continue;
1541 
1542  // otherwise connected to another filtergraph
1544 
1545  // found a cycle
1546  if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
1547  return AVERROR(EINVAL);
1548 
1549  // place current position on stack and descend
1550  av_assert0(nb_filters_stack < sch->nb_filters);
1551  filters_stack[nb_filters_stack++] = src;
1552  src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
1553  continue;
1554  }
1555 
1556  filters_visited[src.idx] = CYCLE_NODE_DONE;
1557 
1558  // previous search finished,
1559  if (nb_filters_stack) {
1560  src = filters_stack[--nb_filters_stack];
1561  continue;
1562  }
1563  return 0;
1564  }
1565 }
1566 
1567 static int check_acyclic(Scheduler *sch)
1568 {
1569  uint8_t *filters_visited = NULL;
1570  SchedulerNode *filters_stack = NULL;
1571 
1572  int ret = 0;
1573 
1574  if (!sch->nb_filters)
1575  return 0;
1576 
1577  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1578  if (!filters_visited)
1579  return AVERROR(ENOMEM);
1580 
1581  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1582  if (!filters_stack) {
1583  ret = AVERROR(ENOMEM);
1584  goto fail;
1585  }
1586 
1587  // trace the transcoding graph upstream from every filtegraph
1588  for (unsigned i = 0; i < sch->nb_filters; i++) {
1589  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1590  filters_visited, filters_stack);
1591  if (ret < 0) {
1592  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1593  goto fail;
1594  }
1595  }
1596 
1597 fail:
1598  av_freep(&filters_visited);
1599  av_freep(&filters_stack);
1600  return ret;
1601 }
1602 
1603 static int start_prepare(Scheduler *sch)
1604 {
1605  int ret;
1606 
1607  for (unsigned i = 0; i < sch->nb_demux; i++) {
1608  SchDemux *d = &sch->demux[i];
1609 
1610  for (unsigned j = 0; j < d->nb_streams; j++) {
1611  SchDemuxStream *ds = &d->streams[j];
1612 
1613  if (!ds->nb_dst) {
1614  av_log(d, AV_LOG_ERROR,
1615  "Demuxer stream %u not connected to any sink\n", j);
1616  return AVERROR(EINVAL);
1617  }
1618 
1619  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1620  if (!ds->dst_finished)
1621  return AVERROR(ENOMEM);
1622  }
1623  }
1624 
1625  for (unsigned i = 0; i < sch->nb_dec; i++) {
1626  SchDec *dec = &sch->dec[i];
1627 
1628  if (!dec->src.type) {
1629  av_log(dec, AV_LOG_ERROR,
1630  "Decoder not connected to a source\n");
1631  return AVERROR(EINVAL);
1632  }
1633 
1634  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1635  SchDecOutput *o = &dec->outputs[j];
1636 
1637  if (!o->nb_dst) {
1638  av_log(dec, AV_LOG_ERROR,
1639  "Decoder output %u not connected to any sink\n", j);
1640  return AVERROR(EINVAL);
1641  }
1642 
1643  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1644  if (!o->dst_finished)
1645  return AVERROR(ENOMEM);
1646  }
1647  }
1648 
1649  for (unsigned i = 0; i < sch->nb_enc; i++) {
1650  SchEnc *enc = &sch->enc[i];
1651 
1652  if (!enc->src.type) {
1653  av_log(enc, AV_LOG_ERROR,
1654  "Encoder not connected to a source\n");
1655  return AVERROR(EINVAL);
1656  }
1657  if (!enc->nb_dst) {
1658  av_log(enc, AV_LOG_ERROR,
1659  "Encoder not connected to any sink\n");
1660  return AVERROR(EINVAL);
1661  }
1662 
1663  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1664  if (!enc->dst_finished)
1665  return AVERROR(ENOMEM);
1666  }
1667 
1668  for (unsigned i = 0; i < sch->nb_mux; i++) {
1669  SchMux *mux = &sch->mux[i];
1670 
1671  for (unsigned j = 0; j < mux->nb_streams; j++) {
1672  SchMuxStream *ms = &mux->streams[j];
1673 
1674  if (!ms->src.type) {
1675  av_log(mux, AV_LOG_ERROR,
1676  "Muxer stream #%u not connected to a source\n", j);
1677  return AVERROR(EINVAL);
1678  }
1679  }
1680 
1681  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1682  QUEUE_PACKETS);
1683  if (ret < 0)
1684  return ret;
1685  }
1686 
1687  for (unsigned i = 0; i < sch->nb_filters; i++) {
1688  SchFilterGraph *fg = &sch->filters[i];
1689 
1690  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1691  SchFilterIn *fi = &fg->inputs[j];
1692 
1693  if (!fi->src.type) {
1694  av_log(fg, AV_LOG_ERROR,
1695  "Filtergraph input %u not connected to a source\n", j);
1696  return AVERROR(EINVAL);
1697  }
1698  }
1699 
1700  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1701  SchFilterOut *fo = &fg->outputs[j];
1702 
1703  if (!fo->dst.type) {
1704  av_log(fg, AV_LOG_ERROR,
1705  "Filtergraph %u output %u not connected to a sink\n", i, j);
1706  return AVERROR(EINVAL);
1707  }
1708  }
1709  }
1710 
1711  // Check that the transcoding graph has no cycles.
1712  ret = check_acyclic(sch);
1713  if (ret < 0)
1714  return ret;
1715 
1716  return 0;
1717 }
1718 
1720 {
1721  int ret;
1722 
1723  ret = start_prepare(sch);
1724  if (ret < 0)
1725  return ret;
1726 
1728  sch->state = SCH_STATE_STARTED;
1729 
1730  for (unsigned i = 0; i < sch->nb_mux; i++) {
1731  SchMux *mux = &sch->mux[i];
1732 
1733  if (mux->nb_streams_ready == mux->nb_streams) {
1734  ret = mux_init(sch, mux);
1735  if (ret < 0)
1736  goto fail;
1737  }
1738  }
1739 
1740  for (unsigned i = 0; i < sch->nb_enc; i++) {
1741  SchEnc *enc = &sch->enc[i];
1742 
1743  ret = task_start(&enc->task);
1744  if (ret < 0)
1745  goto fail;
1746  }
1747 
1748  for (unsigned i = 0; i < sch->nb_filters; i++) {
1749  SchFilterGraph *fg = &sch->filters[i];
1750 
1751  ret = task_start(&fg->task);
1752  if (ret < 0)
1753  goto fail;
1754  }
1755 
1756  for (unsigned i = 0; i < sch->nb_dec; i++) {
1757  SchDec *dec = &sch->dec[i];
1758 
1759  ret = task_start(&dec->task);
1760  if (ret < 0)
1761  goto fail;
1762  }
1763 
1764  for (unsigned i = 0; i < sch->nb_demux; i++) {
1765  SchDemux *d = &sch->demux[i];
1766 
1767  if (!d->nb_streams)
1768  continue;
1769 
1770  ret = task_start(&d->task);
1771  if (ret < 0)
1772  goto fail;
1773  }
1774 
1778 
1779  return 0;
1780 fail:
1781  sch_stop(sch, NULL);
1782  return ret;
1783 }
1784 
// Wait for transcoding to finish or for a timeout to elapse.
//
// timeout_us is a relative delay in microseconds; it is turned into an
// absolute deadline by adding av_gettime(). When not all muxers are done yet,
// the call blocks on sch->finish_cond until it is signalled or the deadline
// passes. The current value of sch->last_dts is written to *transcode_ts.
//
// Returns nonzero when all muxers are done or a task has failed, 0 otherwise.
//
// NOTE(review): listing numbers 1792 and 1803 are absent here; presumably
// they hold the finish_lock lock/unlock calls - confirm against upstream.
1785 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1786 {
1787  int ret;
1788 
1789  // convert delay to absolute timestamp
1790  timeout_us += av_gettime();
1791 
1793 
1794  if (sch->nb_mux_done < sch->nb_mux) {
 // split the absolute deadline into seconds and nanoseconds
1795  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1796  .tv_nsec = (timeout_us % 1000000) * 1000 };
 // the return value is not checked; the state is re-read below
1797  pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1798  }
1799 
1800  // abort transcoding if any task failed
1801  ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1802 
1804 
1805  *transcode_ts = atomic_load(&sch->last_dts);
1806 
1807  return ret;
1808 }
1809 
1810 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1811 {
1812  int ret;
1813 
1814  ret = enc->open_cb(enc->task.func_arg, frame);
1815  if (ret < 0)
1816  return ret;
1817 
1818  // ret>0 signals audio frame size, which means sync queue must
1819  // have been enabled during encoder creation
1820  if (ret > 0) {
1821  SchSyncQueue *sq;
1822 
1823  av_assert0(enc->sq_idx[0] >= 0);
1824  sq = &sch->sq_enc[enc->sq_idx[0]];
1825 
1826  pthread_mutex_lock(&sq->lock);
1827 
1828  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1829 
1830  pthread_mutex_unlock(&sq->lock);
1831  }
1832 
1833  return 0;
1834 }
1835 
1837 {
1838  int ret;
1839 
1840  if (!frame) {
1841  tq_send_finish(enc->queue, 0);
1842  return 0;
1843  }
1844 
1845  if (enc->in_finished)
1846  return AVERROR_EOF;
1847 
1848  ret = tq_send(enc->queue, 0, frame);
1849  if (ret < 0)
1850  enc->in_finished = 1;
1851 
1852  return ret;
1853 }
1854 
// Send a frame (or EOF when frame is NULL) to an encoder through the sync
// queue the encoder belongs to.
//
// The frame is submitted to the sync queue, then every frame the queue is
// ready to output is forwarded to the thread queue of the encoder it belongs
// to. On a negative result all encoders fed from this sync queue are sent
// EOF. The sync queue lock is held from sq_send() until return.
//
// Returns 0 on success or a negative error code.
//
// NOTE(review): listing numbers 1876, 1879 and 1881 are absent from the EOF
// block below; presumably locking and a schedule update around the
// source_finished write - confirm against the upstream file.
1855 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1856 {
1857  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1858  int ret = 0;
1859 
1860  // inform the scheduling code that no more input will arrive along this path;
1861  // this is necessary because the sync queue may not send an EOF downstream
1862  // until other streams finish
1863  // TODO: consider a cleaner way of passing this information through
1864  // the pipeline
1865  if (!frame) {
1866  for (unsigned i = 0; i < enc->nb_dst; i++) {
1867  SchMux *mux;
1868  SchMuxStream *ms;
1869 
 // only muxer destinations carry the source_finished flag
1870  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1871  continue;
1872 
1873  mux = &sch->mux[enc->dst[i].idx];
1874  ms = &mux->streams[enc->dst[i].idx_stream];
1875 
1877 
1878  ms->source_finished = 1;
1880 
1882  }
1883  }
1884 
1885  pthread_mutex_lock(&sq->lock);
1886 
1887  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1888  if (ret < 0)
1889  goto finish;
1890 
 // drain everything the sync queue is ready to output
1891  while (1) {
 // note: this declaration shadows the function parameter enc
1892  SchEnc *enc;
1893 
1894  // TODO: the SQ API should be extended to allow returning EOF
1895  // for individual streams
1896  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1897  if (ret < 0) {
 // EAGAIN is not treated as an error here
1898  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1899  break;
1900  }
1901 
 // a non-negative sq_receive() result is used as index into sq->enc_idx
1902  enc = &sch->enc[sq->enc_idx[ret]];
1903  ret = send_to_enc_thread(sch, enc, sq->frame);
1904  if (ret < 0) {
1905  av_frame_unref(sq->frame);
1906  if (ret != AVERROR_EOF)
1907  break;
1908 
 // the encoder reported EOF: send EOF for its stream to the sync queue
1909  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1910  continue;
1911  }
1912  }
1913 
1914  if (ret < 0) {
1915  // close all encoders fed from this sync queue
1916  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1917  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1918 
1919  // if the sync queue error is EOF and closing the encoder
1920  // produces a more serious error, make sure to pick the latter
1921  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1922  }
1923  }
1924 
1925 finish:
1926  pthread_mutex_unlock(&sq->lock);
1927 
1928  return ret;
1929 }
1930 
// Send a frame (or EOF when frame is NULL) to an encoder.
//
// If the encoder has an open callback and has not been opened yet, it is
// opened with the first non-NULL frame. The frame is then routed through the
// sync queue when the encoder has one (sq_idx[0] >= 0), or straight to the
// encoder's thread queue otherwise.
//
// Returns 0 on success or a negative error code.
//
// NOTE(review): listing number 1941 is absent inside the "discard empty
// frames" branch; presumably the frame is unreferenced there - confirm
// against the upstream file.
1931 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1932 {
1933  if (enc->open_cb && frame && !enc->opened) {
1934  int ret = enc_open(sch, enc, frame);
1935  if (ret < 0)
1936  return ret;
1937  enc->opened = 1;
1938 
1939  // discard empty frames that only carry encoder init parameters
1940  if (!frame->buf[0]) {
1942  return 0;
1943  }
1944  }
1945 
1946  return (enc->sq_idx[0] >= 0) ?
1947  send_to_enc_sq (sch, enc, frame) :
1948  send_to_enc_thread(sch, enc, frame);
1949 }
1950 
1952 {
1953  PreMuxQueue *q = &ms->pre_mux_queue;
1954  AVPacket *tmp_pkt = NULL;
1955  int ret;
1956 
1957  if (!av_fifo_can_write(q->fifo)) {
1958  size_t packets = av_fifo_can_read(q->fifo);
1959  size_t pkt_size = pkt ? pkt->size : 0;
1960  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1961  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1962  size_t new_size = FFMIN(2 * packets, max_packets);
1963 
1964  if (new_size <= packets) {
1965  av_log(mux, AV_LOG_ERROR,
1966  "Too many packets buffered for output stream.\n");
1967  return AVERROR_BUFFER_TOO_SMALL;
1968  }
1969  ret = av_fifo_grow2(q->fifo, new_size - packets);
1970  if (ret < 0)
1971  return ret;
1972  }
1973 
1974  if (pkt) {
1975  tmp_pkt = av_packet_alloc();
1976  if (!tmp_pkt)
1977  return AVERROR(ENOMEM);
1978 
1979  av_packet_move_ref(tmp_pkt, pkt);
1980  q->data_size += tmp_pkt->size;
1981  }
1982  av_fifo_write(q->fifo, &tmp_pkt, 1);
1983 
1984  return 0;
1985 }
1986 
// Send a packet (or EOF when pkt is NULL) to one stream of a muxer.
//
// While the muxer has not been started, the packet is placed in the stream's
// pre-mux queue via mux_queue_packet(); otherwise it goes to the muxer's
// thread queue. Afterwards the stream's last_dts (for packets with a valid
// dts) or source_finished flag (for EOF) is updated.
//
// Returns 0 on success, AVERROR_EOF when the stream has init_eof set, or
// another negative error code.
//
// NOTE(review): this listing is missing several numbered lines (1992-1993,
// 2001, 2008, 2032, 2037, 2039): the dts initializer below is cut off after
// the '?', and the lock/unlock and schedule-update calls are presumably among
// the dropped lines - confirm against the upstream file.
1987 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1988  AVPacket *pkt)
1989 {
1990  SchMuxStream *ms = &mux->streams[stream_idx];
1991  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1994 
1995  // queue the packet if the muxer cannot be started yet
1996  if (!atomic_load(&mux->mux_started)) {
 // queued: <0 error from mux_queue_packet(), 1 packet was queued,
 // 0 muxer started in the meantime
1997  int queued = 0;
1998 
1999  // the muxer could have started between the above atomic check and
2000  // locking the mutex, then this block falls through to normal send path
2002 
2003  if (!atomic_load(&mux->mux_started)) {
2004  int ret = mux_queue_packet(mux, ms, pkt);
2005  queued = ret < 0 ? ret : 1;
2006  }
2007 
2009 
2010  if (queued < 0)
2011  return queued;
2012  else if (queued)
2013  goto update_schedule;
2014  }
2015 
2016  if (pkt) {
2017  int ret;
2018 
2019  if (ms->init_eof)
2020  return AVERROR_EOF;
2021 
2022  ret = tq_send(mux->queue, stream_idx, pkt);
2023  if (ret < 0)
2024  return ret;
2025  } else
2026  tq_send_finish(mux->queue, stream_idx);
2027 
2028 update_schedule:
2029  // TODO: use atomics to check whether this changes trailing dts
2030  // to avoid locking unnecessarily
 // only packets with a valid dts, or EOF, change the scheduling state
2031  if (dts != AV_NOPTS_VALUE || !pkt) {
2033 
2034  if (pkt) ms->last_dts = dts;
2035  else ms->source_finished = 1;
2036 
2038 
2040  }
2041 
2042  return 0;
2043 }
2044 
2045 static int
2047  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
2048 {
2049  int ret;
2050 
2051  if (*dst_finished)
2052  return AVERROR_EOF;
2053 
2054  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
2057  pkt = NULL;
2058  }
2059 
2060  if (!pkt)
2061  goto finish;
2062 
2063  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2064  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2065  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2066  if (ret == AVERROR_EOF)
2067  goto finish;
2068 
2069  return ret;
2070 
2071 finish:
2072  if (dst.type == SCH_NODE_TYPE_MUX)
2073  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2074  else
2075  tq_send_finish(sch->dec[dst.idx].queue, 0);
2076 
2077  *dst_finished = 1;
2078  return AVERROR_EOF;
2079 }
2080 
2082  AVPacket *pkt, unsigned flags)
2083 {
2084  unsigned nb_done = 0;
2085 
2086  for (unsigned i = 0; i < ds->nb_dst; i++) {
2087  AVPacket *to_send = pkt;
2088  uint8_t *finished = &ds->dst_finished[i];
2089 
2090  int ret;
2091 
2092  // sending a packet consumes it, so make a temporary reference if needed
2093  if (pkt && i < ds->nb_dst - 1) {
2094  to_send = d->send_pkt;
2095 
2096  ret = av_packet_ref(to_send, pkt);
2097  if (ret < 0)
2098  return ret;
2099  }
2100 
2101  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
2102  if (to_send)
2103  av_packet_unref(to_send);
2104  if (ret == AVERROR_EOF)
2105  nb_done++;
2106  else if (ret < 0)
2107  return ret;
2108  }
2109 
2110  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
2111 }
2112 
2114 {
2115  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
2116 
2117  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2118 
2119  for (unsigned i = 0; i < d->nb_streams; i++) {
2120  SchDemuxStream *ds = &d->streams[i];
2121 
2122  for (unsigned j = 0; j < ds->nb_dst; j++) {
2123  const SchedulerNode *dst = &ds->dst[j];
2124  SchDec *dec;
2125  int ret;
2126 
2127  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2128  continue;
2129 
2130  dec = &sch->dec[dst->idx];
2131 
2132  ret = tq_send(dec->queue, 0, pkt);
2133  if (ret < 0)
2134  return ret;
2135 
2136  if (dec->queue_end_ts) {
2137  Timestamp ts;
2139  if (ret < 0)
2140  return ret;
2141 
2142  if (max_end_ts.ts == AV_NOPTS_VALUE ||
2143  (ts.ts != AV_NOPTS_VALUE &&
2144  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2145  max_end_ts = ts;
2146 
2147  }
2148  }
2149  }
2150 
2151  pkt->pts = max_end_ts.ts;
2152  pkt->time_base = max_end_ts.tb;
2153 
2154  return 0;
2155 }
2156 
// Called by a demuxer task to pass a packet on to its consumers.
//
// Blocks in waiter_wait() first; when that reports termination,
// AVERROR_EXIT is returned. A packet with stream_index == -1 is
// treated as a flush request and handled by demux_flush(); any other packet
// is dispatched to the destinations of the stream it belongs to.
//
// Returns the result of demux_flush() / demux_send_for_stream(), or
// AVERROR_EXIT on termination.
//
// NOTE(review): listing number 2174 is absent before the final return;
// presumably a bounds assertion on pkt->stream_index - confirm against the
// upstream file.
2157 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2158  unsigned flags)
2159 {
2160  SchDemux *d;
2161  int terminate;
2162 
2163  av_assert0(demux_idx < sch->nb_demux);
2164  d = &sch->demux[demux_idx];
2165 
2166  terminate = waiter_wait(sch, &d->waiter);
2167  if (terminate)
2168  return AVERROR_EXIT;
2169 
2170  // flush the downstreams after seek
2171  if (pkt->stream_index == -1)
2172  return demux_flush(sch, d, pkt);
2173 
2175 
2176  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2177 }
2178 
// Cleanup for a finished demuxer task.
//
// Sends EOF (a NULL packet) to the destinations of every stream of the
// demuxer and marks the demuxer task as exited.
//
// Returns 0 or the merged negative error codes from sending EOF;
// AVERROR_EOF results are not counted as errors.
//
// NOTE(review): listing numbers 2190, 2194 and 2196 are absent around the
// task_exited write; presumably locking and a schedule update - confirm
// against the upstream file.
2179 static int demux_done(Scheduler *sch, unsigned demux_idx)
2180 {
2181  SchDemux *d = &sch->demux[demux_idx];
2182  int ret = 0;
2183 
2184  for (unsigned i = 0; i < d->nb_streams; i++) {
2185  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2186  if (err != AVERROR_EOF)
2187  ret = err_merge(ret, err);
2188  }
2189 
2191 
2192  d->task_exited = 1;
2193 
2195 
2197 
2198  return ret;
2199 }
2200 
2201 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2202 {
2203  SchMux *mux;
2204  int ret, stream_idx;
2205 
2206  av_assert0(mux_idx < sch->nb_mux);
2207  mux = &sch->mux[mux_idx];
2208 
2209  ret = tq_receive(mux->queue, &stream_idx, pkt);
2210  pkt->stream_index = stream_idx;
2211  return ret;
2212 }
2213 
// Called by a muxer task to signal that it accepts no more packets for the
// given stream.
//
// Marks the receiving side of the stream as finished in the muxer's thread
// queue and sets the stream's source_finished flag. Both indices are
// bounds-checked with av_assert0().
//
// NOTE(review): listing numbers 2224, 2227 and 2229 are absent around the
// source_finished write; presumably locking and a schedule update - confirm
// against the upstream file.
2214 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2215 {
2216  SchMux *mux;
2217 
2218  av_assert0(mux_idx < sch->nb_mux);
2219  mux = &sch->mux[mux_idx];
2220 
2221  av_assert0(stream_idx < mux->nb_streams);
2222  tq_receive_finish(mux->queue, stream_idx);
2223 
2225  mux->streams[stream_idx].source_finished = 1;
2226 
2228 
2230 }
2231 
// Send a heartbeat packet to every decoder registered as heartbeat
// destination of the given muxer stream.
//
// For each entry in ms->sub_heartbeat_dst, mux->sub_heartbeat_pkt is sent to
// that decoder's thread queue.
//
// Returns 0 on success or a negative error code.
//
// NOTE(review): listing number 2248 is absent; it is the statement that
// assigns ret before the check below (presumably referencing pkt into
// mux->sub_heartbeat_pkt) - confirm against the upstream file.
2232 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2233  const AVPacket *pkt)
2234 {
2235  SchMux *mux;
2236  SchMuxStream *ms;
2237 
2238  av_assert0(mux_idx < sch->nb_mux);
2239  mux = &sch->mux[mux_idx];
2240 
2241  av_assert0(stream_idx < mux->nb_streams);
2242  ms = &mux->streams[stream_idx];
2243 
2244  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2245  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2246  int ret;
2247 
2249  if (ret < 0)
2250  return ret;
2251 
 // the result of tq_send() is not checked here
2252  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2253  }
2254 
2255  return 0;
2256 }
2257 
// Cleanup for a finished muxer task.
//
// Marks the receiving side of every stream in the muxer's thread queue as
// finished, sets source_finished on every stream and increments
// sch->nb_mux_done (asserting that it was below sch->nb_mux).
//
// Always returns 0.
//
// NOTE(review): several listing numbers are absent here (2262, 2269, 2271,
// 2273, 2278, 2280); presumably locking, a schedule update and signalling of
// the finish condition - confirm against the upstream file.
2258 static int mux_done(Scheduler *sch, unsigned mux_idx)
2259 {
2260  SchMux *mux = &sch->mux[mux_idx];
2261 
2263 
2264  for (unsigned i = 0; i < mux->nb_streams; i++) {
2265  tq_receive_finish(mux->queue, i);
2266  mux->streams[i].source_finished = 1;
2267  }
2268 
2270 
2272 
2274 
2275  av_assert0(sch->nb_mux_done < sch->nb_mux);
2276  sch->nb_mux_done++;
2277 
2279 
2281 
2282  return 0;
2283 }
2284 
// Called by a decoder task to fetch the next packet from its thread queue.
//
// When the previous call delivered a flush packet (see below), the caller is
// expected to pass the post-flush end timestamp in pkt->pts/pkt->time_base
// on this call; it is consumed before the next packet is received.
//
// Returns the value of tq_receive(), or a negative error from handling the
// end timestamp.
//
// NOTE(review): listing number 2296 is absent; it is the statement that
// assigns ret from the Timestamp built just above (presumably a send on
// dec->queue_end_ts) - confirm against the upstream file.
2285 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2286 {
2287  SchDec *dec;
2288  int ret, dummy;
2289 
2290  av_assert0(dec_idx < sch->nb_dec);
2291  dec = &sch->dec[dec_idx];
2292 
2293  // the decoder should have given us post-flush end timestamp in pkt
2294  if (dec->expect_end_ts) {
2295  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2297  if (ret < 0)
2298  return ret;
2299 
2300  dec->expect_end_ts = 0;
2301  }
2302 
 // the stream index reported by the queue is only sanity-checked
2303  ret = tq_receive(dec->queue, &dummy, pkt);
2304  av_assert0(dummy <= 0);
2305 
2306  // got a flush packet, on the next call to this function the decoder
2307  // will give us post-flush end timestamp
2308  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2309  dec->expect_end_ts = 1;
2310 
2311  return ret;
2312 }
2313 
2315  unsigned in_idx, AVFrame *frame)
2316 {
2317  if (frame)
2318  return tq_send(fg->queue, in_idx, frame);
2319 
2321 
2322  if (!fg->inputs[in_idx].send_finished) {
2323  fg->inputs[in_idx].send_finished = 1;
2324  tq_send_finish(fg->queue, in_idx);
2325 
2326  // close the control stream when all actual inputs are done
2327  if (++fg->nb_inputs_finished_send == fg->nb_inputs)
2328  tq_send_finish(fg->queue, fg->nb_inputs);
2329 
2331  }
2332 
2334  return 0;
2335 }
2336 
2338  uint8_t *dst_finished, AVFrame *frame)
2339 {
2340  int ret;
2341 
2342  if (*dst_finished)
2343  return AVERROR_EOF;
2344 
2345  if (!frame)
2346  goto finish;
2347 
2348  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2349  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2350  send_to_enc(sch, &sch->enc[dst.idx], frame);
2351  if (ret == AVERROR_EOF)
2352  goto finish;
2353 
2354  return ret;
2355 
2356 finish:
2357  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2358  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2359  else
2360  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2361 
2362  *dst_finished = 1;
2363 
2364  return AVERROR_EOF;
2365 }
2366 
2367 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2368  unsigned out_idx, AVFrame *frame)
2369 {
2370  SchDec *dec;
2371  SchDecOutput *o;
2372  int ret;
2373  unsigned nb_done = 0;
2374 
2375  av_assert0(dec_idx < sch->nb_dec);
2376  dec = &sch->dec[dec_idx];
2377 
2378  av_assert0(out_idx < dec->nb_outputs);
2379  o = &dec->outputs[out_idx];
2380 
2381  for (unsigned i = 0; i < o->nb_dst; i++) {
2382  uint8_t *finished = &o->dst_finished[i];
2383  AVFrame *to_send = frame;
2384 
2385  // sending a frame consumes it, so make a temporary reference if needed
2386  if (i < o->nb_dst - 1) {
2387  to_send = dec->send_frame;
2388 
2389  // frame may sometimes contain props only,
2390  // e.g. to signal EOF timestamp
2391  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2392  av_frame_copy_props(to_send, frame);
2393  if (ret < 0)
2394  return ret;
2395  }
2396 
2397  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2398  if (ret < 0) {
2399  av_frame_unref(to_send);
2400  if (ret == AVERROR_EOF) {
2401  nb_done++;
2402  continue;
2403  }
2404  return ret;
2405  }
2406  }
2407 
2408  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2409 }
2410 
// Cleanup for a finished decoder task.
//
// Marks the receiving side of the decoder's thread queue as finished and
// sends EOF (a NULL frame) to every destination of every decoder output.
//
// Returns 0 or the merged negative error codes from sending EOF;
// AVERROR_EOF results are not counted as errors.
//
// NOTE(review): listing number 2421 is absent; it is the statement guarded by
// "if (dec->queue_end_ts)" below (presumably finishing the receive side of
// that queue). As listed, the "if" would syntactically bind to the following
// for loop - confirm against the upstream file.
2411 static int dec_done(Scheduler *sch, unsigned dec_idx)
2412 {
2413  SchDec *dec = &sch->dec[dec_idx];
2414  int ret = 0;
2415 
2416  tq_receive_finish(dec->queue, 0);
2417 
2418  // make sure our source does not get stuck waiting for end timestamps
2419  // that will never arrive
2420  if (dec->queue_end_ts)
2422 
2423  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2424  SchDecOutput *o = &dec->outputs[i];
2425 
2426  for (unsigned j = 0; j < o->nb_dst; j++) {
2427  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2428  if (err < 0 && err != AVERROR_EOF)
2429  ret = err_merge(ret, err);
2430  }
2431  }
2432 
2433  return ret;
2434 }
2435 
2436 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2437 {
2438  SchEnc *enc;
2439  int ret, dummy;
2440 
2441  av_assert0(enc_idx < sch->nb_enc);
2442  enc = &sch->enc[enc_idx];
2443 
2444  ret = tq_receive(enc->queue, &dummy, frame);
2445  av_assert0(dummy <= 0);
2446 
2447  return ret;
2448 }
2449 
2451  uint8_t *dst_finished, AVPacket *pkt)
2452 {
2453  int ret;
2454 
2455  if (*dst_finished)
2456  return AVERROR_EOF;
2457 
2458  if (!pkt)
2459  goto finish;
2460 
2461  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2462  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2463  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2464  if (ret == AVERROR_EOF)
2465  goto finish;
2466 
2467  return ret;
2468 
2469 finish:
2470  if (dst.type == SCH_NODE_TYPE_MUX)
2471  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2472  else
2473  tq_send_finish(sch->dec[dst.idx].queue, 0);
2474 
2475  *dst_finished = 1;
2476 
2477  return AVERROR_EOF;
2478 }
2479 
2480 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2481 {
2482  SchEnc *enc;
2483  int ret;
2484 
2485  av_assert0(enc_idx < sch->nb_enc);
2486  enc = &sch->enc[enc_idx];
2487 
2488  for (unsigned i = 0; i < enc->nb_dst; i++) {
2489  uint8_t *finished = &enc->dst_finished[i];
2490  AVPacket *to_send = pkt;
2491 
2492  // sending a packet consumes it, so make a temporary reference if needed
2493  if (i < enc->nb_dst - 1) {
2494  to_send = enc->send_pkt;
2495 
2496  ret = av_packet_ref(to_send, pkt);
2497  if (ret < 0)
2498  return ret;
2499  }
2500 
2501  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2502  if (ret < 0) {
2503  av_packet_unref(to_send);
2504  if (ret == AVERROR_EOF)
2505  continue;
2506  return ret;
2507  }
2508  }
2509 
2510  return 0;
2511 }
2512 
2513 static int enc_done(Scheduler *sch, unsigned enc_idx)
2514 {
2515  SchEnc *enc = &sch->enc[enc_idx];
2516  int ret = 0;
2517 
2518  tq_receive_finish(enc->queue, 0);
2519 
2520  for (unsigned i = 0; i < enc->nb_dst; i++) {
2521  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2522  if (err < 0 && err != AVERROR_EOF)
2523  ret = err_merge(ret, err);
2524  }
2525 
2526  return ret;
2527 }
2528 
// Called by a filtergraph task to obtain the next input frame.
//
// On entry *in_idx is the input the filtergraph would like to read from, or
// fg->nb_inputs when no particular input is requested. When that value
// differs from fg->best_input, best_input is updated. With
// *in_idx == fg->nb_inputs the call only waits on the filtergraph's waiter
// and returns AVERROR(EAGAIN), or AVERROR_EOF on termination. Otherwise it
// reads from the filtergraph's thread queue until a frame arrives, storing
// the index of the stream it came from in *in_idx.
//
// Returns 0 when a frame was received, AVERROR_EOF when the queue reports no
// stream (idx < 0) or on termination, AVERROR(EAGAIN) as described above.
//
// NOTE(review): listing numbers 2544, 2547 and 2549 are absent around the
// best_input update; presumably locking and a schedule update - confirm
// against the upstream file.
2529 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2530  unsigned *in_idx, AVFrame *frame)
2531 {
2532  SchFilterGraph *fg;
2533 
2534  av_assert0(fg_idx < sch->nb_filters);
2535  fg = &sch->filters[fg_idx];
2536 
2537  av_assert0(*in_idx <= fg->nb_inputs);
2538 
2539  // update scheduling to account for desired input stream, if it changed
2540  //
2541  // this check needs no locking because only the filtering thread
2542  // updates this value
2543  if (*in_idx != fg->best_input) {
2545 
2546  fg->best_input = *in_idx;
2548 
2550  }
2551 
2552  if (*in_idx == fg->nb_inputs) {
2553  int terminate = waiter_wait(sch, &fg->waiter);
2554  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2555  }
2556 
2557  while (1) {
2558  int ret, idx;
2559 
2560  ret = tq_receive(fg->queue, &idx, frame);
 // a negative stream index is reported as EOF to the caller
2561  if (idx < 0)
2562  return AVERROR_EOF;
2563  else if (ret >= 0) {
2564  *in_idx = idx;
2565  return 0;
2566  }
2567 
2568  // disregard EOFs for specific streams - they should always be
2569  // preceded by an EOF frame
2570  }
2571 }
2572 
// Called by a filtergraph task to signal that it accepts no more frames on
// the given input.
//
// The first call for an input sets its receive_finished flag and marks the
// receiving side of that stream in the filtergraph's thread queue as
// finished; later calls for the same input change nothing. Once every input
// is finished, the extra stream with index fg->nb_inputs (the control stream)
// is finished as well.
//
// NOTE(review): listing numbers 2584, 2594 and 2597 are absent; presumably
// locking and a schedule update - confirm against the upstream file.
2573 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2574 {
2575  SchFilterGraph *fg;
2576  SchFilterIn *fi;
2577 
2578  av_assert0(fg_idx < sch->nb_filters);
2579  fg = &sch->filters[fg_idx];
2580 
2581  av_assert0(in_idx < fg->nb_inputs);
2582  fi = &fg->inputs[in_idx];
2583 
2585 
2586  if (!fi->receive_finished) {
2587  fi->receive_finished = 1;
2588  tq_receive_finish(fg->queue, in_idx);
2589 
2590  // close the control stream when all actual inputs are done
2591  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2592  tq_receive_finish(fg->queue, fg->nb_inputs);
2593 
2595  }
2596 
2598 }
2599 
// Called by a filtergraph task to send a filtered frame to the consumer
// connected to the given output.
//
// The consumer is either an encoder or an input of another filtergraph. When
// the consumer reports AVERROR_EOF, EOF (a NULL frame) is sent to it as well.
//
// Returns the result of sending the frame (including AVERROR_EOF).
//
// NOTE(review): listing number 2603 is absent; presumably the declaration of
// the local "dst" used below - confirm against the upstream file.
2600 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2601 {
2602  SchFilterGraph *fg;
2604  int ret;
2605 
2606  av_assert0(fg_idx < sch->nb_filters);
2607  fg = &sch->filters[fg_idx];
2608 
2609  av_assert0(out_idx < fg->nb_outputs);
2610  dst = fg->outputs[out_idx].dst;
2611 
2612  if (dst.type == SCH_NODE_TYPE_ENC) {
2613  ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
2614  if (ret == AVERROR_EOF)
2615  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2616  } else {
 // any destination that is not an encoder is handled as a filtergraph input
2617  ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2618  if (ret == AVERROR_EOF)
2619  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2620  }
2621  return ret;
2622 }
2623 
// Cleanup for a finished filtergraph task.
//
// Marks the receiving side of every stream of the filtergraph's thread queue
// as finished (indices 0..nb_inputs inclusive, i.e. including the extra
// stream past the real inputs), sends EOF to the consumer of every output and
// marks the filtergraph task as exited.
//
// Returns 0 or the merged negative error codes from sending EOF;
// AVERROR_EOF results are not counted as errors.
//
// NOTE(review): listing numbers 2642, 2646 and 2648 are absent around the
// task_exited write; presumably locking and a schedule update - confirm
// against the upstream file.
2624 static int filter_done(Scheduler *sch, unsigned fg_idx)
2625 {
2626  SchFilterGraph *fg = &sch->filters[fg_idx];
2627  int ret = 0;
2628 
2629  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2630  tq_receive_finish(fg->queue, i);
2631 
2632  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2633  SchedulerNode dst = fg->outputs[i].dst;
2634  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2635  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2636  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2637 
2638  if (err < 0 && err != AVERROR_EOF)
2639  ret = err_merge(ret, err);
2640  }
2641 
2643 
2644  fg->task_exited = 1;
2645 
2647 
2649 
2650  return ret;
2651 }
2652 
2653 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2654 {
2655  SchFilterGraph *fg;
2656 
2657  av_assert0(fg_idx < sch->nb_filters);
2658  fg = &sch->filters[fg_idx];
2659 
2660  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2661 }
2662 
// Reset the preferred input of a filtergraph: fg->best_input is set to
// fg->nb_inputs, the value that stands for "no particular input".
//
// NOTE(review): listing numbers 2669, 2671 and 2672 are absent around the
// assignment; presumably locking and a schedule update - confirm against the
// upstream file.
2663 void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
2664 {
2665  SchFilterGraph *fg;
2666  av_assert0(fg_idx < sch->nb_filters);
2667  fg = &sch->filters[fg_idx];
2668 
2670  fg->best_input = fg->nb_inputs;
2673 }
2674 
2675 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2676 {
2677  switch (node.type) {
2678  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2679  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2680  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2681  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2682  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2683  default: av_unreachable("Invalid node type?");
2684  }
2685 }
2686 
// Thread entry point for scheduler tasks.
//
// arg is the SchTask to run. The task's function is called with its
// func_arg, an error result is logged, and task_cleanup() is then run for the
// task's node regardless of the outcome. The two results are merged;
// AVERROR_EOF counts as normal termination and is turned into 0. A remaining
// negative result sets sch->task_failed.
//
// Returns the final result code cast to a pointer; task_stop() casts it back
// after pthread_join().
//
// NOTE(review): listing numbers 2706, 2708-2709 and 2712 are absent;
// presumably locking/signalling around the task_failed write and the opening
// of the final av_log() call, whose argument lines remain below - confirm
// against the upstream file.
2687 static void *task_wrapper(void *arg)
2688 {
2689  SchTask *task = arg;
2690  Scheduler *sch = task->parent;
2691  int ret;
2692  int err = 0;
2693 
2694  ret = task->func(task->func_arg);
2695  if (ret < 0)
2696  av_log(task->func_arg, AV_LOG_ERROR,
2697  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2698 
2699  err = task_cleanup(sch, task->node);
2700  ret = err_merge(ret, err);
2701 
2702  // EOF is considered normal termination
2703  if (ret == AVERROR_EOF)
2704  ret = 0;
2705  if (ret < 0) {
2707  sch->task_failed = 1;
2710  }
2711 
2713  "Terminating thread with return code %d (%s)\n", ret,
2714  ret < 0 ? av_err2str(ret) : "success");
2715 
2716  return (void*)(intptr_t)ret;
2717 }
2718 
2719 static int task_stop(Scheduler *sch, SchTask *task)
2720 {
2721  int ret;
2722  void *thread_ret;
2723 
2724  if (!task->parent)
2725  return 0;
2726 
2727  if (!task->thread_running)
2728  return task_cleanup(sch, task->node);
2729 
2730  ret = pthread_join(task->thread, &thread_ret);
2731  av_assert0(ret == 0);
2732 
2733  task->thread_running = 0;
2734 
2735  return (intptr_t)thread_ret;
2736 }
2737 
2738 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2739 {
2740  int ret = 0, err;
2741 
2742  if (sch->state != SCH_STATE_STARTED)
2743  return 0;
2744 
2745  atomic_store(&sch->terminate, 1);
2746 
2747  for (unsigned type = 0; type < 2; type++)
2748  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2749  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2750  waiter_set(w, 1);
2751  if (type)
2752  choke_demux(sch, i, 0); // unfreeze to allow draining
2753  }
2754 
2755  for (unsigned i = 0; i < sch->nb_demux; i++) {
2756  SchDemux *d = &sch->demux[i];
2757 
2758  err = task_stop(sch, &d->task);
2759  ret = err_merge(ret, err);
2760  }
2761 
2762  for (unsigned i = 0; i < sch->nb_dec; i++) {
2763  SchDec *dec = &sch->dec[i];
2764 
2765  err = task_stop(sch, &dec->task);
2766  ret = err_merge(ret, err);
2767  }
2768 
2769  for (unsigned i = 0; i < sch->nb_filters; i++) {
2770  SchFilterGraph *fg = &sch->filters[i];
2771 
2772  err = task_stop(sch, &fg->task);
2773  ret = err_merge(ret, err);
2774  }
2775 
2776  for (unsigned i = 0; i < sch->nb_enc; i++) {
2777  SchEnc *enc = &sch->enc[i];
2778 
2779  err = task_stop(sch, &enc->task);
2780  ret = err_merge(ret, err);
2781  }
2782 
2783  for (unsigned i = 0; i < sch->nb_mux; i++) {
2784  SchMux *mux = &sch->mux[i];
2785 
2786  err = task_stop(sch, &mux->task);
2787  ret = err_merge(ret, err);
2788  }
2789 
2790  if (finish_ts)
2791  *finish_ts = progressing_dts(sch, 1);
2792 
2793  sch->state = SCH_STATE_STOPPED;
2794 
2795  return ret;
2796 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:297
flags
const SwsFlags flags[]
Definition: swscale.c:71
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:66
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1494
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:432
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1135
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:76
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:349
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
Scheduler::finish_lock
pthread_mutex_t finish_lock
Definition: ffmpeg_sched.c:287
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2600
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:88
THREAD_QUEUE_PACKETS
@ THREAD_QUEUE_PACKETS
Definition: thread_queue.h:26
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:294
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:285
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:267
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2214
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:101
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:2113
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:298
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:232
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:628
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:147
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:718
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:185
SchFilterGraph::nb_inputs_finished_send
unsigned nb_inputs_finished_send
Definition: ffmpeg_sched.c:250
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:263
int64_t
long long int64_t
Definition: coverity.c:34
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2258
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:295
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:64
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
av_fifo_peek
int av_fifo_peek(const AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1521
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:427
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2675
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:588
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:217
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:160
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1987
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:282
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, enum ThreadQueueType type)
Allocate a queue for sending data between threads.
Definition: thread_queue.c:72
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2513
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:226
unchoke_downstream
static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
Definition: ffmpeg_sched.c:1309
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:606
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:212
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:274
SchFilterOut
Definition: ffmpeg_sched.c:241
progressing_dts
static int64_t progressing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:460
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:268
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:176
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:206
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:262
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:311
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:166
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1251
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1836
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2719
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:238
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2738
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:119
fail
#define fail()
Definition: checkasm.h:220
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:242
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:85
SchEnc
Definition: ffmpeg_sched.c:109
dummy
int dummy
Definition: motion.c:64
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:90
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:686
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:213
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:270
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:586
AVERROR_BUFFER_TOO_SMALL
#define AVERROR_BUFFER_TOO_SMALL
Buffer too small.
Definition: error.h:53
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:292
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1496
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:859
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:52
SchFilterGraph
Definition: ffmpeg_sched.c:245
avassert.h
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:210
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:497
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:306
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:215
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:190
SchDecOutput
Definition: ffmpeg_sched.c:74
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:662
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:339
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:254
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:94
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1810
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:615
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2337
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:426
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:194
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:113
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:745
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:191
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:42
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2367
ctx
static AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:348
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:208
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2624
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:246
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2436
AVThreadMessageQueue
Definition: threadmessage.c:30
av_mallocz
#define av_mallocz(s)
Definition: tableprint_vlc.h:31
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:649
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:110
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:2081
arg
const char * arg
Definition: jacosubdec.c:65
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:196
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:598
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:228
Scheduler::finish_cond
pthread_cond_t finish_cond
Definition: ffmpeg_sched.c:288
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:276
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:571
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:54
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:366
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:76
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:303
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1855
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:137
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:599
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:249
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:279
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1412
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:243
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:821
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:105
av_unreachable
#define av_unreachable(msg)
Asserts that are used as compiler optimization hints depending upon ASSERT_LEVEL and NBDEBUG.
Definition: avassert.h:116
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:269
choke_demux
static void choke_demux(const Scheduler *sch, int demux_id, int choked)
Definition: ffmpeg_sched.c:1382
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2411
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:259
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:157
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:114
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:778
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations send normally to other types.
Definition: ffmpeg_sched.h:340
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:853
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:440
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:489
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2480
error.h
Scheduler
Definition: ffmpeg_sched.c:273
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:216
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:103
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:193
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:81
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:640
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:144
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:162
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:153
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:251
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:117
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:550
AVPacket::size
int size
Definition: packet.h:589
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:106
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:264
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:278
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:87
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:180
i
#define i(width, name, range_min, range_max)
Definition: cbs_h264.c:63
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:256
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
sch_filter_choke_inputs
void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
Called by filtergraph tasks to choke all filter inputs, preventing them from receiving more frames un...
Definition: ffmpeg_sched.c:2663
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:333
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:187
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:671
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:247
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:772
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:248
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:309
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:587
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2450
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:656
SchFilterIn
Definition: ffmpeg_sched.c:235
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2529
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:280
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:63
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:138
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:610
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:277
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:395
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:152
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:169
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:404
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:300
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:581
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2179
packet.h
sch_remove_filtergraph
void sch_remove_filtergraph(Scheduler *sch, int idx)
Definition: ffmpeg_sched.c:480
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:263
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:32
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:283
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:307
SchDec
Definition: ffmpeg_sched.c:80
av_assert1
#define av_assert1(cond)
assert() equivalent, that does not lie in speed critical code.
Definition: avassert.h:58
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:257
QueueType
QueueType
Definition: ffmpeg_sched.c:47
THREAD_QUEUE_FRAMES
@ THREAD_QUEUE_FRAMES
Definition: thread_queue.h:25
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:86
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:496
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:253
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1495
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:815
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1199
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2314
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:197
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:151
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:957
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1931
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2653
SchDemuxStream
Definition: ffmpeg_sched.c:150
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2285
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:81
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:265
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:199
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1951
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:219
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:712
ThreadQueue
Definition: thread_queue.c:40
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2687
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:221
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:90
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2157
SchDemux
Definition: ffmpeg_sched.c:156
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:291
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:93
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:2046
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:83
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:590
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:536
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:229
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:163
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:320
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:102
trailing_dts
static int64_t trailing_dts(const Scheduler *sch)
Definition: ffmpeg_sched.c:438
Scheduler::task_failed
unsigned task_failed
Definition: ffmpeg_sched.c:286
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:926
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1603
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1276
w
uint8_t w
Definition: llvidencdsp.c:39
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2573
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g. for multiview video).
Definition: ffmpeg_sched.c:757
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:112
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1785
AVPacket
This structure stores compressed data.
Definition: packet.h:565
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
src_filtergraph
static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1500
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:100
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:230
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:97
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:372
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1719
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2232
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:75
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:142
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:654
pkt
static AVPacket * pkt
Definition: demux_decode.c:55
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:140
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:236
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1235
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1567
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:159
PreMuxQueue
Definition: ffmpeg_sched.c:172
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:304
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:237
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:260
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:632
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1344
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:600
tq_choke
void tq_choke(ThreadQueue *tq, int choked)
Prevent further reads from the thread queue until it is unchoked.
Definition: thread_queue.c:258
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2201
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:901
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:115
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:227
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:301