FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDec {
75  const AVClass *class;
76 
79  uint8_t *dst_finished;
80  unsigned nb_dst;
81 
83  // Queue for receiving input packets, one stream.
85 
86  // Queue for sending post-flush end timestamps back to the source
89 
90  // temporary storage used by sch_dec_send()
92 } SchDec;
93 
94 typedef struct SchSyncQueue {
98 
99  unsigned *enc_idx;
100  unsigned nb_enc_idx;
101 } SchSyncQueue;
102 
103 typedef struct SchEnc {
104  const AVClass *class;
105 
108  uint8_t *dst_finished;
109  unsigned nb_dst;
110 
111  // [0] - index of the sync queue in Scheduler.sq_enc,
112  // [1] - index of this encoder in the sq
113  int sq_idx[2];
114 
115  /* Opening encoders is somewhat nontrivial due to their interaction with
116  * sync queues, which are (among other things) responsible for maintaining
117  * constant audio frame size, when it is required by the encoder.
118  *
119  * Opening the encoder requires stream parameters, obtained from the first
120  * frame. However, that frame cannot be properly chunked by the sync queue
121  * without knowing the required frame size, which is only available after
122  * opening the encoder.
123  *
124  * This apparent circular dependency is resolved in the following way:
125  * - the caller creating the encoder gives us a callback which opens the
126  * encoder and returns the required frame size (if any)
127  * - when the first frame is sent to the encoder, the sending thread
128  * - calls this callback, opening the encoder
129  * - passes the returned frame size to the sync queue
130  */
131  int (*open_cb)(void *opaque, const AVFrame *frame);
132  int opened;
133 
135  // Queue for receiving input frames, one stream.
137  // tq_send() to queue returned EOF
139 
140  // temporary storage used by sch_enc_send()
142 } SchEnc;
143 
144 typedef struct SchDemuxStream {
146  uint8_t *dst_finished;
147  unsigned nb_dst;
149 
150 typedef struct SchDemux {
151  const AVClass *class;
152 
154  unsigned nb_streams;
155 
158 
159  // temporary storage used by sch_demux_send()
161 
162  // protected by schedule_lock
164 } SchDemux;
165 
166 typedef struct PreMuxQueue {
167  /**
168  * Queue for buffering the packets before the muxer task can be started.
169  */
171  /**
172  * Maximum number of packets in fifo.
173  */
175  /*
176  * The size of the AVPackets' buffers in queue.
177  * Updated when a packet is either pushed or pulled from the queue.
178  */
179  size_t data_size;
180  /* Threshold after which max_packets will be in effect */
182 } PreMuxQueue;
183 
184 typedef struct SchMuxStream {
187 
188  unsigned *sub_heartbeat_dst;
190 
192 
193  // an EOF was generated while flushing the pre-mux queue
194  int init_eof;
195 
196  ////////////////////////////////////////////////////////////
197  // The following are protected by Scheduler.schedule_lock //
198 
199  /* dts+duration of the last packet sent to this stream
200  in AV_TIME_BASE_Q */
202  // this stream no longer accepts input
204  ////////////////////////////////////////////////////////////
205 } SchMuxStream;
206 
207 typedef struct SchMux {
208  const AVClass *class;
209 
211  unsigned nb_streams;
213 
214  int (*init)(void *arg);
215 
217  /**
218  * Set to 1 after starting the muxer task and flushing the
219  * pre-muxing queues.
220  * Set either before any tasks have started, or with
221  * Scheduler.mux_ready_lock held.
222  */
225  unsigned queue_size;
226 
228 } SchMux;
229 
230 typedef struct SchFilterIn {
235 } SchFilterIn;
236 
237 typedef struct SchFilterOut {
239 } SchFilterOut;
240 
241 typedef struct SchFilterGraph {
242  const AVClass *class;
243 
245  unsigned nb_inputs;
248 
250  unsigned nb_outputs;
251 
253  // input queue, nb_inputs+1 streams
254  // last stream is control
257 
258  // protected by schedule_lock
259  unsigned best_input;
262 
267 };
268 
269 struct Scheduler {
270  const AVClass *class;
271 
273  unsigned nb_demux;
274 
276  unsigned nb_mux;
277 
278  unsigned nb_mux_ready;
280 
281  unsigned nb_mux_done;
284 
285 
287  unsigned nb_dec;
288 
290  unsigned nb_enc;
291 
293  unsigned nb_sq_enc;
294 
296  unsigned nb_filters;
297 
299  int sdp_auto;
300 
304 
306 
308 };
309 
310 /**
311  * Wait until this task is allowed to proceed.
312  *
313  * @retval 0 the caller should proceed
314  * @retval 1 the caller should terminate
315  */
316 static int waiter_wait(Scheduler *sch, SchWaiter *w)
317 {
318  int terminate;
319 
320  if (!atomic_load(&w->choked))
321  return 0;
322 
323  pthread_mutex_lock(&w->lock);
324 
325  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
326  pthread_cond_wait(&w->cond, &w->lock);
327 
328  terminate = atomic_load(&sch->terminate);
329 
330  pthread_mutex_unlock(&w->lock);
331 
332  return terminate;
333 }
334 
335 static void waiter_set(SchWaiter *w, int choked)
336 {
337  pthread_mutex_lock(&w->lock);
338 
339  atomic_store(&w->choked, choked);
340  pthread_cond_signal(&w->cond);
341 
342  pthread_mutex_unlock(&w->lock);
343 }
344 
345 static int waiter_init(SchWaiter *w)
346 {
347  int ret;
348 
349  atomic_init(&w->choked, 0);
350 
351  ret = pthread_mutex_init(&w->lock, NULL);
352  if (ret)
353  return AVERROR(ret);
354 
355  ret = pthread_cond_init(&w->cond, NULL);
356  if (ret)
357  return AVERROR(ret);
358 
359  return 0;
360 }
361 
362 static void waiter_uninit(SchWaiter *w)
363 {
364  pthread_mutex_destroy(&w->lock);
365  pthread_cond_destroy(&w->cond);
366 }
367 
368 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
369  enum QueueType type)
370 {
371  ThreadQueue *tq;
372  ObjPool *op;
373 
374  if (queue_size <= 0) {
375  if (type == QUEUE_FRAMES)
376  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
377  else
379  }
380 
381  if (type == QUEUE_FRAMES) {
382  // This queue length is used in the decoder code to ensure that
383  // there are enough entries in fixed-size frame pools to account
384  // for frames held in queues inside the ffmpeg utility. If this
385  // can ever dynamically change then the corresponding decode
386  // code needs to be updated as well.
388  }
389 
392  if (!op)
393  return AVERROR(ENOMEM);
394 
395  tq = tq_alloc(nb_streams, queue_size, op,
397  if (!tq) {
398  objpool_free(&op);
399  return AVERROR(ENOMEM);
400  }
401 
402  *ptq = tq;
403  return 0;
404 }
405 
406 static void *task_wrapper(void *arg);
407 
408 static int task_start(SchTask *task)
409 {
410  int ret;
411 
412  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
413 
414  av_assert0(!task->thread_running);
415 
416  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
417  if (ret) {
418  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
419  strerror(ret));
420  return AVERROR(ret);
421  }
422 
423  task->thread_running = 1;
424  return 0;
425 }
426 
427 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
428  SchThreadFunc func, void *func_arg)
429 {
430  task->parent = sch;
431 
432  task->node.type = type;
433  task->node.idx = idx;
434 
435  task->func = func;
436  task->func_arg = func_arg;
437 }
438 
439 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
440 {
441  int64_t min_dts = INT64_MAX;
442 
443  for (unsigned i = 0; i < sch->nb_mux; i++) {
444  const SchMux *mux = &sch->mux[i];
445 
446  for (unsigned j = 0; j < mux->nb_streams; j++) {
447  const SchMuxStream *ms = &mux->streams[j];
448 
449  if (ms->source_finished && !count_finished)
450  continue;
451  if (ms->last_dts == AV_NOPTS_VALUE)
452  return AV_NOPTS_VALUE;
453 
454  min_dts = FFMIN(min_dts, ms->last_dts);
455  }
456  }
457 
458  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
459 }
460 
461 void sch_free(Scheduler **psch)
462 {
463  Scheduler *sch = *psch;
464 
465  if (!sch)
466  return;
467 
468  sch_stop(sch, NULL);
469 
470  for (unsigned i = 0; i < sch->nb_demux; i++) {
471  SchDemux *d = &sch->demux[i];
472 
473  for (unsigned j = 0; j < d->nb_streams; j++) {
474  SchDemuxStream *ds = &d->streams[j];
475  av_freep(&ds->dst);
476  av_freep(&ds->dst_finished);
477  }
478  av_freep(&d->streams);
479 
480  av_packet_free(&d->send_pkt);
481 
482  waiter_uninit(&d->waiter);
483  }
484  av_freep(&sch->demux);
485 
486  for (unsigned i = 0; i < sch->nb_mux; i++) {
487  SchMux *mux = &sch->mux[i];
488 
489  for (unsigned j = 0; j < mux->nb_streams; j++) {
490  SchMuxStream *ms = &mux->streams[j];
491 
492  if (ms->pre_mux_queue.fifo) {
493  AVPacket *pkt;
494  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
497  }
498 
500  }
501  av_freep(&mux->streams);
502 
504 
505  tq_free(&mux->queue);
506  }
507  av_freep(&sch->mux);
508 
509  for (unsigned i = 0; i < sch->nb_dec; i++) {
510  SchDec *dec = &sch->dec[i];
511 
512  tq_free(&dec->queue);
513 
515 
516  av_freep(&dec->dst);
517  av_freep(&dec->dst_finished);
518 
519  av_frame_free(&dec->send_frame);
520  }
521  av_freep(&sch->dec);
522 
523  for (unsigned i = 0; i < sch->nb_enc; i++) {
524  SchEnc *enc = &sch->enc[i];
525 
526  tq_free(&enc->queue);
527 
528  av_packet_free(&enc->send_pkt);
529 
530  av_freep(&enc->dst);
531  av_freep(&enc->dst_finished);
532  }
533  av_freep(&sch->enc);
534 
535  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
536  SchSyncQueue *sq = &sch->sq_enc[i];
537  sq_free(&sq->sq);
538  av_frame_free(&sq->frame);
540  av_freep(&sq->enc_idx);
541  }
542  av_freep(&sch->sq_enc);
543 
544  for (unsigned i = 0; i < sch->nb_filters; i++) {
545  SchFilterGraph *fg = &sch->filters[i];
546 
547  tq_free(&fg->queue);
548 
549  av_freep(&fg->inputs);
550  av_freep(&fg->outputs);
551 
552  waiter_uninit(&fg->waiter);
553  }
554  av_freep(&sch->filters);
555 
556  av_freep(&sch->sdp_filename);
557 
559 
561 
564 
565  av_freep(psch);
566 }
567 
568 static const AVClass scheduler_class = {
569  .class_name = "Scheduler",
570  .version = LIBAVUTIL_VERSION_INT,
571 };
572 
574 {
575  Scheduler *sch;
576  int ret;
577 
578  sch = av_mallocz(sizeof(*sch));
579  if (!sch)
580  return NULL;
581 
582  sch->class = &scheduler_class;
583  sch->sdp_auto = 1;
584 
586  if (ret)
587  goto fail;
588 
590  if (ret)
591  goto fail;
592 
594  if (ret)
595  goto fail;
596 
598  if (ret)
599  goto fail;
600 
601  return sch;
602 fail:
603  sch_free(&sch);
604  return NULL;
605 }
606 
607 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
608 {
609  av_freep(&sch->sdp_filename);
610  sch->sdp_filename = av_strdup(sdp_filename);
611  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
612 }
613 
614 static const AVClass sch_mux_class = {
615  .class_name = "SchMux",
616  .version = LIBAVUTIL_VERSION_INT,
617  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
618 };
619 
620 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
621  void *arg, int sdp_auto, unsigned thread_queue_size)
622 {
623  const unsigned idx = sch->nb_mux;
624 
625  SchMux *mux;
626  int ret;
627 
628  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
629  if (ret < 0)
630  return ret;
631 
632  mux = &sch->mux[idx];
633  mux->class = &sch_mux_class;
634  mux->init = init;
635  mux->queue_size = thread_queue_size;
636 
637  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
638 
639  sch->sdp_auto &= sdp_auto;
640 
641  return idx;
642 }
643 
644 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
645 {
646  SchMux *mux;
647  SchMuxStream *ms;
648  unsigned stream_idx;
649  int ret;
650 
651  av_assert0(mux_idx < sch->nb_mux);
652  mux = &sch->mux[mux_idx];
653 
654  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
655  if (ret < 0)
656  return ret;
657  stream_idx = mux->nb_streams - 1;
658 
659  ms = &mux->streams[stream_idx];
660 
661  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
662  if (!ms->pre_mux_queue.fifo)
663  return AVERROR(ENOMEM);
664 
665  ms->last_dts = AV_NOPTS_VALUE;
666 
667  return stream_idx;
668 }
669 
670 static const AVClass sch_demux_class = {
671  .class_name = "SchDemux",
672  .version = LIBAVUTIL_VERSION_INT,
673  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
674 };
675 
677 {
678  const unsigned idx = sch->nb_demux;
679 
680  SchDemux *d;
681  int ret;
682 
683  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
684  if (ret < 0)
685  return ret;
686 
687  d = &sch->demux[idx];
688 
689  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
690 
691  d->class = &sch_demux_class;
692  d->send_pkt = av_packet_alloc();
693  if (!d->send_pkt)
694  return AVERROR(ENOMEM);
695 
696  ret = waiter_init(&d->waiter);
697  if (ret < 0)
698  return ret;
699 
700  return idx;
701 }
702 
703 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
704 {
705  SchDemux *d;
706  int ret;
707 
708  av_assert0(demux_idx < sch->nb_demux);
709  d = &sch->demux[demux_idx];
710 
711  ret = GROW_ARRAY(d->streams, d->nb_streams);
712  return ret < 0 ? ret : d->nb_streams - 1;
713 }
714 
715 static const AVClass sch_dec_class = {
716  .class_name = "SchDec",
717  .version = LIBAVUTIL_VERSION_INT,
718  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
719 };
720 
722  int send_end_ts)
723 {
724  const unsigned idx = sch->nb_dec;
725 
726  SchDec *dec;
727  int ret;
728 
729  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
730  if (ret < 0)
731  return ret;
732 
733  dec = &sch->dec[idx];
734 
735  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
736 
737  dec->class = &sch_dec_class;
738  dec->send_frame = av_frame_alloc();
739  if (!dec->send_frame)
740  return AVERROR(ENOMEM);
741 
742  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
743  if (ret < 0)
744  return ret;
745 
746  if (send_end_ts) {
748  if (ret < 0)
749  return ret;
750  }
751 
752  return idx;
753 }
754 
755 static const AVClass sch_enc_class = {
756  .class_name = "SchEnc",
757  .version = LIBAVUTIL_VERSION_INT,
758  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
759 };
760 
762  int (*open_cb)(void *opaque, const AVFrame *frame))
763 {
764  const unsigned idx = sch->nb_enc;
765 
766  SchEnc *enc;
767  int ret;
768 
769  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
770  if (ret < 0)
771  return ret;
772 
773  enc = &sch->enc[idx];
774 
775  enc->class = &sch_enc_class;
776  enc->open_cb = open_cb;
777  enc->sq_idx[0] = -1;
778  enc->sq_idx[1] = -1;
779 
780  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
781 
782  enc->send_pkt = av_packet_alloc();
783  if (!enc->send_pkt)
784  return AVERROR(ENOMEM);
785 
786  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
787  if (ret < 0)
788  return ret;
789 
790  return idx;
791 }
792 
793 static const AVClass sch_fg_class = {
794  .class_name = "SchFilterGraph",
795  .version = LIBAVUTIL_VERSION_INT,
796  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
797 };
798 
799 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
800  SchThreadFunc func, void *ctx)
801 {
802  const unsigned idx = sch->nb_filters;
803 
804  SchFilterGraph *fg;
805  int ret;
806 
807  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
808  if (ret < 0)
809  return ret;
810  fg = &sch->filters[idx];
811 
812  fg->class = &sch_fg_class;
813 
814  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
815 
816  if (nb_inputs) {
817  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
818  if (!fg->inputs)
819  return AVERROR(ENOMEM);
820  fg->nb_inputs = nb_inputs;
821  }
822 
823  if (nb_outputs) {
824  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
825  if (!fg->outputs)
826  return AVERROR(ENOMEM);
827  fg->nb_outputs = nb_outputs;
828  }
829 
830  ret = waiter_init(&fg->waiter);
831  if (ret < 0)
832  return ret;
833 
834  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
835  if (ret < 0)
836  return ret;
837 
838  return idx;
839 }
840 
841 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
842 {
843  SchSyncQueue *sq;
844  int ret;
845 
846  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
847  if (ret < 0)
848  return ret;
849  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
850 
851  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
852  if (!sq->sq)
853  return AVERROR(ENOMEM);
854 
855  sq->frame = av_frame_alloc();
856  if (!sq->frame)
857  return AVERROR(ENOMEM);
858 
859  ret = pthread_mutex_init(&sq->lock, NULL);
860  if (ret)
861  return AVERROR(ret);
862 
863  return sq - sch->sq_enc;
864 }
865 
866 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
867  int limiting, uint64_t max_frames)
868 {
869  SchSyncQueue *sq;
870  SchEnc *enc;
871  int ret;
872 
873  av_assert0(sq_idx < sch->nb_sq_enc);
874  sq = &sch->sq_enc[sq_idx];
875 
876  av_assert0(enc_idx < sch->nb_enc);
877  enc = &sch->enc[enc_idx];
878 
879  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
880  if (ret < 0)
881  return ret;
882  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
883 
884  ret = sq_add_stream(sq->sq, limiting);
885  if (ret < 0)
886  return ret;
887 
888  enc->sq_idx[0] = sq_idx;
889  enc->sq_idx[1] = ret;
890 
891  if (max_frames != INT64_MAX)
892  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
893 
894  return 0;
895 }
896 
898 {
899  int ret;
900 
901  switch (src.type) {
902  case SCH_NODE_TYPE_DEMUX: {
903  SchDemuxStream *ds;
904 
905  av_assert0(src.idx < sch->nb_demux &&
906  src.idx_stream < sch->demux[src.idx].nb_streams);
907  ds = &sch->demux[src.idx].streams[src.idx_stream];
908 
909  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
910  if (ret < 0)
911  return ret;
912 
913  ds->dst[ds->nb_dst - 1] = dst;
914 
915  // demuxed packets go to decoding or streamcopy
916  switch (dst.type) {
917  case SCH_NODE_TYPE_DEC: {
918  SchDec *dec;
919 
920  av_assert0(dst.idx < sch->nb_dec);
921  dec = &sch->dec[dst.idx];
922 
923  av_assert0(!dec->src.type);
924  dec->src = src;
925  break;
926  }
927  case SCH_NODE_TYPE_MUX: {
928  SchMuxStream *ms;
929 
930  av_assert0(dst.idx < sch->nb_mux &&
931  dst.idx_stream < sch->mux[dst.idx].nb_streams);
932  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
933 
934  av_assert0(!ms->src.type);
935  ms->src = src;
936 
937  break;
938  }
939  default: av_assert0(0);
940  }
941 
942  break;
943  }
944  case SCH_NODE_TYPE_DEC: {
945  SchDec *dec;
946 
947  av_assert0(src.idx < sch->nb_dec);
948  dec = &sch->dec[src.idx];
949 
950  ret = GROW_ARRAY(dec->dst, dec->nb_dst);
951  if (ret < 0)
952  return ret;
953 
954  dec->dst[dec->nb_dst - 1] = dst;
955 
956  // decoded frames go to filters or encoding
957  switch (dst.type) {
959  SchFilterIn *fi;
960 
961  av_assert0(dst.idx < sch->nb_filters &&
962  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
963  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
964 
965  av_assert0(!fi->src.type);
966  fi->src = src;
967  break;
968  }
969  case SCH_NODE_TYPE_ENC: {
970  SchEnc *enc;
971 
972  av_assert0(dst.idx < sch->nb_enc);
973  enc = &sch->enc[dst.idx];
974 
975  av_assert0(!enc->src.type);
976  enc->src = src;
977  break;
978  }
979  default: av_assert0(0);
980  }
981 
982  break;
983  }
985  SchFilterOut *fo;
986  SchEnc *enc;
987 
988  av_assert0(src.idx < sch->nb_filters &&
989  src.idx_stream < sch->filters[src.idx].nb_outputs);
990  // filtered frames go to encoding
992  dst.idx < sch->nb_enc);
993 
994  fo = &sch->filters[src.idx].outputs[src.idx_stream];
995  enc = &sch->enc[dst.idx];
996 
997  av_assert0(!fo->dst.type && !enc->src.type);
998  fo->dst = dst;
999  enc->src = src;
1000 
1001  break;
1002  }
1003  case SCH_NODE_TYPE_ENC: {
1004  SchEnc *enc;
1005 
1006  av_assert0(src.idx < sch->nb_enc);
1007  enc = &sch->enc[src.idx];
1008 
1009  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1010  if (ret < 0)
1011  return ret;
1012 
1013  enc->dst[enc->nb_dst - 1] = dst;
1014 
1015  // encoding packets go to muxing or decoding
1016  switch (dst.type) {
1017  case SCH_NODE_TYPE_MUX: {
1018  SchMuxStream *ms;
1019 
1020  av_assert0(dst.idx < sch->nb_mux &&
1021  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1022  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1023 
1024  av_assert0(!ms->src.type);
1025  ms->src = src;
1026 
1027  break;
1028  }
1029  case SCH_NODE_TYPE_DEC: {
1030  SchDec *dec;
1031 
1032  av_assert0(dst.idx < sch->nb_dec);
1033  dec = &sch->dec[dst.idx];
1034 
1035  av_assert0(!dec->src.type);
1036  dec->src = src;
1037 
1038  break;
1039  }
1040  default: av_assert0(0);
1041  }
1042 
1043  break;
1044  }
1045  default: av_assert0(0);
1046  }
1047 
1048  return 0;
1049 }
1050 
1051 static int mux_task_start(SchMux *mux)
1052 {
1053  int ret = 0;
1054 
1055  ret = task_start(&mux->task);
1056  if (ret < 0)
1057  return ret;
1058 
1059  /* flush the pre-muxing queues */
1060  for (unsigned i = 0; i < mux->nb_streams; i++) {
1061  SchMuxStream *ms = &mux->streams[i];
1062  AVPacket *pkt;
1063 
1064  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
1065  if (pkt) {
1066  if (!ms->init_eof)
1067  ret = tq_send(mux->queue, i, pkt);
1068  av_packet_free(&pkt);
1069  if (ret == AVERROR_EOF)
1070  ms->init_eof = 1;
1071  else if (ret < 0)
1072  return ret;
1073  } else
1074  tq_send_finish(mux->queue, i);
1075  }
1076  }
1077 
1078  atomic_store(&mux->mux_started, 1);
1079 
1080  return 0;
1081 }
1082 
1083 int print_sdp(const char *filename);
1084 
1085 static int mux_init(Scheduler *sch, SchMux *mux)
1086 {
1087  int ret;
1088 
1089  ret = mux->init(mux->task.func_arg);
1090  if (ret < 0)
1091  return ret;
1092 
1093  sch->nb_mux_ready++;
1094 
1095  if (sch->sdp_filename || sch->sdp_auto) {
1096  if (sch->nb_mux_ready < sch->nb_mux)
1097  return 0;
1098 
1099  ret = print_sdp(sch->sdp_filename);
1100  if (ret < 0) {
1101  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1102  return ret;
1103  }
1104 
1105  /* SDP is written only after all the muxers are ready, so now we
1106  * start ALL the threads */
1107  for (unsigned i = 0; i < sch->nb_mux; i++) {
1108  ret = mux_task_start(&sch->mux[i]);
1109  if (ret < 0)
1110  return ret;
1111  }
1112  } else {
1113  ret = mux_task_start(mux);
1114  if (ret < 0)
1115  return ret;
1116  }
1117 
1118  return 0;
1119 }
1120 
1121 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1122  size_t data_threshold, int max_packets)
1123 {
1124  SchMux *mux;
1125  SchMuxStream *ms;
1126 
1127  av_assert0(mux_idx < sch->nb_mux);
1128  mux = &sch->mux[mux_idx];
1129 
1130  av_assert0(stream_idx < mux->nb_streams);
1131  ms = &mux->streams[stream_idx];
1132 
1133  ms->pre_mux_queue.max_packets = max_packets;
1134  ms->pre_mux_queue.data_threshold = data_threshold;
1135 }
1136 
1137 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1138 {
1139  SchMux *mux;
1140  int ret = 0;
1141 
1142  av_assert0(mux_idx < sch->nb_mux);
1143  mux = &sch->mux[mux_idx];
1144 
1145  av_assert0(stream_idx < mux->nb_streams);
1146 
1148 
1149  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1150 
1151  // this may be called during initialization - do not start
1152  // threads before sch_start() is called
1153  if (++mux->nb_streams_ready == mux->nb_streams &&
1154  sch->state >= SCH_STATE_STARTED)
1155  ret = mux_init(sch, mux);
1156 
1158 
1159  return ret;
1160 }
1161 
1162 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1163  unsigned dec_idx)
1164 {
1165  SchMux *mux;
1166  SchMuxStream *ms;
1167  int ret = 0;
1168 
1169  av_assert0(mux_idx < sch->nb_mux);
1170  mux = &sch->mux[mux_idx];
1171 
1172  av_assert0(stream_idx < mux->nb_streams);
1173  ms = &mux->streams[stream_idx];
1174 
1176  if (ret < 0)
1177  return ret;
1178 
1179  av_assert0(dec_idx < sch->nb_dec);
1180  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1181 
1182  if (!mux->sub_heartbeat_pkt) {
1184  if (!mux->sub_heartbeat_pkt)
1185  return AVERROR(ENOMEM);
1186  }
1187 
1188  return 0;
1189 }
1190 
1192 {
1193  while (1) {
1194  SchFilterGraph *fg;
1195 
1196  // fed directly by a demuxer (i.e. not through a filtergraph)
1197  if (src.type == SCH_NODE_TYPE_DEMUX) {
1198  sch->demux[src.idx].waiter.choked_next = 0;
1199  return;
1200  }
1201 
1203  fg = &sch->filters[src.idx];
1204 
1205  // the filtergraph contains internal sources and
1206  // requested to be scheduled directly
1207  if (fg->best_input == fg->nb_inputs) {
1208  fg->waiter.choked_next = 0;
1209  return;
1210  }
1211 
1212  src = fg->inputs[fg->best_input].src_sched;
1213  }
1214 }
1215 
1217 {
1218  int64_t dts;
1219  int have_unchoked = 0;
1220 
1221  // on termination request all waiters are choked,
1222  // we are not to unchoke them
1223  if (atomic_load(&sch->terminate))
1224  return;
1225 
1226  dts = trailing_dts(sch, 0);
1227 
1228  atomic_store(&sch->last_dts, dts);
1229 
1230  // initialize our internal state
1231  for (unsigned type = 0; type < 2; type++)
1232  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1233  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1234  w->choked_prev = atomic_load(&w->choked);
1235  w->choked_next = 1;
1236  }
1237 
1238  // figure out the sources that are allowed to proceed
1239  for (unsigned i = 0; i < sch->nb_mux; i++) {
1240  SchMux *mux = &sch->mux[i];
1241 
1242  for (unsigned j = 0; j < mux->nb_streams; j++) {
1243  SchMuxStream *ms = &mux->streams[j];
1244 
1245  // unblock sources for output streams that are not finished
1246  // and not too far ahead of the trailing stream
1247  if (ms->source_finished)
1248  continue;
1249  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1250  continue;
1251  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1252  continue;
1253 
1254  // resolve the source to unchoke
1255  unchoke_for_stream(sch, ms->src_sched);
1256  have_unchoked = 1;
1257  }
1258  }
1259 
1260  // make sure to unchoke at least one source, if still available
1261  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1262  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1263  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1264  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1265  if (!exited) {
1266  w->choked_next = 0;
1267  have_unchoked = 1;
1268  break;
1269  }
1270  }
1271 
1272 
1273  for (unsigned type = 0; type < 2; type++)
1274  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1275  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1276  if (w->choked_prev != w->choked_next)
1277  waiter_set(w, w->choked_next);
1278  }
1279 
1280 }
1281 
1282 enum {
1286 };
1287 
1288 static int
1290  uint8_t *filters_visited, SchedulerNode *filters_stack)
1291 {
1292  unsigned nb_filters_stack = 0;
1293 
1294  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1295 
1296  while (1) {
1297  const SchFilterGraph *fg = &sch->filters[src.idx];
1298 
1299  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1300 
1301  // descend into every input, depth first
1302  if (src.idx_stream < fg->nb_inputs) {
1303  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1304 
1305  // connected to demuxer, no cycles possible
1306  if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1307  continue;
1308 
1309  // otherwise connected to another filtergraph
1311 
1312  // found a cycle
1313  if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1314  return AVERROR(EINVAL);
1315 
1316  // place current position on stack and descend
1317  av_assert0(nb_filters_stack < sch->nb_filters);
1318  filters_stack[nb_filters_stack++] = src;
1319  src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1320  continue;
1321  }
1322 
1323  filters_visited[src.idx] = CYCLE_NODE_DONE;
1324 
1325  // previous search finished,
1326  if (nb_filters_stack) {
1327  src = filters_stack[--nb_filters_stack];
1328  continue;
1329  }
1330  return 0;
1331  }
1332 }
1333 
1334 static int check_acyclic(Scheduler *sch)
1335 {
1336  uint8_t *filters_visited = NULL;
1337  SchedulerNode *filters_stack = NULL;
1338 
1339  int ret = 0;
1340 
1341  if (!sch->nb_filters)
1342  return 0;
1343 
1344  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1345  if (!filters_visited)
1346  return AVERROR(ENOMEM);
1347 
1348  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1349  if (!filters_stack) {
1350  ret = AVERROR(ENOMEM);
1351  goto fail;
1352  }
1353 
1354  // trace the transcoding graph upstream from every output stream
1355  // fed by a filtergraph
1356  for (unsigned i = 0; i < sch->nb_mux; i++) {
1357  SchMux *mux = &sch->mux[i];
1358 
1359  for (unsigned j = 0; j < mux->nb_streams; j++) {
1360  SchMuxStream *ms = &mux->streams[j];
1361  SchedulerNode src = ms->src_sched;
1362 
1363  if (src.type != SCH_NODE_TYPE_FILTER_OUT)
1364  continue;
1365  src.idx_stream = 0;
1366 
1367  ret = check_acyclic_for_output(sch, src, filters_visited, filters_stack);
1368  if (ret < 0) {
1369  av_log(mux, AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1370  goto fail;
1371  }
1372  }
1373  }
1374 
1375 fail:
1376  av_freep(&filters_visited);
1377  av_freep(&filters_stack);
1378  return ret;
1379 }
1380 
// Validate that every node of the transcoding graph is connected, allocate
// the per-destination "finished" flag arrays and the muxer queues, and
// resolve the src_sched node of muxer streams and filtergraph inputs.
//
// Returns 0 on success, AVERROR(EINVAL) for an unconnected node,
// AVERROR(ENOMEM) when an allocation fails, or the negative error returned
// by queue_alloc() / check_acyclic().
//
// NOTE(review): the early returns below do not release anything allocated by
// previous iterations; presumably the scheduler teardown frees it - confirm.
1381 static int start_prepare(Scheduler *sch)
1382 {
1383  int ret;
1384 
1385  for (unsigned i = 0; i < sch->nb_demux; i++) {
1386  SchDemux *d = &sch->demux[i];
1387 
1388  for (unsigned j = 0; j < d->nb_streams; j++) {
1389  SchDemuxStream *ds = &d->streams[j];
1390 
1391  if (!ds->nb_dst) {
// NOTE(review): the embedded numbering skips 1392; the line that opens this
// logging call is not present in this listing.
1393  "Demuxer stream %u not connected to any sink\n", j);
1394  return AVERROR(EINVAL);
1395  }
1396 
// one zero-initialized flag per destination of this stream
1397  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1398  if (!ds->dst_finished)
1399  return AVERROR(ENOMEM);
1400  }
1401  }
1402 
1403  for (unsigned i = 0; i < sch->nb_dec; i++) {
1404  SchDec *dec = &sch->dec[i];
1405 
// a zero node type is treated as "not connected"
1406  if (!dec->src.type) {
1407  av_log(dec, AV_LOG_ERROR,
1408  "Decoder not connected to a source\n");
1409  return AVERROR(EINVAL);
1410  }
1411  if (!dec->nb_dst) {
1412  av_log(dec, AV_LOG_ERROR,
1413  "Decoder not connected to any sink\n");
1414  return AVERROR(EINVAL);
1415  }
1416 
1417  dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
1418  if (!dec->dst_finished)
1419  return AVERROR(ENOMEM);
1420  }
1421 
1422  for (unsigned i = 0; i < sch->nb_enc; i++) {
1423  SchEnc *enc = &sch->enc[i];
1424 
1425  if (!enc->src.type) {
1426  av_log(enc, AV_LOG_ERROR,
1427  "Encoder not connected to a source\n");
1428  return AVERROR(EINVAL);
1429  }
1430  if (!enc->nb_dst) {
1431  av_log(enc, AV_LOG_ERROR,
1432  "Encoder not connected to any sink\n");
1433  return AVERROR(EINVAL);
1434  }
1435 
1436  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1437  if (!enc->dst_finished)
1438  return AVERROR(ENOMEM);
1439  }
1440 
1441  for (unsigned i = 0; i < sch->nb_mux; i++) {
1442  SchMux *mux = &sch->mux[i];
1443 
1444  for (unsigned j = 0; j < mux->nb_streams; j++) {
1445  SchMuxStream *ms = &mux->streams[j];
1446 
// resolve src_sched: for an encoder fed by a decoder it is the decoder's
// source, for any other encoder it is the encoder's source, and for a
// demuxer it is the stream source itself
1447  switch (ms->src.type) {
1448  case SCH_NODE_TYPE_ENC: {
1449  SchEnc *enc = &sch->enc[ms->src.idx];
1450  if (enc->src.type == SCH_NODE_TYPE_DEC) {
1451  ms->src_sched = sch->dec[enc->src.idx].src;
// NOTE(review): line 1452 is missing from this listing.
1453  } else {
1454  ms->src_sched = enc->src;
// NOTE(review): line 1455 is missing from this listing.
1456  }
1457  break;
1458  }
1459  case SCH_NODE_TYPE_DEMUX:
1460  ms->src_sched = ms->src;
1461  break;
1462  default:
1463  av_log(mux, AV_LOG_ERROR,
1464  "Muxer stream #%u not connected to a source\n", j);
1465  return AVERROR(EINVAL);
1466  }
1467  }
1468 
1469  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1470  QUEUE_PACKETS);
1471  if (ret < 0)
1472  return ret;
1473  }
1474 
1475  for (unsigned i = 0; i < sch->nb_filters; i++) {
1476  SchFilterGraph *fg = &sch->filters[i];
1477 
1478  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1479  SchFilterIn *fi = &fg->inputs[j];
1480  SchDec *dec;
1481 
1482  if (!fi->src.type) {
1483  av_log(fg, AV_LOG_ERROR,
1484  "Filtergraph input %u not connected to a source\n", j);
1485  return AVERROR(EINVAL);
1486  }
// NOTE(review): line 1487 is missing from this listing. The code below
// indexes sch->dec with fi->src.idx, so the source is presumably required to
// be a decoder at this point - verify against the upstream file.
1488  dec = &sch->dec[fi->src.idx];
1489 
// src_sched is the decoder's demuxer, or the source of the encoder that
// feeds the decoder; any other decoder source type is a programming error
1490  switch (dec->src.type) {
1491  case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1492  case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1493  default: av_assert0(0);
1494  }
1495  }
1496 
1497  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1498  SchFilterOut *fo = &fg->outputs[j];
1499 
1500  if (!fo->dst.type) {
1501  av_log(fg, AV_LOG_ERROR,
1502  "Filtergraph %u output %u not connected to a sink\n", i, j);
1503  return AVERROR(EINVAL);
1504  }
1505  }
1506  }
1507 
1508  // Check that the transcoding graph has no cycles.
1509  ret = check_acyclic(sch);
1510  if (ret < 0)
1511  return ret;
1512 
1513  return 0;
1514 }
1515 
// NOTE(review): the signature line (1516) is missing from this listing; this
// is presumably the body of sch_start(Scheduler *sch) - confirm upstream.
//
// Runs start_prepare(), marks the scheduler as started, initializes every
// muxer whose streams are all ready, then starts the encoder, filtergraph,
// decoder and demuxer tasks in that order. On any failure after the state
// change, sch_stop() is called and the error is returned.
1517 {
1518  int ret;
1519 
1520  ret = start_prepare(sch);
1521  if (ret < 0)
1522  return ret;
1523 
// NOTE(review): line 1524 is missing from this listing.
1525  sch->state = SCH_STATE_STARTED;
1526 
1527  for (unsigned i = 0; i < sch->nb_mux; i++) {
1528  SchMux *mux = &sch->mux[i];
1529 
// muxers with streams that are not ready yet are not initialized here
1530  if (mux->nb_streams_ready == mux->nb_streams) {
1531  ret = mux_init(sch, mux);
1532  if (ret < 0)
1533  goto fail;
1534  }
1535  }
1536 
1537  for (unsigned i = 0; i < sch->nb_enc; i++) {
1538  SchEnc *enc = &sch->enc[i];
1539 
1540  ret = task_start(&enc->task);
1541  if (ret < 0)
1542  goto fail;
1543  }
1544 
1545  for (unsigned i = 0; i < sch->nb_filters; i++) {
1546  SchFilterGraph *fg = &sch->filters[i];
1547 
1548  ret = task_start(&fg->task);
1549  if (ret < 0)
1550  goto fail;
1551  }
1552 
1553  for (unsigned i = 0; i < sch->nb_dec; i++) {
1554  SchDec *dec = &sch->dec[i];
1555 
1556  ret = task_start(&dec->task);
1557  if (ret < 0)
1558  goto fail;
1559  }
1560 
1561  for (unsigned i = 0; i < sch->nb_demux; i++) {
1562  SchDemux *d = &sch->demux[i];
1563 
// demuxers without streams get no task
1564  if (!d->nb_streams)
1565  continue;
1566 
1567  ret = task_start(&d->task);
1568  if (ret < 0)
1569  goto fail;
1570  }
1571 
// NOTE(review): lines 1572-1574 are missing from this listing.
1575 
1576  return 0;
1577 fail:
1578  sch_stop(sch, NULL);
1579  return ret;
1580 }
1581 
// Report whether transcoding should stop.
//
// timeout_us is a relative delay; it is turned into an absolute deadline
// based on av_gettime(). *transcode_ts receives the current value of
// sch->last_dts.
//
// Returns non-zero when every muxer is done (nb_mux_done == nb_mux) or when
// any task has flagged a failure through sch->task_failed, zero otherwise.
1582 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1583 {
1584  int ret, err;
1585 
1586  // convert delay to absolute timestamp
1587  timeout_us += av_gettime();
1588 
// NOTE(review): line 1589 is missing from this listing (presumably a lock
// acquisition protecting nb_mux_done - confirm upstream).
1590 
1591  if (sch->nb_mux_done < sch->nb_mux) {
1592  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1593  .tv_nsec = (timeout_us % 1000000) * 1000 };
// NOTE(review): line 1594 is missing; tv is otherwise unused here, so it is
// presumably consumed by a timed wait on that line - confirm upstream.
1595  }
1596 
1597  ret = sch->nb_mux_done == sch->nb_mux;
1598 
// NOTE(review): line 1599 is missing from this listing (presumably the
// matching unlock).
1600 
1601  *transcode_ts = atomic_load(&sch->last_dts);
1602 
1603  // abort transcoding if any task failed
1604  err = atomic_load(&sch->task_failed);
1605 
1606  return ret || err;
1607 }
1608 
1609 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1610 {
1611  int ret;
1612 
1613  ret = enc->open_cb(enc->task.func_arg, frame);
1614  if (ret < 0)
1615  return ret;
1616 
1617  // ret>0 signals audio frame size, which means sync queue must
1618  // have been enabled during encoder creation
1619  if (ret > 0) {
1620  SchSyncQueue *sq;
1621 
1622  av_assert0(enc->sq_idx[0] >= 0);
1623  sq = &sch->sq_enc[enc->sq_idx[0]];
1624 
1625  pthread_mutex_lock(&sq->lock);
1626 
1627  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1628 
1629  pthread_mutex_unlock(&sq->lock);
1630  }
1631 
1632  return 0;
1633 }
1634 
// NOTE(review): the signature line (1635) is missing from this listing. The
// callers in this file invoke it as send_to_enc_thread(sch, enc, frame).
//
// Forward one frame to the encoder's thread queue.
// A NULL frame marks the send side of the queue as finished and returns 0.
// Once a send has failed, every later call returns AVERROR_EOF.
1636 {
1637  int ret;
1638 
1639  if (!frame) {
1640  tq_send_finish(enc->queue, 0);
1641  return 0;
1642  }
1643 
1644  if (enc->in_finished)
1645  return AVERROR_EOF;
1646 
1647  ret = tq_send(enc->queue, 0, frame);
// remember the failure so that no further sends are attempted
1648  if (ret < 0)
1649  enc->in_finished = 1;
1650 
1651  return ret;
1652 }
1653 
// Push a frame (or EOF, when frame is NULL) into the sync queue that the
// encoder belongs to, then drain the sync queue and forward every frame it
// releases to the encoder thread the frame is destined for.
//
// Returns 0 on success or a negative error code. On error, all encoders fed
// from this sync queue are sent EOF.
1654 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1655 {
1656  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1657  int ret = 0;
1658 
1659  // inform the scheduling code that no more input will arrive along this path;
1660  // this is necessary because the sync queue may not send an EOF downstream
1661  // until other streams finish
1662  // TODO: consider a cleaner way of passing this information through
1663  // the pipeline
1664  if (!frame) {
1665  for (unsigned i = 0; i < enc->nb_dst; i++) {
1666  SchMux *mux;
1667  SchMuxStream *ms;
1668 
1669  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1670  continue;
1671 
1672  mux = &sch->mux[enc->dst[i].idx];
1673  ms = &mux->streams[enc->dst[i].idx_stream];
1674 
// NOTE(review): lines 1675, 1678 and 1680 are missing from this listing;
// only the source_finished update survives.
1676 
1677  ms->source_finished = 1;
1679 
1681  }
1682  }
1683 
1684  pthread_mutex_lock(&sq->lock);
1685 
1686  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1687  if (ret < 0)
1688  goto finish;
1689 
1690  while (1) {
// NOTE: this local deliberately shadows the 'enc' parameter; inside the loop
// it refers to the encoder the dequeued frame belongs to
1691  SchEnc *enc;
1692 
1693  // TODO: the SQ API should be extended to allow returning EOF
1694  // for individual streams
1695  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1696  if (ret < 0) {
// EAGAIN only means that nothing is available right now
1697  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1698  break;
1699  }
1700 
// a non-negative return value is used as an index into sq->enc_idx
1701  enc = &sch->enc[sq->enc_idx[ret]];
1702  ret = send_to_enc_thread(sch, enc, sq->frame);
1703  if (ret < 0) {
1704  av_frame_unref(sq->frame);
1705  if (ret != AVERROR_EOF)
1706  break;
1707 
// this encoder accepts no more input: mark its sync queue stream finished
// and keep draining the others
1708  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1709  continue;
1710  }
1711  }
1712 
1713  if (ret < 0) {
1714  // close all encoders fed from this sync queue
1715  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1716  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1717 
1718  // if the sync queue error is EOF and closing the encoder
1719  // produces a more serious error, make sure to pick the latter
1720  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1721  }
1722  }
1723 
1724 finish:
1725  pthread_mutex_unlock(&sq->lock);
1726 
1727  return ret;
1728 }
1729 
// Deliver a frame (or EOF, when frame is NULL) to an encoder.
// The first non-NULL frame triggers enc_open() when the encoder has an open
// callback and has not been opened yet. Frames are then routed through the
// sync queue when the encoder has one (sq_idx[0] >= 0), otherwise they go
// straight to the encoder thread.
//
// Returns 0 on success or a negative error code.
1730 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1731 {
1732  if (enc->open_cb && frame && !enc->opened) {
1733  int ret = enc_open(sch, enc, frame);
1734  if (ret < 0)
1735  return ret;
1736  enc->opened = 1;
1737 
1738  // discard empty frames that only carry encoder init parameters
1739  if (!frame->buf[0]) {
// NOTE(review): line 1740 is missing from this listing.
1741  return 0;
1742  }
1743  }
1744 
1745  return (enc->sq_idx[0] >= 0) ?
1746  send_to_enc_sq (sch, enc, frame) :
1747  send_to_enc_thread(sch, enc, frame);
1748 }
1749 
// NOTE(review): the signature line (1750) is missing from this listing. The
// caller in this file invokes it as mux_queue_packet(mux, ms, pkt).
//
// Append a packet to the stream's pre-mux FIFO; a NULL pkt is stored as a
// NULL entry. When the FIFO is full it is doubled in size; once the buffered
// data size plus this packet exceeds q->data_threshold, the size is capped at
// q->max_packets.
//
// Returns 0 on success, AVERROR(ENOSPC) when the FIFO may not grow any
// further, AVERROR(ENOMEM) when the packet cannot be allocated, or the error
// from av_fifo_grow2().
1751 {
1752  PreMuxQueue *q = &ms->pre_mux_queue;
1753  AVPacket *tmp_pkt = NULL;
1754  int ret;
1755 
1756  if (!av_fifo_can_write(q->fifo)) {
1757  size_t packets = av_fifo_can_read(q->fifo);
1758  size_t pkt_size = pkt ? pkt->size : 0;
1759  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
// below the data threshold the packet count is effectively unlimited
1760  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1761  size_t new_size = FFMIN(2 * packets, max_packets);
1762 
1763  if (new_size <= packets) {
1764  av_log(mux, AV_LOG_ERROR,
1765  "Too many packets buffered for output stream.\n");
1766  return AVERROR(ENOSPC);
1767  }
1768  ret = av_fifo_grow2(q->fifo, new_size - packets);
1769  if (ret < 0)
1770  return ret;
1771  }
1772 
1773  if (pkt) {
1774  tmp_pkt = av_packet_alloc();
1775  if (!tmp_pkt)
1776  return AVERROR(ENOMEM);
1777 
// the FIFO entry takes over the packet contents; pkt is left blank
1778  av_packet_move_ref(tmp_pkt, pkt);
1779  q->data_size += tmp_pkt->size;
1780  }
// the FIFO stores AVPacket pointers, so EOF is stored as a NULL pointer
1781  av_fifo_write(q->fifo, &tmp_pkt, 1);
1782 
1783  return 0;
1784 }
1785 
// Deliver a packet (or EOF, when pkt is NULL) to one muxer stream.
// While the muxer has not started, the packet is buffered with
// mux_queue_packet(); afterwards it is sent through the muxer's thread
// queue. In both cases the stream's scheduling state (last_dts or
// source_finished) is updated at the end.
//
// Returns 0 on success, AVERROR_EOF when the stream was finished during
// initialization (ms->init_eof), or another negative error code.
1786 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1787  AVPacket *pkt)
1788 {
1789  SchMuxStream *ms = &mux->streams[stream_idx];
// NOTE(review): lines 1791-1792, the two branches of this conditional
// expression, are missing from this listing.
1790  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1793 
1794  // queue the packet if the muxer cannot be started yet
1795  if (!atomic_load(&mux->mux_started)) {
// queued: 0 = fall through to the normal path, 1 = buffered, < 0 = error
1796  int queued = 0;
1797 
1798  // the muxer could have started between the above atomic check and
1799  // locking the mutex, then this block falls through to normal send path
// NOTE(review): line 1800 is missing from this listing (presumably the lock
// the comment above refers to).
1801 
1802  if (!atomic_load(&mux->mux_started)) {
1803  int ret = mux_queue_packet(mux, ms, pkt);
1804  queued = ret < 0 ? ret : 1;
1805  }
1806 
// NOTE(review): line 1807 is missing from this listing (presumably the
// matching unlock).
1808 
1809  if (queued < 0)
1810  return queued;
1811  else if (queued)
1812  goto update_schedule;
1813  }
1814 
1815  if (pkt) {
1816  int ret;
1817 
1818  if (ms->init_eof)
1819  return AVERROR_EOF;
1820 
1821  ret = tq_send(mux->queue, stream_idx, pkt);
1822  if (ret < 0)
1823  return ret;
1824  } else
1825  tq_send_finish(mux->queue, stream_idx);
1826 
1827 update_schedule:
1828  // TODO: use atomics to check whether this changes trailing dts
1829  // to avoid locking unnecessarily
1830  if (dts != AV_NOPTS_VALUE || !pkt) {
// NOTE(review): lines 1831, 1836 and 1838 are missing from this listing.
1832 
1833  if (pkt) ms->last_dts = dts;
1834  else ms->source_finished = 1;
1835 
1837 
1839  }
1840 
1841  return 0;
1842 }
1843 
// NOTE(review): line 1845, the line carrying the function name and first
// parameters, is missing from this listing. The caller in this file invokes
// it as demux_stream_send_to_dst(sch, dst, finished, pkt, flags).
//
// Send a demuxed packet (or EOF, when pkt is NULL) to one destination, which
// is either a muxer stream or a decoder. Once the destination is finished,
// *dst_finished is set and every later call returns AVERROR_EOF.
1844 static int
1846  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1847 {
1848  int ret;
1849 
1850  if (*dst_finished)
1851  return AVERROR_EOF;
1852 
// NOTE(review): lines 1854-1855, the rest of this condition, are missing
// from this listing; when it holds the packet is dropped and the destination
// is finished instead.
1853  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1856  pkt = NULL;
1857  }
1858 
1859  if (!pkt)
1860  goto finish;
1861 
1862  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1863  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1864  tq_send(sch->dec[dst.idx].queue, 0, pkt);
1865  if (ret == AVERROR_EOF)
1866  goto finish;
1867 
1868  return ret;
1869 
1870 finish:
// propagate EOF downstream and remember that this destination is done
1871  if (dst.type == SCH_NODE_TYPE_MUX)
1872  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1873  else
1874  tq_send_finish(sch->dec[dst.idx].queue, 0);
1875 
1876  *dst_finished = 1;
1877  return AVERROR_EOF;
1878 }
1879 
// NOTE(review): line 1880, the first line of the signature, is missing from
// this listing. The callers in this file invoke it as
// demux_send_for_stream(sch, d, ds, pkt, flags).
//
// Send a packet (or EOF, when pkt is NULL) to every destination of one
// demuxer stream.
//
// Returns AVERROR_EOF when all destinations are finished, 0 when at least
// one still accepts data, or another negative error code.
1881  AVPacket *pkt, unsigned flags)
1882 {
1883  unsigned nb_done = 0;
1884 
1885  for (unsigned i = 0; i < ds->nb_dst; i++) {
1886  AVPacket *to_send = pkt;
1887  uint8_t *finished = &ds->dst_finished[i];
1888 
1889  int ret;
1890 
1891  // sending a packet consumes it, so make a temporary reference if needed
// the last destination receives the caller's packet itself
1892  if (pkt && i < ds->nb_dst - 1) {
1893  to_send = d->send_pkt;
1894 
1895  ret = av_packet_ref(to_send, pkt);
1896  if (ret < 0)
1897  return ret;
1898  }
1899 
1900  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1901  if (to_send)
1902  av_packet_unref(to_send);
1903  if (ret == AVERROR_EOF)
1904  nb_done++;
1905  else if (ret < 0)
1906  return ret;
1907  }
1908 
1909  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1910 }
1911 
// NOTE(review): the signature line (1912) is missing from this listing; the
// caller in this file invokes it as demux_flush(sch, d, pkt).
//
// Send the flush packet, which must be empty, to every unfinished decoder
// destination of the demuxer. The largest end timestamp collected from
// decoders that have a queue_end_ts is stored in pkt->pts / pkt->time_base;
// pkt->pts stays AV_NOPTS_VALUE when none was collected.
//
// Returns 0 on success or a negative error code.
1913 {
1914  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1915 
1916  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1917 
1918  for (unsigned i = 0; i < d->nb_streams; i++) {
1919  SchDemuxStream *ds = &d->streams[i];
1920 
1921  for (unsigned j = 0; j < ds->nb_dst; j++) {
1922  const SchedulerNode *dst = &ds->dst[j];
1923  SchDec *dec;
1924  int ret;
1925 
// only decoders that are still running take part in the flush
1926  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
1927  continue;
1928 
1929  dec = &sch->dec[dst->idx];
1930 
1931  ret = tq_send(dec->queue, 0, pkt);
1932  if (ret < 0)
1933  return ret;
1934 
1935  if (dec->queue_end_ts) {
1936  Timestamp ts;
// NOTE(review): line 1937 is missing from this listing; it presumably fills
// 'ts' and sets 'ret', both of which are read right below - confirm upstream.
1938  if (ret < 0)
1939  return ret;
1940 
// keep the later of the two timestamps, compared across time bases
1941  if (max_end_ts.ts == AV_NOPTS_VALUE ||
1942  (ts.ts != AV_NOPTS_VALUE &&
1943  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
1944  max_end_ts = ts;
1945 
1946  }
1947  }
1948  }
1949 
1950  pkt->pts = max_end_ts.ts;
1951  pkt->time_base = max_end_ts.tb;
1952 
1953  return 0;
1954 }
1955 
1956 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
1957  unsigned flags)
1958 {
1959  SchDemux *d;
1960  int terminate;
1961 
1962  av_assert0(demux_idx < sch->nb_demux);
1963  d = &sch->demux[demux_idx];
1964 
1965  terminate = waiter_wait(sch, &d->waiter);
1966  if (terminate)
1967  return AVERROR_EXIT;
1968 
1969  // flush the downstreams after seek
1970  if (pkt->stream_index == -1)
1971  return demux_flush(sch, d, pkt);
1972 
1973  av_assert0(pkt->stream_index < d->nb_streams);
1974 
1975  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
1976 }
1977 
// Cleanup for a demuxer task: send EOF to the destinations of every stream
// and mark the task as exited.
//
// Returns 0 or the merged negative error of the EOF sends; AVERROR_EOF
// results are not treated as errors.
1978 static int demux_done(Scheduler *sch, unsigned demux_idx)
1979 {
1980  SchDemux *d = &sch->demux[demux_idx];
1981  int ret = 0;
1982 
1983  for (unsigned i = 0; i < d->nb_streams; i++) {
1984  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
1985  if (err != AVERROR_EOF)
1986  ret = err_merge(ret, err);
1987  }
1988 
// NOTE(review): lines 1989, 1992-1993 and 1995 are missing from this
// listing; only the task_exited update survives.
1990 
1991  d->task_exited = 1;
1992 
1994 
1996 
1997  return ret;
1998 }
1999 
2000 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2001 {
2002  SchMux *mux;
2003  int ret, stream_idx;
2004 
2005  av_assert0(mux_idx < sch->nb_mux);
2006  mux = &sch->mux[mux_idx];
2007 
2008  ret = tq_receive(mux->queue, &stream_idx, pkt);
2009  pkt->stream_index = stream_idx;
2010  return ret;
2011 }
2012 
// Mark one muxer stream as no longer accepting input: the receive side of
// the stream in the muxer's thread queue is finished and the stream is
// flagged as source_finished.
2013 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2014 {
2015  SchMux *mux;
2016 
2017  av_assert0(mux_idx < sch->nb_mux);
2018  mux = &sch->mux[mux_idx];
2019 
2020  av_assert0(stream_idx < mux->nb_streams);
2021  tq_receive_finish(mux->queue, stream_idx);
2022 
// NOTE(review): lines 2023, 2026 and 2028 are missing from this listing.
2024  mux->streams[stream_idx].source_finished = 1;
2025 
2027 
2029 }
2030 
// Send a heartbeat packet to every decoder registered in the muxer stream's
// sub_heartbeat_dst list, using mux->sub_heartbeat_pkt as the packet sent.
//
// Returns 0 on success or a negative error code. The result of tq_send() is
// not checked.
2031 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2032  const AVPacket *pkt)
2033 {
2034  SchMux *mux;
2035  SchMuxStream *ms;
2036 
2037  av_assert0(mux_idx < sch->nb_mux);
2038  mux = &sch->mux[mux_idx];
2039 
2040  av_assert0(stream_idx < mux->nb_streams);
2041  ms = &mux->streams[stream_idx];
2042 
2043  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2044  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2045  int ret;
2046 
// NOTE(review): line 2047 is missing from this listing; it presumably sets
// 'ret' while preparing mux->sub_heartbeat_pkt from pkt - confirm upstream.
2048  if (ret < 0)
2049  return ret;
2050 
2051  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2052  }
2053 
2054  return 0;
2055 }
2056 
// Cleanup for a muxer task: finish the receive side of every stream in the
// muxer's queue, flag every stream as source_finished and count the muxer as
// done in sch->nb_mux_done. Always returns 0.
2057 static int mux_done(Scheduler *sch, unsigned mux_idx)
2058 {
2059  SchMux *mux = &sch->mux[mux_idx];
2060 
// NOTE(review): lines 2061, 2068, 2070, 2072, 2077 and 2079 are missing from
// this listing.
2062 
2063  for (unsigned i = 0; i < mux->nb_streams; i++) {
2064  tq_receive_finish(mux->queue, i);
2065  mux->streams[i].source_finished = 1;
2066  }
2067 
2069 
2071 
2073 
// each muxer may be counted as done only once
2074  av_assert0(sch->nb_mux_done < sch->nb_mux);
2075  sch->nb_mux_done++;
2076 
2078 
2080 
2081  return 0;
2082 }
2083 
// Fetch the next packet for a decoder from its thread queue.
//
// After a flush packet has been returned for a decoder that has a
// queue_end_ts, the next call expects pkt->pts / pkt->time_base to carry the
// post-flush end timestamp provided by the decoder.
//
// Returns the result of tq_receive(), or a negative error code from handling
// the end timestamp.
2084 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2085 {
2086  SchDec *dec;
2087  int ret, dummy;
2088 
2089  av_assert0(dec_idx < sch->nb_dec);
2090  dec = &sch->dec[dec_idx];
2091 
2092  // the decoder should have given us post-flush end timestamp in pkt
2093  if (dec->expect_end_ts) {
2094  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
// NOTE(review): line 2095 is missing from this listing; it presumably passes
// 'ts' on through dec->queue_end_ts and sets 'ret' - confirm upstream.
2096  if (ret < 0)
2097  return ret;
2098 
2099  dec->expect_end_ts = 0;
2100  }
2101 
2102  ret = tq_receive(dec->queue, &dummy, pkt);
// the decoder queue has a single stream, so the index is 0 or negative
2103  av_assert0(dummy <= 0);
2104 
2105  // got a flush packet, on the next call to this function the decoder
2106  // will give us post-flush end timestamp
2107  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2108  dec->expect_end_ts = 1;
2109 
2110  return ret;
2111 }
2112 
// NOTE(review): line 2113, the first line of the signature, is missing from
// this listing. The callers in this file invoke it as
// send_to_filter(sch, fg, in_idx, frame).
//
// Send a frame to filtergraph input in_idx, or mark that input as finished
// when frame is NULL. Finishing an input is done only once.
//
// Returns the result of tq_send() for a frame, 0 for EOF.
2114  unsigned in_idx, AVFrame *frame)
2115 {
2116  if (frame)
2117  return tq_send(fg->queue, in_idx, frame);
2118 
2119  if (!fg->inputs[in_idx].send_finished) {
2120  fg->inputs[in_idx].send_finished = 1;
2121  tq_send_finish(fg->queue, in_idx);
2122 
2123  // close the control stream when all actual inputs are done
// the control stream is queue index nb_inputs, one past the last real input
2124  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2125  tq_send_finish(fg->queue, fg->nb_inputs);
2126  }
2127  return 0;
2128 }
2129 
2130 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2131  uint8_t *dst_finished, AVFrame *frame)
2132 {
2133  int ret;
2134 
2135  if (*dst_finished)
2136  return AVERROR_EOF;
2137 
2138  if (!frame)
2139  goto finish;
2140 
2141  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2142  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2143  send_to_enc(sch, &sch->enc[dst.idx], frame);
2144  if (ret == AVERROR_EOF)
2145  goto finish;
2146 
2147  return ret;
2148 
2149 finish:
2150  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2151  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2152  else
2153  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2154 
2155  *dst_finished = 1;
2156 
2157  return AVERROR_EOF;
2158 }
2159 
2160 int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
2161 {
2162  SchDec *dec;
2163  int ret = 0;
2164  unsigned nb_done = 0;
2165 
2166  av_assert0(dec_idx < sch->nb_dec);
2167  dec = &sch->dec[dec_idx];
2168 
2169  for (unsigned i = 0; i < dec->nb_dst; i++) {
2170  uint8_t *finished = &dec->dst_finished[i];
2171  AVFrame *to_send = frame;
2172 
2173  // sending a frame consumes it, so make a temporary reference if needed
2174  if (i < dec->nb_dst - 1) {
2175  to_send = dec->send_frame;
2176 
2177  // frame may sometimes contain props only,
2178  // e.g. to signal EOF timestamp
2179  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2180  av_frame_copy_props(to_send, frame);
2181  if (ret < 0)
2182  return ret;
2183  }
2184 
2185  ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
2186  if (ret < 0) {
2187  av_frame_unref(to_send);
2188  if (ret == AVERROR_EOF) {
2189  nb_done++;
2190  ret = 0;
2191  continue;
2192  }
2193  return ret;
2194  }
2195  }
2196 
2197  return (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
2198 }
2199 
// Cleanup for a decoder task: finish the receive side of the decoder's input
// queue and send EOF to every destination.
//
// Returns 0 or the merged negative error of the EOF sends; AVERROR_EOF
// results are not treated as errors.
2200 static int dec_done(Scheduler *sch, unsigned dec_idx)
2201 {
2202  SchDec *dec = &sch->dec[dec_idx];
2203  int ret = 0;
2204 
2205  tq_receive_finish(dec->queue, 0);
2206 
2207  // make sure our source does not get stuck waiting for end timestamps
2208  // that will never arrive
// NOTE(review): line 2210, the statement guarded by this 'if', is missing
// from this listing. As printed, the 'for' loop below would become the body
// of the 'if', which is presumably not what the original code does.
2209  if (dec->queue_end_ts)
2211 
2212  for (unsigned i = 0; i < dec->nb_dst; i++) {
2213  int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL);
2214  if (err < 0 && err != AVERROR_EOF)
2215  ret = err_merge(ret, err);
2216  }
2217 
2218  return ret;
2219 }
2220 
2221 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2222 {
2223  SchEnc *enc;
2224  int ret, dummy;
2225 
2226  av_assert0(enc_idx < sch->nb_enc);
2227  enc = &sch->enc[enc_idx];
2228 
2229  ret = tq_receive(enc->queue, &dummy, frame);
2230  av_assert0(dummy <= 0);
2231 
2232  return ret;
2233 }
2234 
2235 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2236  uint8_t *dst_finished, AVPacket *pkt)
2237 {
2238  int ret;
2239 
2240  if (*dst_finished)
2241  return AVERROR_EOF;
2242 
2243  if (!pkt)
2244  goto finish;
2245 
2246  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2247  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2248  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2249  if (ret == AVERROR_EOF)
2250  goto finish;
2251 
2252  return ret;
2253 
2254 finish:
2255  if (dst.type == SCH_NODE_TYPE_MUX)
2256  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2257  else
2258  tq_send_finish(sch->dec[dst.idx].queue, 0);
2259 
2260  *dst_finished = 1;
2261 
2262  return AVERROR_EOF;
2263 }
2264 
2265 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2266 {
2267  SchEnc *enc;
2268  int ret;
2269 
2270  av_assert0(enc_idx < sch->nb_enc);
2271  enc = &sch->enc[enc_idx];
2272 
2273  for (unsigned i = 0; i < enc->nb_dst; i++) {
2274  uint8_t *finished = &enc->dst_finished[i];
2275  AVPacket *to_send = pkt;
2276 
2277  // sending a packet consumes it, so make a temporary reference if needed
2278  if (i < enc->nb_dst - 1) {
2279  to_send = enc->send_pkt;
2280 
2281  ret = av_packet_ref(to_send, pkt);
2282  if (ret < 0)
2283  return ret;
2284  }
2285 
2286  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2287  if (ret < 0) {
2288  av_packet_unref(to_send);
2289  if (ret == AVERROR_EOF) {
2290  ret = 0;
2291  continue;
2292  }
2293  return ret;
2294  }
2295  }
2296 
2297  return ret;
2298 }
2299 
2300 static int enc_done(Scheduler *sch, unsigned enc_idx)
2301 {
2302  SchEnc *enc = &sch->enc[enc_idx];
2303  int ret = 0;
2304 
2305  tq_receive_finish(enc->queue, 0);
2306 
2307  for (unsigned i = 0; i < enc->nb_dst; i++) {
2308  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2309  if (err < 0 && err != AVERROR_EOF)
2310  ret = err_merge(ret, err);
2311  }
2312 
2313  return ret;
2314 }
2315 
// Fetch the next frame for a filtergraph.
//
// On entry *in_idx is the input the filtergraph wants data on; the value
// fg->nb_inputs means that no input is wanted, in which case the call waits
// on the filtergraph's waiter and returns AVERROR_EOF on termination or
// AVERROR(EAGAIN) otherwise. On a successful receive, *in_idx is set to the
// queue stream the frame arrived on and 0 is returned. AVERROR_EOF is
// returned when tq_receive() reports a negative stream index.
2316 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2317  unsigned *in_idx, AVFrame *frame)
2318 {
2319  SchFilterGraph *fg;
2320 
2321  av_assert0(fg_idx < sch->nb_filters);
2322  fg = &sch->filters[fg_idx];
2323 
2324  av_assert0(*in_idx <= fg->nb_inputs);
2325 
2326  // update scheduling to account for desired input stream, if it changed
2327  //
2328  // this check needs no locking because only the filtering thread
2329  // updates this value
2330  if (*in_idx != fg->best_input) {
// NOTE(review): lines 2331, 2334 and 2336 are missing from this listing;
// only the best_input update survives.
2332 
2333  fg->best_input = *in_idx;
2335 
2337  }
2338 
2339  if (*in_idx == fg->nb_inputs) {
2340  int terminate = waiter_wait(sch, &fg->waiter);
2341  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2342  }
2343 
2344  while (1) {
2345  int ret, idx;
2346 
2347  ret = tq_receive(fg->queue, &idx, frame);
2348  if (idx < 0)
2349  return AVERROR_EOF;
2350  else if (ret >= 0) {
2351  *in_idx = idx;
2352  return 0;
2353  }
2354 
2355  // disregard EOFs for specific streams - they should always be
2356  // preceded by an EOF frame
2357  }
2358 }
2359 
2360 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2361 {
2362  SchFilterGraph *fg;
2363  SchFilterIn *fi;
2364 
2365  av_assert0(fg_idx < sch->nb_filters);
2366  fg = &sch->filters[fg_idx];
2367 
2368  av_assert0(in_idx < fg->nb_inputs);
2369  fi = &fg->inputs[in_idx];
2370 
2371  if (!fi->receive_finished) {
2372  fi->receive_finished = 1;
2373  tq_receive_finish(fg->queue, in_idx);
2374 
2375  // close the control stream when all actual inputs are done
2376  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2377  tq_receive_finish(fg->queue, fg->nb_inputs);
2378  }
2379 }
2380 
2381 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2382 {
2383  SchFilterGraph *fg;
2384 
2385  av_assert0(fg_idx < sch->nb_filters);
2386  fg = &sch->filters[fg_idx];
2387 
2388  av_assert0(out_idx < fg->nb_outputs);
2389  return send_to_enc(sch, &sch->enc[fg->outputs[out_idx].dst.idx], frame);
2390 }
2391 
// Cleanup for a filtergraph task: finish the receive side of every queue
// stream (all inputs plus the control stream at index nb_inputs), send EOF to
// the encoder behind every output and mark the task as exited.
//
// Returns 0 or the merged negative error of the EOF sends; AVERROR_EOF
// results are not treated as errors.
2392 static int filter_done(Scheduler *sch, unsigned fg_idx)
2393 {
2394  SchFilterGraph *fg = &sch->filters[fg_idx];
2395  int ret = 0;
2396 
// '<=' is intentional: index nb_inputs is the control stream
2397  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2398  tq_receive_finish(fg->queue, i);
2399 
2400  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2401  SchEnc *enc = &sch->enc[fg->outputs[i].dst.idx];
2402  int err = send_to_enc(sch, enc, NULL);
2403  if (err < 0 && err != AVERROR_EOF)
2404  ret = err_merge(ret, err);
2405  }
2406 
// NOTE(review): lines 2407, 2410-2411 and 2413 are missing from this
// listing; only the task_exited update survives.
2408 
2409  fg->task_exited = 1;
2410 
2412 
2414 
2415  return ret;
2416 }
2417 
2418 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2419 {
2420  SchFilterGraph *fg;
2421 
2422  av_assert0(fg_idx < sch->nb_filters);
2423  fg = &sch->filters[fg_idx];
2424 
2425  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2426 }
2427 
2428 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2429 {
2430  switch (node.type) {
2431  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2432  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2433  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2434  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2435  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2436  default: av_assert0(0);
2437  }
2438 }
2439 
// Thread entry point for a scheduler task: runs the task function, then the
// type-specific cleanup, and merges both results. AVERROR_EOF counts as
// success; any other negative result sets sch->task_failed.
//
// The final result is returned as the thread's exit value, encoded as a
// pointer-sized integer.
2440 static void *task_wrapper(void *arg)
2441 {
2442  SchTask *task = arg;
2443  Scheduler *sch = task->parent;
2444  int ret;
2445  int err = 0;
2446 
2447  ret = task->func(task->func_arg);
2448  if (ret < 0)
2449  av_log(task->func_arg, AV_LOG_ERROR,
2450  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2451 
// cleanup runs whether or not the task function succeeded
2452  err = task_cleanup(sch, task->node);
2453  ret = err_merge(ret, err);
2454 
2455  // EOF is considered normal termination
2456  if (ret == AVERROR_EOF)
2457  ret = 0;
2458  if (ret < 0)
2459  atomic_store(&sch->task_failed, 1);
2460 
// NOTE(review): line 2461, which opens this logging call, is missing from
// this listing.
2462  "Terminating thread with return code %d (%s)\n", ret,
2463  ret < 0 ? av_err2str(ret) : "success");
2464 
2465  return (void*)(intptr_t)ret;
2466 }
2467 
2468 static int task_stop(Scheduler *sch, SchTask *task)
2469 {
2470  int ret;
2471  void *thread_ret;
2472 
2473  if (!task->thread_running)
2474  return task_cleanup(sch, task->node);
2475 
2476  ret = pthread_join(task->thread, &thread_ret);
2477  av_assert0(ret == 0);
2478 
2479  task->thread_running = 0;
2480 
2481  return (intptr_t)thread_ret;
2482 }
2483 
2484 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2485 {
2486  int ret = 0, err;
2487 
2488  if (sch->state != SCH_STATE_STARTED)
2489  return 0;
2490 
2491  atomic_store(&sch->terminate, 1);
2492 
2493  for (unsigned type = 0; type < 2; type++)
2494  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2495  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2496  waiter_set(w, 1);
2497  }
2498 
2499  for (unsigned i = 0; i < sch->nb_demux; i++) {
2500  SchDemux *d = &sch->demux[i];
2501 
2502  err = task_stop(sch, &d->task);
2503  ret = err_merge(ret, err);
2504  }
2505 
2506  for (unsigned i = 0; i < sch->nb_dec; i++) {
2507  SchDec *dec = &sch->dec[i];
2508 
2509  err = task_stop(sch, &dec->task);
2510  ret = err_merge(ret, err);
2511  }
2512 
2513  for (unsigned i = 0; i < sch->nb_filters; i++) {
2514  SchFilterGraph *fg = &sch->filters[i];
2515 
2516  err = task_stop(sch, &fg->task);
2517  ret = err_merge(ret, err);
2518  }
2519 
2520  for (unsigned i = 0; i < sch->nb_enc; i++) {
2521  SchEnc *enc = &sch->enc[i];
2522 
2523  err = task_stop(sch, &enc->task);
2524  ret = err_merge(ret, err);
2525  }
2526 
2527  for (unsigned i = 0; i < sch->nb_mux; i++) {
2528  SchMux *mux = &sch->mux[i];
2529 
2530  err = task_stop(sch, &mux->task);
2531  ret = err_merge(ret, err);
2532  }
2533 
2534  if (finish_ts)
2535  *finish_ts = trailing_dts(sch, 1);
2536 
2537  sch->state = SCH_STATE_STOPPED;
2538 
2539  return ret;
2540 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:292
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:68
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: avpacket.c:427
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1051
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:102
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:345
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2381
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:41
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:82
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:289
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:281
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:263
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2013
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:94
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:95
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:1912
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:293
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:227
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:649
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:141
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:676
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:179
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:264
int64_t
long long int64_t
Definition: coverity.c:34
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2057
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:290
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:130
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1289
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:344
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2428
frame_move
static void frame_move(void *dst, void *src)
Definition: ffmpeg_utils.h:51
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:522
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:212
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:154
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1786
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1284
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:278
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2300
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:92
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:196
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:540
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:207
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:270
objpool_alloc_packets
ObjPool * objpool_alloc_packets(void)
Definition: objpool.c:124
SchFilterOut
Definition: ffmpeg_sched.c:237
AVFrame::buf
AVBufferRef * buf[AV_NUM_DATA_POINTERS]
AVBuffer references backing the data for this frame.
Definition: frame.h:557
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:264
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:170
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:201
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: avpacket.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:246
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:307
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:160
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1137
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1635
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2468
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:234
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:100
fifo.h
finish
static void finish(void)
Definition: movenc.c:342
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2484
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:113
fail
#define fail()
Definition: checkasm.h:179
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2160
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:105
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:238
SchEnc
Definition: ffmpeg_sched.c:103
dummy
int dummy
Definition: motion.c:66
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:84
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:644
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:208
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:246
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:266
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:608
type
It's the only field you need to keep, assuming you have a context. There is some magic you don't need to care about around this; just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:287
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
SchDec::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:78
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:799
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:118
SchFilterGraph
Definition: ffmpeg_sched.c:241
SchMuxStream::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:186
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:59
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:461
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:301
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:210
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
Scheduler::mux_done_cond
pthread_cond_t mux_done_cond
Definition: ffmpeg_sched.c:283
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:184
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:620
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:335
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:250
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:88
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1609
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:573
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2130
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:427
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:189
op
static int op(uint8_t **dst, const uint8_t *dst_end, GetByteContext *gb, int pixel, int count, int *x, int width, int linesize)
Perform decode operation.
Definition: anm.c:76
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:107
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:703
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:185
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:89
ctx
AVFormatContext * ctx
Definition: movenc.c:48
nb_streams
static int nb_streams
Definition: ffprobe.c:383
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:203
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2392
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:242
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2221
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
objpool_alloc_frames
ObjPool * objpool_alloc_frames(void)
Definition: objpool.c:128
frame
static AVFrame * frame
Definition: demux_decode.c:54
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:607
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:104
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1880
arg
const char * arg
Definition: jacosubdec.c:67
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:191
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:620
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:223
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:91
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:272
pkt_move
static void pkt_move(void *dst, void *src)
Definition: ffmpeg_utils.h:46
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:505
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:55
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:362
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:298
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1654
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:131
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:679
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:245
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1283
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:275
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1216
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:241
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:761
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:99
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:265
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2200
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:255
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:108
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:721
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations; send it normally to other types.
Definition: ffmpeg_sched.h:324
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:793
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: avpacket.c:435
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: avpacket.c:484
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2265
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:81
error.h
Scheduler
Definition: ffmpeg_sched.c:269
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:211
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:97
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:188
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:75
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:661
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:138
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:156
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:147
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:247
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:120
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:365
AVPacket::size
int size
Definition: packet.h:523
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:100
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:260
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:354
threadmessage.h
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:174
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:252
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:121
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:343
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:181
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:699
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:715
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:244
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:305
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:96
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:521
ObjPool
Definition: objpool.c:30
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2235
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:614
SchFilterIn
Definition: ffmpeg_sched.c:230
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2316
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:276
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: avpacket.c:63
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
Definition: thread_queue.c:79
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:132
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:568
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:273
Scheduler::task_failed
atomic_int task_failed
Definition: ffmpeg_sched.c:303
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: avpacket.c:390
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:146
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:163
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:95
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:408
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:295
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:255
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:515
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:1978
packet.h
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:259
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:31
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:279
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:302
SchDec
Definition: ffmpeg_sched.c:74
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:241
QueueType
QueueType
Definition: ffmpeg_sched.c:47
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:576
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:439
Scheduler::mux_done_lock
pthread_mutex_t mux_done_lock
Definition: ffmpeg_sched.c:282
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:254
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:249
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:755
SchedulerNode
Definition: ffmpeg_sched.h:99
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:93
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1085
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2113
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:196
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:145
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:262
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:897
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1730
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2418
SchDemuxStream
Definition: ffmpeg_sched.c:144
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
SchFilterIn::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:232
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2084
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:71
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:194
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1750
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:214
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:670
ThreadQueue
Definition: thread_queue.c:42
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2440
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:216
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:88
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:1956
SchDemux
Definition: ffmpeg_sched.c:150
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:286
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:131
SchDec::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:79
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:87
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1845
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:77
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:524
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:465
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:224
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:157
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:316
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:270
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:96
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:866
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1381
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1162
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:101
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2360
ffmpeg_sched.h
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:106
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1582
AVPacket
This structure stores compressed data.
Definition: packet.h:499
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
src
INIT_CLIP pixel * src
Definition: h264pred_template.c:418
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:94
d
d
Definition: ffmpeg_filter.c:409
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:225
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:91
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:368
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1516
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:482
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2031
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:136
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:675
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:134
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:231
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1121
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1334
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:153
PreMuxQueue
Definition: ffmpeg_sched.c:166
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1285
int
int
Definition: ffmpeg_filter.c:409
SchDec::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:80
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:299
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:233
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:256
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:566
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1191
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:534
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2000
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:841
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:109
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:226
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:296