27 #define _DEFAULT_SOURCE 
   47 #include "TargetConditionals.h" 
   56 #define UDPLITE_SEND_CSCOV                               10 
   57 #define UDPLITE_RECV_CSCOV                               11 
   60 #ifndef IPPROTO_UDPLITE 
   61 #define IPPROTO_UDPLITE                                  136 
   65 #undef HAVE_PTHREAD_CANCEL 
   66 #define HAVE_PTHREAD_CANCEL 1 
   69 #if HAVE_PTHREAD_CANCEL 
   73 #ifndef IPV6_ADD_MEMBERSHIP 
   74 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP 
   75 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP 
   78 #define UDP_TX_BUF_SIZE 32768 
   79 #define UDP_RX_BUF_SIZE 393216 
   80 #define UDP_MAX_PKT_SIZE 65536 
   81 #define UDP_HEADER_SIZE 8 
  106 #if HAVE_PTHREAD_CANCEL 
  122 #define OFFSET(x) offsetof(UDPContext, x) 
  123 #define D AV_OPT_FLAG_DECODING_PARAM 
  124 #define E AV_OPT_FLAG_ENCODING_PARAM 
  126     { 
"buffer_size",    
"System data size (in bytes)",                     
OFFSET(buffer_size),    
AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = 
D|
E },
 
  128     { 
"burst_bits",     
"Max length of bursts in bits (when using bitrate)", 
OFFSET(burst_bits),   
AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = 
E },
 
  132     { 
"udplite_coverage", 
"choose UDPLite head size which should be validated by checksum", 
OFFSET(udplite_coverage), 
AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, 
D|
E },
 
  133     { 
"pkt_size",       
"Maximum UDP packet size",                         
OFFSET(pkt_size),       
AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = 
D|
E },
 
  134     { 
"reuse",          
"explicitly allow reusing UDP sockets",            
OFFSET(reuse_socket),   
AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       
D|
E },
 
  135     { 
"reuse_socket",   
"explicitly allow reusing UDP sockets",            
OFFSET(reuse_socket),   
AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       .flags = 
D|
E },
 
  136     { 
"broadcast", 
"explicitly allow or disallow broadcast destination",   
OFFSET(is_broadcast),   
AV_OPT_TYPE_BOOL,   { .i64 = 0  },     0, 1,       
E },
 
  138     { 
"connect",        
"set if connect() should be called on socket",     
OFFSET(is_connected),   
AV_OPT_TYPE_BOOL,   { .i64 =  0 },     0, 1,       .flags = 
D|
E },
 
  139     { 
"fifo_size",      
"set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", 
OFFSET(circular_buffer_size), 
AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, 
D },
 
  140     { 
"overrun_nonfatal", 
"survive in case of UDP receiving circular buffer overrun", 
OFFSET(overrun_nonfatal), 
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    
D },
 
  141     { 
"timeout",        
"set raise error timeout (only in read mode)",     
OFFSET(timeout),        
AV_OPT_TYPE_INT,    { .i64 = 0 },      0, INT_MAX, 
D },
 
  162                                  struct sockaddr *addr)
 
  164 #ifdef IP_MULTICAST_TTL 
  165     if (addr->sa_family == AF_INET) {
 
  166         if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, 
sizeof(mcastTTL)) < 0) {
 
  172 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS) 
  173     if (addr->sa_family == AF_INET6) {
 
  174         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, 
sizeof(mcastTTL)) < 0) {
 
  185 #ifdef IP_ADD_MEMBERSHIP 
  186     if (addr->sa_family == AF_INET) {
 
  189         mreq.imr_multiaddr.s_addr = ((
struct sockaddr_in *)addr)->sin_addr.s_addr;
 
  191             mreq.imr_interface= ((
struct sockaddr_in *)local_addr)->sin_addr;
 
  193             mreq.imr_interface.s_addr= INADDR_ANY;
 
  194         if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (
const void *)&mreq, 
sizeof(mreq)) < 0) {
 
  200 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6) 
  201     if (addr->sa_family == AF_INET6) {
 
  202         struct ipv6_mreq mreq6;
 
  204         memcpy(&mreq6.ipv6mr_multiaddr, &(((
struct sockaddr_in6 *)addr)->sin6_addr), 
sizeof(
struct in6_addr));
 
  206         mreq6.ipv6mr_interface= 0;
 
  218 #ifdef IP_DROP_MEMBERSHIP 
  219     if (addr->sa_family == AF_INET) {
 
  222         mreq.imr_multiaddr.s_addr = ((
struct sockaddr_in *)addr)->sin_addr.s_addr;
 
  224             mreq.imr_interface= ((
struct sockaddr_in *)local_addr)->sin_addr;
 
  226             mreq.imr_interface.s_addr= INADDR_ANY;
 
  227         if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (
const void *)&mreq, 
sizeof(mreq)) < 0) {
 
  233 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6) 
  234     if (addr->sa_family == AF_INET6) {
 
  235         struct ipv6_mreq mreq6;
 
  237         memcpy(&mreq6.ipv6mr_multiaddr, &(((
struct sockaddr_in6 *)addr)->sin6_addr), 
sizeof(
struct in6_addr));
 
  239         mreq6.ipv6mr_interface= 0;
 
  250                                      int sockfd, 
struct sockaddr *addr,
 
  253                                      int nb_sources, 
int include)
 
  256     if (addr->sa_family != AF_INET) {
 
  257 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) 
  262         for (
i = 0; 
i < nb_sources; 
i++) {
 
  263             struct group_source_req mreqs;
 
  264             int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
 
  267             mreqs.gsr_interface = 0;
 
  268             memcpy(&mreqs.gsr_group, addr, addr_len);
 
  271             if (setsockopt(sockfd, 
level,
 
  272                            include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
 
  273                            (
const void *)&mreqs, 
sizeof(mreqs)) < 0) {
 
  284                "Setting multicast sources only supported for IPv4\n");
 
  288 #if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE) 
  289     for (
i = 0; 
i < nb_sources; 
i++) {
 
  290         struct ip_mreq_source mreqs;
 
  291         if (
sources[
i].ss_family != AF_INET) {
 
  296         mreqs.imr_multiaddr.s_addr = ((
struct sockaddr_in *)addr)->sin_addr.s_addr;
 
  298             mreqs.imr_interface= ((
struct sockaddr_in *)local_addr)->sin_addr;
 
  300             mreqs.imr_interface.s_addr= INADDR_ANY;
 
  301         mreqs.imr_sourceaddr.s_addr = ((
struct sockaddr_in *)&
sources[
i])->sin_addr.s_addr;
 
  303         if (setsockopt(sockfd, IPPROTO_IP,
 
  304                        include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
 
  305                        (
const void *)&mreqs, 
sizeof(mreqs)) < 0) {
 
  320                        const char *hostname, 
int port)
 
  326     if (!res0) 
return AVERROR(EIO);
 
  335                              socklen_t *addr_len, 
const char *localaddr)
 
  340     int family = AF_UNSPEC;
 
  342     if (((
struct sockaddr *) &
s->dest_addr)->sa_family)
 
  343         family = ((
struct sockaddr *) &
s->dest_addr)->sa_family;
 
  349     for (res = res0; res; res=res->
ai_next) {
 
  350         if (
s->udplite_coverage)
 
  354         if (udp_fd != -1) 
break;
 
  378     char sbuf[
sizeof(
int)*3+1];
 
  386     return strtol(sbuf, 
NULL, 10);
 
  409     char hostname[256], buf[10];
 
  417     if (
s->dest_addr_len < 0) {
 
  421     p = strchr(uri, 
'?');
 
  424             int was_connected = 
s->is_connected;
 
  425             s->is_connected = strtol(buf, 
NULL, 10);
 
  426             if (
s->is_connected && !was_connected) {
 
  427                 if (connect(
s->udp_fd, (
struct sockaddr *) &
s->dest_addr,
 
  448     return s->local_port;
 
  462 #if HAVE_PTHREAD_CANCEL 
  463 static void *circular_buffer_task_rx( 
void *_URLContext)
 
  473         s->circular_buffer_error = 
AVERROR(EIO);
 
  479         socklen_t addr_len = 
sizeof(addr);
 
  486         len = recvfrom(
s->udp_fd, 
s->tmp+4, 
sizeof(
s->tmp)-4, 0, (
struct sockaddr *)&addr, &addr_len);
 
  502             if (
s->overrun_nonfatal) {
 
  504                         "Surviving due to overrun_nonfatal option\n");
 
  508                         "To avoid, increase fifo_size URL option. " 
  509                         "To survive in such case, use overrun_nonfatal option\n");
 
  510                 s->circular_buffer_error = 
AVERROR(EIO);
 
  524 static void *circular_buffer_task_tx( 
void *_URLContext)
 
  530     int64_t sent_bits = 0;
 
  531     int64_t burst_interval = 
s->bitrate ? (
s->burst_bits * 1000000 / 
s->bitrate) : 0;
 
  532     int64_t max_delay = 
s->bitrate ?  ((int64_t)
h->max_packet_size * 8 * 1000000 / 
s->bitrate + 1) : 0;
 
  538         s->circular_buffer_error = 
AVERROR(EIO);
 
  571             if (timestamp < target_timestamp) {
 
  572                 int64_t delay = target_timestamp - timestamp;
 
  573                 if (delay > max_delay) {
 
  575                     start_timestamp = timestamp + delay;
 
  580                 if (timestamp - burst_interval > target_timestamp) {
 
  581                     start_timestamp = timestamp - burst_interval;
 
  585             sent_bits += 
len * 8;
 
  586             target_timestamp = start_timestamp + sent_bits * 1000000 / 
s->bitrate;
 
  593             if (!
s->is_connected) {
 
  594                 ret = sendto (
s->udp_fd, p, 
len, 0,
 
  595                             (
struct sockaddr *) &
s->dest_addr,
 
  598                 ret = send(
s->udp_fd, p, 
len, 0);
 
  606                     s->circular_buffer_error = 
ret;
 
  628     char hostname[1024], localaddr[1024] = 
"";
 
  629     int port, udp_fd = -1, 
tmp, bind_ret = -1, dscp = -1;
 
  640     if (
s->buffer_size < 0)
 
  654         h->max_packet_size = 
s->pkt_size;
 
  656     p = strchr(uri, 
'?');
 
  660             s->reuse_socket = strtol(buf, &endptr, 10);
 
  667             s->overrun_nonfatal = strtol(buf, &endptr, 10);
 
  670                 s->overrun_nonfatal = 1;
 
  671             if (!HAVE_PTHREAD_CANCEL)
 
  673                        "'overrun_nonfatal' option was set but it is not supported " 
  674                        "on this build (pthread support is required)\n");
 
  677             s->ttl = strtol(buf, 
NULL, 10);
 
  680             s->udplite_coverage = strtol(buf, 
NULL, 10);
 
  683             s->local_port = strtol(buf, 
NULL, 10);
 
  686             s->pkt_size = strtol(buf, 
NULL, 10);
 
  689             s->buffer_size = strtol(buf, 
NULL, 10);
 
  692             s->is_connected = strtol(buf, 
NULL, 10);
 
  695             dscp = strtol(buf, 
NULL, 10);
 
  698             s->circular_buffer_size = strtol(buf, 
NULL, 10);
 
  699             if (!HAVE_PTHREAD_CANCEL)
 
  701                        "'circular_buffer_size' option was set but it is not supported " 
  702                        "on this build (pthread support is required)\n");
 
  705             s->bitrate = strtoll(buf, 
NULL, 10);
 
  706             if (!HAVE_PTHREAD_CANCEL)
 
  708                        "'bitrate' option was set but it is not supported " 
  709                        "on this build (pthread support is required)\n");
 
  712             s->burst_bits = strtoll(buf, 
NULL, 10);
 
  715             av_strlcpy(localaddr, buf, 
sizeof(localaddr));
 
  726             s->timeout = strtol(buf, 
NULL, 10);
 
  728             s->is_broadcast = strtol(buf, 
NULL, 10);
 
  731     s->circular_buffer_size *= 188;
 
  733         h->max_packet_size = 
s->pkt_size;
 
  737     h->rw_timeout = 
s->timeout;
 
  743     if (hostname[0] == 
'\0' || hostname[0] == 
'?') {
 
  753         s->local_port = port;
 
  762     s->local_addr_storage=my_addr; 
 
  767     if (
s->reuse_socket > 0 || (
s->is_multicast && 
s->reuse_socket < 0)) {
 
  769         if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(
s->reuse_socket), 
sizeof(
s->reuse_socket)) != 0)
 
  773     if (
s->is_broadcast) {
 
  775         if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(
s->is_broadcast), 
sizeof(
s->is_broadcast)) != 0)
 
  784     if (
s->udplite_coverage) {
 
  794         if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, 
sizeof(dscp)) != 0)
 
  803         bind_ret = bind(udp_fd,(
struct sockaddr *)&
s->dest_addr, 
len);
 
  808     if (bind_ret < 0 && bind(udp_fd,(
struct sockaddr *)&my_addr, 
len) < 0) {
 
  813     len = 
sizeof(my_addr);
 
  814     getsockname(udp_fd, (
struct sockaddr *)&my_addr, &
len);
 
  817     if (
s->is_multicast) {
 
  825             if (
s->filters.nb_include_addrs) {
 
  827                                               (
struct sockaddr *)&
s->dest_addr,
 
  828                                               s->dest_addr_len, &
s->local_addr_storage,
 
  829                                               s->filters.include_addrs,
 
  830                                               s->filters.nb_include_addrs, 1) < 0)
 
  836             if (
s->filters.nb_exclude_addrs) {
 
  838                                               (
struct sockaddr *)&
s->dest_addr,
 
  839                                               s->dest_addr_len, &
s->local_addr_storage,
 
  840                                               s->filters.exclude_addrs,
 
  841                                               s->filters.nb_exclude_addrs, 0) < 0)
 
  849         tmp = 
s->buffer_size;
 
  850         if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &
tmp, 
sizeof(
tmp)) < 0) {
 
  856         tmp = 
s->buffer_size;
 
  857         if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &
tmp, 
sizeof(
tmp)) < 0) {
 
  861         if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &
tmp, &
len) < 0) {
 
  865             if(tmp < s->buffer_size)
 
  866                 av_log(
h, 
AV_LOG_WARNING, 
"attempted to set receive buffer to size %d but it only ended up set as %d\n", 
s->buffer_size, 
tmp);
 
  872     if (
s->is_connected) {
 
  873         if (connect(udp_fd, (
struct sockaddr *) &
s->dest_addr, 
s->dest_addr_len)) {
 
  881 #if HAVE_PTHREAD_CANCEL 
  888     if (is_output && 
s->bitrate && !
s->circular_buffer_size) {
 
  890         av_log(
h, 
AV_LOG_WARNING,
"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
 
  893     if ((!is_output && 
s->circular_buffer_size) || (is_output && 
s->bitrate && 
s->circular_buffer_size)) {
 
  908         ret = 
pthread_create(&
s->circular_buffer_thread, 
NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, 
h);
 
  913         s->thread_started = 1;
 
  918 #if HAVE_PTHREAD_CANCEL 
  947     socklen_t addr_len = 
sizeof(addr);
 
  948 #if HAVE_PTHREAD_CANCEL 
  969             } 
else if(
s->circular_buffer_error){
 
  970                 int err = 
s->circular_buffer_error;
 
  973             } 
else if(nonblock) {
 
  981                 struct timespec tv = { .tv_sec  =  t / 1000000,
 
  982                                        .tv_nsec = (t % 1000000) * 1000 };
 
  986                     return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
 
  999     ret = recvfrom(
s->udp_fd, buf, 
size, 0, (
struct sockaddr *)&addr, &addr_len);
 
 1012 #if HAVE_PTHREAD_CANCEL 
 1022         if (
s->circular_buffer_error<0) {
 
 1023             int err=
s->circular_buffer_error;
 
 1047     if (!
s->is_connected) {
 
 1048         ret = sendto (
s->udp_fd, buf, 
size, 0,
 
 1049                       (
struct sockaddr *) &
s->dest_addr,
 
 1052         ret = send(
s->udp_fd, buf, 
size, 0);
 
 1061 #if HAVE_PTHREAD_CANCEL 
 1073 #if HAVE_PTHREAD_CANCEL 
 1074     if (
s->thread_started) {
 
 1082             shutdown(
s->udp_fd, SD_RECEIVE);
 
 1083             CancelIoEx((HANDLE)(SOCKET)
s->udp_fd, 
NULL);
 
 1085             pthread_cancel(
s->circular_buffer_thread);
 
 1095     closesocket(
s->udp_fd);