[FFmpeg-devel] [PATCH 2/2] avformat/udp: Replace use of pthread_cancel.

Matt Oliver protogonoi at gmail.com
Sat Dec 3 15:13:56 EET 2016


On 3 December 2016 at 23:40, Hendrik Leppkes <h.leppkes at gmail.com> wrote:

> On Sat, Dec 3, 2016 at 1:33 PM, Matt Oliver <protogonoi at gmail.com> wrote:
> >
> > I haven't fully tested Hendrik's idea, as the MSDN docs don't recommend
> > calling multiple Winsock functions at the same time from different
> > threads, so I wasn't sure how well a patch that relies on closesocket to
> > unblock the recv function would be received (although I have seen it
> > extensively used without issues). If Hendrik has tested this with
> > his local patch without issues, then I would accept that as a solution to
> > fixing my issue. ping Hendrik!
>
> I don't really use UDP streaming on a regular basis, but I did test
> this approach when I built it, and it works just fine.
> What I did was basically just define pthread_cancel/setcancelstate to
> empty in udp.c (and define HAVE_PTHREAD_CANCEL in that file to enable
> the code) and re-shuffle the udp_close code to close the socket to
> unblock the thread before waiting for it to exit.
>
> I don't have a clean and finished patch to go, because I had no real
> interest in it working on other platforms, so it's rather ugly. But the
> approach is rather simple.


Would something like the following work for you:

---
 libavformat/udp.c | 46 +++++++++++++++++++++++++++++-----------------
 1 file changed, 29 insertions(+), 17 deletions(-)

diff --git a/libavformat/udp.c b/libavformat/udp.c
index 3835f98..a30918b 100644
--- a/libavformat/udp.c
+++ b/libavformat/udp.c
@@ -60,14 +60,26 @@
 #define IPPROTO_UDPLITE                                  136
 #endif

-#if HAVE_PTHREAD_CANCEL
-#include <pthread.h>
-#endif
-
 #ifndef HAVE_PTHREAD_CANCEL
 #define HAVE_PTHREAD_CANCEL 0
 #endif

+#if HAVE_PTHREAD_CANCEL || (HAVE_THREADS && HAVE_WINSOCK2_H)
+#define THREAD_RXRW 1
+#else
+#define THREAD_RXRW 0
+#endif
+
+#if !HAVE_PTHREAD_CANCEL && (HAVE_THREADS && HAVE_WINSOCK2_H)
+/* Winsock2 recv function can be unblocked by shutting down and closing
the socket */
+#define pthread_setcancelstate(x, y)
+#define pthread_cancel {shutdown(s->udp_fd, SD_BOTH);
closesocket(s->udp_fd);}
+#endif
+
+#if THREAD_RXRW
+#include "libavutil/thread.h"
+#endif
+
 #ifndef IPV6_ADD_MEMBERSHIP
 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
@@ -100,7 +112,7 @@ typedef struct UDPContext {
     int64_t bitrate; /* number of bits to send per second */
     int64_t burst_bits;
     int close_req;
-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
     pthread_t circular_buffer_thread;
     pthread_mutex_t mutex;
     pthread_cond_t cond;
@@ -495,7 +507,7 @@ static int udp_get_file_handle(URLContext *h)
     return s->udp_fd;
 }

-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
 static void *circular_buffer_task_rx( void *_URLContext)
 {
     URLContext *h = _URLContext;
@@ -730,7 +742,7 @@ static int udp_open(URLContext *h, const char *uri, int
flags)
             /* assume if no digits were found it is a request to enable it
*/
             if (buf == endptr)
                 s->overrun_nonfatal = 1;
-            if (!HAVE_PTHREAD_CANCEL)
+            if (!THREAD_RXRW)
                 av_log(h, AV_LOG_WARNING,
                        "'overrun_nonfatal' option was set but it is not
supported "
                        "on this build (pthread support is required)\n");
@@ -758,14 +770,14 @@ static int udp_open(URLContext *h, const char *uri,
int flags)
         }
         if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
             s->circular_buffer_size = strtol(buf, NULL, 10);
-            if (!HAVE_PTHREAD_CANCEL)
+            if (!THREAD_RXRW)
                 av_log(h, AV_LOG_WARNING,
                        "'circular_buffer_size' option was set but it is
not supported "
                        "on this build (pthread support is required)\n");
         }
         if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
             s->bitrate = strtoll(buf, NULL, 10);
-            if (!HAVE_PTHREAD_CANCEL)
+            if (!THREAD_RXRW)
                 av_log(h, AV_LOG_WARNING,
                        "'bitrate' option was set but it is not supported "
                        "on this build (pthread support is required)\n");
@@ -951,7 +963,7 @@ static int udp_open(URLContext *h, const char *uri, int
flags)

     s->udp_fd = udp_fd;

-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
     /*
       Create thread in case of:
       1. Input and circular_buffer_size is set
@@ -988,7 +1000,7 @@ static int udp_open(URLContext *h, const char *uri,
int flags)
 #endif

     return 0;
-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
  thread_fail:
     pthread_cond_destroy(&s->cond);
  cond_fail:
@@ -1019,7 +1031,7 @@ static int udp_read(URLContext *h, uint8_t *buf, int
size)
 {
     UDPContext *s = h->priv_data;
     int ret;
-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
     int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;

     if (s->fifo) {
@@ -1054,9 +1066,9 @@ static int udp_read(URLContext *h, uint8_t *buf, int
size)
                 int64_t t = av_gettime() + 100000;
                 struct timespec tv = { .tv_sec  =  t / 1000000,
                                        .tv_nsec = (t % 1000000) * 1000 };
-                if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
+                if (ret = pthread_cond_timedwait(&s->cond, &s->mutex, &tv)
< 0) {
                     pthread_mutex_unlock(&s->mutex);
-                    return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
+                    return AVERROR(ret == ETIMEDOUT ? EAGAIN : ret);
                 }
                 nonblock = 1;
             }
@@ -1079,7 +1091,7 @@ static int udp_write(URLContext *h, const uint8_t
*buf, int size)
     UDPContext *s = h->priv_data;
     int ret;

-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
     if (s->fifo) {
         uint8_t tmp[4];

@@ -1128,7 +1140,7 @@ static int udp_close(URLContext *h)
 {
     UDPContext *s = h->priv_data;

-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
     // Request close once writing is finished
     if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
         pthread_mutex_lock(&s->mutex);
@@ -1140,7 +1152,7 @@ static int udp_close(URLContext *h)

     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
         udp_leave_multicast_group(s->udp_fd, (struct sockaddr
*)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
-#if HAVE_PTHREAD_CANCEL
+#if THREAD_RXRW
     if (s->thread_started) {
         int ret;
         // Cancel only read, as write has been signaled as success to the
user
--

I'll split the patch so that the pthread_cond_timedwait change is separate,
and name it properly, if everyone is happy with it first.


More information about the ffmpeg-devel mailing list