FFmpeg
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
async.c
Go to the documentation of this file.
1 /*
2  * Input async protocol.
3  * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  *
21  * Based on libavformat/cache.c by Michael Niedermayer
22  */
23 
24  /**
25  * @TODO
26  * support timeout
27  * support backward short seek
28  * support work with concatdec, hls
29  */
30 
31 #include "libavutil/avassert.h"
32 #include "libavutil/avstring.h"
33 #include "libavutil/error.h"
34 #include "libavutil/fifo.h"
35 #include "libavutil/log.h"
36 #include "libavutil/opt.h"
37 #include "url.h"
38 #include <stdint.h>
39 #include <pthread.h>
40 
41 #if HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44 
45 #define BUFFER_CAPACITY (4 * 1024 * 1024)
46 #define SHORT_SEEK_THRESHOLD (256 * 1024)
47 
48 typedef struct Context {
49  AVClass *class;
51 
53  size_t seek_pos;
56  int64_t seek_ret;
57 
58  int io_error;
60 
61  size_t logical_pos;
62  size_t logical_size;
64 
69 
72 } Context;
73 
74 static int async_check_interrupt(void *arg)
75 {
76  URLContext *h = arg;
77  Context *c = h->priv_data;
78 
79  if (c->abort_request)
80  return 1;
81 
83  c->abort_request = 1;
84 
85  return c->abort_request;
86 }
87 
88 static void *async_buffer_task(void *arg)
89 {
90  URLContext *h = arg;
91  Context *c = h->priv_data;
92  AVFifoBuffer *fifo = c->fifo;
93  int ret = 0;
94 
95  while (1) {
96  int fifo_space, to_copy;
97 
99  if (async_check_interrupt(h)) {
100  c->io_eof_reached = 1;
101  c->io_error = AVERROR_EXIT;
104  break;
105  }
106 
107  if (c->seek_request) {
108  ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
109  if (ret < 0) {
110  c->io_eof_reached = 1;
111  c->io_error = ret;
112  } else {
113  c->io_eof_reached = 0;
114  c->io_error = 0;
115  }
116 
117  c->seek_completed = 1;
118  c->seek_ret = ret;
119  c->seek_request = 0;
120 
121  av_fifo_reset(fifo);
122 
125  continue;
126  }
127 
128  fifo_space = av_fifo_space(fifo);
129  if (c->io_eof_reached || fifo_space <= 0) {
133  continue;
134  }
136 
137  to_copy = FFMIN(4096, fifo_space);
138  ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
139 
141  if (ret <= 0) {
142  c->io_eof_reached = 1;
143  if (ret < 0) {
144  c->io_error = ret;
145  }
146  }
147 
150  }
151 
152  return NULL;
153 }
154 
155 static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
156 {
157  Context *c = h->priv_data;
158  int ret;
159  AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
160 
161  av_strstart(arg, "async:", &arg);
162 
164  if (!c->fifo) {
165  ret = AVERROR(ENOMEM);
166  goto fifo_fail;
167  }
168 
169  /* wrap interrupt callback */
171  ret = ffurl_open(&c->inner, arg, flags, &interrupt_callback, options);
172  if (ret != 0) {
173  av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg);
174  goto url_fail;
175  }
176 
177  c->logical_size = ffurl_size(c->inner);
178  h->is_streamed = c->inner->is_streamed;
179 
180  ret = pthread_mutex_init(&c->mutex, NULL);
181  if (ret != 0) {
182  av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
183  goto mutex_fail;
184  }
185 
187  if (ret != 0) {
188  av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
189  goto cond_wakeup_main_fail;
190  }
191 
193  if (ret != 0) {
194  av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
195  goto cond_wakeup_background_fail;
196  }
197 
199  if (ret) {
200  av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret));
201  goto thread_fail;
202  }
203 
204  return 0;
205 
206 thread_fail:
208 cond_wakeup_background_fail:
210 cond_wakeup_main_fail:
212 mutex_fail:
213  ffurl_close(c->inner);
214 url_fail:
215  av_fifo_freep(&c->fifo);
216 fifo_fail:
217  return ret;
218 }
219 
221 {
222  Context *c = h->priv_data;
223  int ret;
224 
226  c->abort_request = 1;
229 
231  if (ret != 0)
232  av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(ret));
233 
237  ffurl_close(c->inner);
238  av_fifo_freep(&c->fifo);
239 
240  return 0;
241 }
242 
243 static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
244  void (*func)(void*, void*, int))
245 {
246  Context *c = h->priv_data;
247  AVFifoBuffer *fifo = c->fifo;
248  int to_read = size;
249  int ret = 0;
250 
252 
253  while (to_read > 0) {
254  int fifo_size, to_copy;
255  if (async_check_interrupt(h)) {
256  ret = AVERROR_EXIT;
257  break;
258  }
259  fifo_size = av_fifo_size(fifo);
260  to_copy = FFMIN(to_read, fifo_size);
261  if (to_copy > 0) {
262  av_fifo_generic_read(fifo, dest, to_copy, func);
263  if (!func)
264  dest = (uint8_t *)dest + to_copy;
265  c->logical_pos += to_copy;
266  to_read -= to_copy;
267  ret = size - to_read;
268 
269  if (to_read <= 0 || !read_complete)
270  break;
271  } else if (c->io_eof_reached) {
272  if (ret <= 0)
273  ret = AVERROR_EOF;
274  break;
275  }
278  }
279 
282 
283  return ret;
284 }
285 
286 static int async_read(URLContext *h, unsigned char *buf, int size)
287 {
288  return async_read_internal(h, buf, size, 0, NULL);
289 }
290 
/**
 * FIFO read callback that discards the data: nothing is written to dest
 * and src is left untouched.
 */
static void fifo_do_not_copy_func(void *dest, void *src, int size)
{
    /* intentionally empty */
    (void)dest;
    (void)src;
    (void)size;
}
294 
295 static int64_t async_seek(URLContext *h, int64_t pos, int whence)
296 {
297  Context *c = h->priv_data;
298  AVFifoBuffer *fifo = c->fifo;
299  int64_t ret;
300  int64_t new_logical_pos;
301  int fifo_size;
302 
303  if (whence == AVSEEK_SIZE) {
304  av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
305  return c->logical_size;
306  } else if (whence == SEEK_CUR) {
307  av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
308  new_logical_pos = pos + c->logical_pos;
309  } else if (whence == SEEK_SET){
310  av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
311  new_logical_pos = pos;
312  } else {
313  return AVERROR(EINVAL);
314  }
315  if (new_logical_pos < 0)
316  return AVERROR(EINVAL);
317 
318  fifo_size = av_fifo_size(fifo);
319  if (new_logical_pos == c->logical_pos) {
320  /* current position */
321  return c->logical_pos;
322  } else if ((new_logical_pos > c->logical_pos) &&
323  (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
324  /* fast seek */
325  av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
326  new_logical_pos, (int)c->logical_pos,
327  (int)(new_logical_pos - c->logical_pos), fifo_size);
328  async_read_internal(h, NULL, new_logical_pos - c->logical_pos, 1, fifo_do_not_copy_func);
329  return c->logical_pos;
330  } else if (c->logical_size <= 0) {
331  /* can not seek */
332  return AVERROR(EINVAL);
333  } else if (new_logical_pos > c->logical_size) {
334  /* beyond end */
335  return AVERROR(EINVAL);
336  }
337 
339 
340  c->seek_request = 1;
341  c->seek_pos = new_logical_pos;
342  c->seek_whence = SEEK_SET;
343  c->seek_completed = 0;
344  c->seek_ret = 0;
345 
346  while (1) {
347  if (async_check_interrupt(h)) {
348  ret = AVERROR_EXIT;
349  break;
350  }
351  if (c->seek_completed) {
352  if (c->seek_ret >= 0)
353  c->logical_pos = c->seek_ret;
354  ret = c->seek_ret;
355  break;
356  }
359  }
360 
362 
363  return ret;
364 }
365 
366 #define OFFSET(x) offsetof(Context, x)
367 #define D AV_OPT_FLAG_DECODING_PARAM
368 
369 static const AVOption options[] = {
370  {NULL},
371 };
372 
373 static const AVClass async_context_class = {
374  .class_name = "Async",
375  .item_name = av_default_item_name,
376  .option = options,
377  .version = LIBAVUTIL_VERSION_INT,
378 };
379 
381  .name = "async",
382  .url_open2 = async_open,
383  .url_read = async_read,
384  .url_seek = async_seek,
385  .url_close = async_close,
386  .priv_data_size = sizeof(Context),
387  .priv_data_class = &async_context_class,
388 };
389 
390 #ifdef TEST
391 
392 #define TEST_SEEK_POS (1536)
393 #define TEST_STREAM_SIZE (2048)
394 
395 typedef struct TestContext {
396  AVClass *class;
397  size_t logical_pos;
398  size_t logical_size;
399 } TestContext;
400 
401 static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
402 {
403  TestContext *c = h->priv_data;
404  c->logical_pos = 0;
405  c->logical_size = TEST_STREAM_SIZE;
406  return 0;
407 }
408 
409 static int async_test_close(URLContext *h)
410 {
411  return 0;
412 }
413 
414 static int async_test_read(URLContext *h, unsigned char *buf, int size)
415 {
416  TestContext *c = h->priv_data;
417  int i;
418  int read_len = 0;
419 
420  if (c->logical_pos >= c->logical_size)
421  return AVERROR_EOF;
422 
423  for (i = 0; i < size; ++i) {
424  buf[i] = c->logical_pos & 0xFF;
425 
426  c->logical_pos++;
427  read_len++;
428 
429  if (c->logical_pos >= c->logical_size)
430  break;
431  }
432 
433  return read_len;
434 }
435 
436 static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
437 {
438  TestContext *c = h->priv_data;
439  int64_t new_logical_pos;
440 
441  if (whence == AVSEEK_SIZE) {
442  return c->logical_size;
443  } else if (whence == SEEK_CUR) {
444  new_logical_pos = pos + c->logical_pos;
445  } else if (whence == SEEK_SET){
446  new_logical_pos = pos;
447  } else {
448  return AVERROR(EINVAL);
449  }
450  if (new_logical_pos < 0)
451  return AVERROR(EINVAL);
452 
453  c->logical_pos = new_logical_pos;
454  return new_logical_pos;
455 }
456 
457 static const AVClass async_test_context_class = {
458  .class_name = "Async-Test",
459  .item_name = av_default_item_name,
460  .version = LIBAVUTIL_VERSION_INT,
461 };
462 
463 URLProtocol ff_async_test_protocol = {
464  .name = "async-test",
465  .url_open2 = async_test_open,
466  .url_read = async_test_read,
467  .url_seek = async_test_seek,
468  .url_close = async_test_close,
469  .priv_data_size = sizeof(TestContext),
470  .priv_data_class = &async_test_context_class,
471 };
472 
473 int main(void)
474 {
475  URLContext *h = NULL;
476  int i;
477  int ret;
478  int64_t size;
479  int64_t pos;
480  int64_t read_len;
481  unsigned char buf[4096];
482 
483  ffurl_register_protocol(&ff_async_protocol);
484  ffurl_register_protocol(&ff_async_test_protocol);
485 
486  ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
487  printf("open: %d\n", ret);
488 
489  size = ffurl_size(h);
490  printf("size: %"PRId64"\n", size);
491 
492  pos = ffurl_seek(h, 0, SEEK_CUR);
493  read_len = 0;
494  while (1) {
495  ret = ffurl_read(h, buf, sizeof(buf));
496  if (ret == AVERROR_EOF) {
497  printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
498  break;
499  }
500  else if (ret == 0)
501  break;
502  else if (ret < 0) {
503  printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
504  goto fail;
505  } else {
506  for (i = 0; i < ret; ++i) {
507  if (buf[i] != (pos & 0xFF)) {
508  printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
509  (int)buf[i], (int)(pos & 0xFF), pos);
510  break;
511  }
512  pos++;
513  }
514  }
515 
516  read_len += ret;
517  }
518  printf("read: %"PRId64"\n", read_len);
519 
520  ret = ffurl_read(h, buf, 1);
521  printf("read: %d\n", ret);
522 
523  pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
524  printf("seek: %"PRId64"\n", pos);
525 
526  read_len = 0;
527  while (1) {
528  ret = ffurl_read(h, buf, sizeof(buf));
529  if (ret == AVERROR_EOF)
530  break;
531  else if (ret == 0)
532  break;
533  else if (ret < 0) {
534  printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
535  goto fail;
536  } else {
537  for (i = 0; i < ret; ++i) {
538  if (buf[i] != (pos & 0xFF)) {
539  printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
540  (int)buf[i], (int)(pos & 0xFF), pos);
541  break;
542  }
543  pos++;
544  }
545  }
546 
547  read_len += ret;
548  }
549  printf("read: %"PRId64"\n", read_len);
550 
551  ret = ffurl_read(h, buf, 1);
552  printf("read: %d\n", ret);
553 
554 fail:
555  ffurl_close(h);
556  return 0;
557 }
558 
559 #endif
Definition: async.c:48
#define NULL
Definition: coverity.c:32
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:94
int io_error
Definition: async.c:58
static int async_check_interrupt(void *arg)
Definition: async.c:74
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:153
AVOption.
Definition: opt.h:255
#define LIBAVUTIL_VERSION_INT
Definition: version.h:62
static int async_read_internal(URLContext *h, void *dest, int size, int read_complete, void(*func)(void *, void *, int))
Definition: async.c:243
int is_streamed
true if streamed (no seek possible), default = false
Definition: url.h:46
AVIOInterruptCB interrupt_callback
Definition: url.h:48
#define AVIO_FLAG_READ
read-only
Definition: avio.h:485
pthread_mutex_t mutex
Definition: async.c:67
static const AVClass async_context_class
Definition: async.c:373
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:124
static const AVOption options[]
Definition: async.c:369
int av_fifo_generic_write(AVFifoBuffer *f, void *src, int size, int(*func)(void *, void *, int))
Feed data from a user-supplied callback to an AVFifoBuffer.
Definition: fifo.c:122
URLProtocol ff_async_protocol
Definition: async.c:380
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
HMTX pthread_mutex_t
Definition: os2threads.h:40
uint8_t
AVOptions.
#define AV_LOG_TRACE
Extremely verbose debugging, useful for libav* development.
Definition: log.h:202
int av_fifo_space(const AVFifoBuffer *f)
Return the amount of space in bytes in the AVFifoBuffer, that is the amount of data you can write int...
Definition: fifo.c:82
#define SHORT_SEEK_THRESHOLD
Definition: async.c:46
#define AVERROR_EOF
End of file.
Definition: error.h:55
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:131
ptrdiff_t size
Definition: opengl_enc.c:101
#define av_log(a,...)
static void * async_buffer_task(void *arg)
Definition: async.c:88
Callback for checking whether to abort blocking functions.
Definition: avio.h:50
static int64_t async_seek(URLContext *h, int64_t pos, int whence)
Definition: async.c:295
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
error code definitions
int(* callback)(void *)
Definition: avio.h:51
av_default_item_name
#define AVERROR(e)
Definition: error.h:43
int av_fifo_generic_read(AVFifoBuffer *f, void *dest, int buf_size, void(*func)(void *, void *, int))
Feed data from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:177
const char * arg
Definition: jacosubdec.c:66
simple assert() macros that are a bit more flexible than ISO C assert().
URLContext * inner
Definition: async.c:50
static void fifo_do_not_copy_func(void *dest, void *src, int size)
Definition: async.c:291
#define fail()
Definition: checkasm.h:57
static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
Definition: async.c:155
#define FFMIN(a, b)
Definition: common.h:81
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:119
AVIOInterruptCB interrupt_callback
Definition: async.c:71
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:80
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:87
int seek_whence
Definition: async.c:54
int abort_request
Definition: async.c:70
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:56
int seek_request
Definition: async.c:52
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:64
AVS_Value src
Definition: avisynth_c.h:482
int ff_check_interrupt(AVIOInterruptCB *cb)
Check if the user has requested to interrupt a blocking function associated with cb.
Definition: avio.c:600
int av_fifo_size(const AVFifoBuffer *f)
Return the amount of data in bytes in the AVFifoBuffer, that is the amount of data you can read from ...
Definition: fifo.c:77
pthread_t async_buffer_thread
Definition: async.c:68
int64_t ffurl_size(URLContext *h)
Return the filesize of the resource accessed by h, AVERROR(ENOSYS) if the operation is not supported ...
Definition: avio.c:556
a very simple circular buffer FIFO implementation
void * buf
Definition: avisynth_c.h:553
Definition: url.h:39
int seek_completed
Definition: async.c:55
Describe the class of an AVClass context structure.
Definition: log.h:67
void * priv_data
Definition: url.h:42
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:67
const char * name
Definition: url.h:53
static int flags
Definition: cpu.c:47
int ffurl_close(URLContext *h)
Definition: avio.c:412
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
Definition: avstring.c:34
AVFifoBuffer * fifo
Definition: async.c:63
int ffurl_register_protocol(URLProtocol *protocol)
Register the URLProtocol protocol.
Definition: avio.c:98
int64_t ffurl_seek(URLContext *h, int64_t pos, int whence)
Change the position that will be used by the next read/write operation on the resource accessed by h...
Definition: avio.c:380
int ffurl_open(URLContext **puc, const char *filename, int flags, const AVIOInterruptCB *int_cb, AVDictionary **options)
Create an URLContext for accessing to the resource indicated by url, and open it. ...
Definition: avio.c:292
static double c[64]
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:115
#define AVSEEK_SIZE
Passing this as the "whence" parameter to a seek function causes it to return the filesize without se...
Definition: avio.h:364
pthread_cond_t cond_wakeup_main
Definition: async.c:65
size_t logical_size
Definition: async.c:62
size_t seek_pos
Definition: async.c:53
int64_t seek_ret
Definition: async.c:56
AVFifoBuffer * av_fifo_alloc(unsigned int size)
Initialize an AVFifoBuffer.
Definition: fifo.c:43
static int async_close(URLContext *h)
Definition: async.c:220
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:108
#define BUFFER_CAPACITY
TODO: support timeout; support backward short seek; support working with concatdec and hls.
Definition: async.c:45
int io_eof_reached
Definition: async.c:59
size_t logical_pos
Definition: async.c:61
unbuffered private I/O API
void av_fifo_freep(AVFifoBuffer **f)
Free an AVFifoBuffer and reset pointer to NULL.
Definition: fifo.c:63
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:101
int main(int argc, char **argv)
Definition: main.c:22
void av_fifo_reset(AVFifoBuffer *f)
Reset the AVFifoBuffer to the state right after av_fifo_alloc, in particular it is emptied...
Definition: fifo.c:71
static int async_read(URLContext *h, unsigned char *buf, int size)
Definition: async.c:286
int ffurl_read(URLContext *h, unsigned char *buf, int size)
Read up to size bytes from the resource accessed by h, and store the read bytes in buf...
Definition: avio.c:353
pthread_cond_t cond_wakeup_background
Definition: async.c:66