00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00027 #define _BSD_SOURCE
00028
00029 #include "avformat.h"
00030 #include "avio_internal.h"
00031 #include "libavutil/parseutils.h"
00032 #include "libavutil/fifo.h"
00033 #include "libavutil/intreadwrite.h"
00034 #include "libavutil/avstring.h"
00035 #include "libavutil/opt.h"
00036 #include "libavutil/log.h"
00037 #include "internal.h"
00038 #include "network.h"
00039 #include "os_support.h"
00040 #include "url.h"
00041
00042 #if HAVE_PTHREAD_CANCEL
00043 #include <pthread.h>
00044 #endif
00045
00046 #ifndef HAVE_PTHREAD_CANCEL
00047 #define HAVE_PTHREAD_CANCEL 0
00048 #endif
00049
00050 #ifndef IPV6_ADD_MEMBERSHIP
00051 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
00052 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
00053 #endif
00054
00055 #define UDP_TX_BUF_SIZE 32768
00056 #define UDP_MAX_PKT_SIZE 65536
00057
00058 typedef struct {
00059 const AVClass *class;
00060 int udp_fd;
00061 int ttl;
00062 int buffer_size;
00063 int is_multicast;
00064 int local_port;
00065 int reuse_socket;
00066 int overrun_nonfatal;
00067 struct sockaddr_storage dest_addr;
00068 int dest_addr_len;
00069 int is_connected;
00070
00071
00072 int circular_buffer_size;
00073 AVFifoBuffer *fifo;
00074 int circular_buffer_error;
00075 #if HAVE_PTHREAD_CANCEL
00076 pthread_t circular_buffer_thread;
00077 pthread_mutex_t mutex;
00078 pthread_cond_t cond;
00079 int thread_started;
00080 #endif
00081 uint8_t tmp[UDP_MAX_PKT_SIZE+4];
00082 int remaining_in_dg;
00083 char *local_addr;
00084 int packet_size;
00085 int timeout;
00086 } UDPContext;
00087
00088 #define OFFSET(x) offsetof(UDPContext, x)
00089 #define D AV_OPT_FLAG_DECODING_PARAM
00090 #define E AV_OPT_FLAG_ENCODING_PARAM
00091 static const AVOption options[] = {
00092 {"buffer_size", "Socket buffer size in bytes", OFFSET(buffer_size), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
00093 {"localport", "Set local port to bind to", OFFSET(local_port), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
00094 {"localaddr", "Choose local IP address", OFFSET(local_addr), AV_OPT_TYPE_STRING, {.str = ""}, 0, 0, D|E },
00095 {"pkt_size", "Set size of UDP packets", OFFSET(packet_size), AV_OPT_TYPE_INT, {.i64 = 1472}, 0, INT_MAX, D|E },
00096 {"reuse", "Explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
00097 {"ttl", "Set the time to live value (for multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX, E },
00098 {"connect", "Should connect() be called on socket", OFFSET(is_connected), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
00099
00100 {"fifo_size", "Set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
00101 {"overrun_nonfatal", "Survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D },
00102 {"timeout", "In read mode: if no data arrived in more than this time interval, raise error", OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D },
00103 {NULL}
00104 };
00105
00106 static const AVClass udp_context_class = {
00107 .class_name = "udp",
00108 .item_name = av_default_item_name,
00109 .option = options,
00110 .version = LIBAVUTIL_VERSION_INT,
00111 };
00112
00113 static void log_net_error(void *ctx, int level, const char* prefix)
00114 {
00115 char errbuf[100];
00116 av_strerror(ff_neterrno(), errbuf, sizeof(errbuf));
00117 av_log(ctx, level, "%s: %s\n", prefix, errbuf);
00118 }
00119
00120 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
00121 struct sockaddr *addr)
00122 {
00123 #ifdef IP_MULTICAST_TTL
00124 if (addr->sa_family == AF_INET) {
00125 if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
00126 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
00127 return -1;
00128 }
00129 }
00130 #endif
00131 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
00132 if (addr->sa_family == AF_INET6) {
00133 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
00134 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
00135 return -1;
00136 }
00137 }
00138 #endif
00139 return 0;
00140 }
00141
00142 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr)
00143 {
00144 #ifdef IP_ADD_MEMBERSHIP
00145 if (addr->sa_family == AF_INET) {
00146 struct ip_mreq mreq;
00147
00148 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
00149 mreq.imr_interface.s_addr= INADDR_ANY;
00150 if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
00151 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
00152 return -1;
00153 }
00154 }
00155 #endif
00156 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
00157 if (addr->sa_family == AF_INET6) {
00158 struct ipv6_mreq mreq6;
00159
00160 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
00161 mreq6.ipv6mr_interface= 0;
00162 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
00163 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
00164 return -1;
00165 }
00166 }
00167 #endif
00168 return 0;
00169 }
00170
00171 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr)
00172 {
00173 #ifdef IP_DROP_MEMBERSHIP
00174 if (addr->sa_family == AF_INET) {
00175 struct ip_mreq mreq;
00176
00177 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
00178 mreq.imr_interface.s_addr= INADDR_ANY;
00179 if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
00180 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
00181 return -1;
00182 }
00183 }
00184 #endif
00185 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
00186 if (addr->sa_family == AF_INET6) {
00187 struct ipv6_mreq mreq6;
00188
00189 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
00190 mreq6.ipv6mr_interface= 0;
00191 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
00192 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
00193 return -1;
00194 }
00195 }
00196 #endif
00197 return 0;
00198 }
00199
00200 static struct addrinfo* udp_resolve_host(const char *hostname, int port,
00201 int type, int family, int flags)
00202 {
00203 struct addrinfo hints = { 0 }, *res = 0;
00204 int error;
00205 char sport[16];
00206 const char *node = 0, *service = "0";
00207
00208 if (port > 0) {
00209 snprintf(sport, sizeof(sport), "%d", port);
00210 service = sport;
00211 }
00212 if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) {
00213 node = hostname;
00214 }
00215 hints.ai_socktype = type;
00216 hints.ai_family = family;
00217 hints.ai_flags = flags;
00218 if ((error = getaddrinfo(node, service, &hints, &res))) {
00219 res = NULL;
00220 av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(error));
00221 }
00222
00223 return res;
00224 }
00225
00226 static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
00227 int addr_len, char **sources,
00228 int nb_sources, int include)
00229 {
00230 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
00231
00232
00233 int i;
00234 for (i = 0; i < nb_sources; i++) {
00235 struct group_source_req mreqs;
00236 int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
00237 struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
00238 SOCK_DGRAM, AF_UNSPEC,
00239 AI_NUMERICHOST);
00240 if (!sourceaddr)
00241 return AVERROR(ENOENT);
00242
00243 mreqs.gsr_interface = 0;
00244 memcpy(&mreqs.gsr_group, addr, addr_len);
00245 memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
00246 freeaddrinfo(sourceaddr);
00247
00248 if (setsockopt(sockfd, level,
00249 include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
00250 (const void *)&mreqs, sizeof(mreqs)) < 0) {
00251 if (include)
00252 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
00253 else
00254 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
00255 return ff_neterrno();
00256 }
00257 }
00258 #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
00259 int i;
00260 if (addr->sa_family != AF_INET) {
00261 av_log(NULL, AV_LOG_ERROR,
00262 "Setting multicast sources only supported for IPv4\n");
00263 return AVERROR(EINVAL);
00264 }
00265 for (i = 0; i < nb_sources; i++) {
00266 struct ip_mreq_source mreqs;
00267 struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
00268 SOCK_DGRAM, AF_UNSPEC,
00269 AI_NUMERICHOST);
00270 if (!sourceaddr)
00271 return AVERROR(ENOENT);
00272 if (sourceaddr->ai_addr->sa_family != AF_INET) {
00273 freeaddrinfo(sourceaddr);
00274 av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
00275 sources[i]);
00276 return AVERROR(EINVAL);
00277 }
00278
00279 mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
00280 mreqs.imr_interface.s_addr = INADDR_ANY;
00281 mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
00282 freeaddrinfo(sourceaddr);
00283
00284 if (setsockopt(sockfd, IPPROTO_IP,
00285 include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
00286 (const void *)&mreqs, sizeof(mreqs)) < 0) {
00287 if (include)
00288 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
00289 else
00290 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
00291 return ff_neterrno();
00292 }
00293 }
00294 #else
00295 return AVERROR(ENOSYS);
00296 #endif
00297 return 0;
00298 }
00299 static int udp_set_url(struct sockaddr_storage *addr,
00300 const char *hostname, int port)
00301 {
00302 struct addrinfo *res0;
00303 int addr_len;
00304
00305 res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
00306 if (res0 == 0) return AVERROR(EIO);
00307 memcpy(addr, res0->ai_addr, res0->ai_addrlen);
00308 addr_len = res0->ai_addrlen;
00309 freeaddrinfo(res0);
00310
00311 return addr_len;
00312 }
00313
00314 static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
00315 int *addr_len, const char *localaddr)
00316 {
00317 int udp_fd = -1;
00318 struct addrinfo *res0 = NULL, *res = NULL;
00319 int family = AF_UNSPEC;
00320
00321 if (((struct sockaddr *) &s->dest_addr)->sa_family)
00322 family = ((struct sockaddr *) &s->dest_addr)->sa_family;
00323 res0 = udp_resolve_host(localaddr[0] ? localaddr : NULL, s->local_port,
00324 SOCK_DGRAM, family, AI_PASSIVE);
00325 if (res0 == 0)
00326 goto fail;
00327 for (res = res0; res; res=res->ai_next) {
00328 udp_fd = socket(res->ai_family, SOCK_DGRAM, 0);
00329 if (udp_fd != -1) break;
00330 log_net_error(NULL, AV_LOG_ERROR, "socket");
00331 }
00332
00333 if (udp_fd < 0)
00334 goto fail;
00335
00336 memcpy(addr, res->ai_addr, res->ai_addrlen);
00337 *addr_len = res->ai_addrlen;
00338
00339 freeaddrinfo(res0);
00340
00341 return udp_fd;
00342
00343 fail:
00344 if (udp_fd >= 0)
00345 closesocket(udp_fd);
00346 if(res0)
00347 freeaddrinfo(res0);
00348 return -1;
00349 }
00350
00351 static int udp_port(struct sockaddr_storage *addr, int addr_len)
00352 {
00353 char sbuf[sizeof(int)*3+1];
00354 int error;
00355
00356 if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
00357 av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
00358 return -1;
00359 }
00360
00361 return strtol(sbuf, NULL, 10);
00362 }
00363
00364
00381 int ff_udp_set_remote_url(URLContext *h, const char *uri)
00382 {
00383 UDPContext *s = h->priv_data;
00384 char hostname[256], buf[10];
00385 int port;
00386 const char *p;
00387
00388 av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
00389
00390
00391 s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
00392 if (s->dest_addr_len < 0) {
00393 return AVERROR(EIO);
00394 }
00395 s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
00396 p = strchr(uri, '?');
00397 if (p) {
00398 if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
00399 int was_connected = s->is_connected;
00400 s->is_connected = strtol(buf, NULL, 10);
00401 if (s->is_connected && !was_connected) {
00402 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
00403 s->dest_addr_len)) {
00404 s->is_connected = 0;
00405 log_net_error(h, AV_LOG_ERROR, "connect");
00406 return AVERROR(EIO);
00407 }
00408 }
00409 }
00410 }
00411
00412 return 0;
00413 }
00414
00420 int ff_udp_get_local_port(URLContext *h)
00421 {
00422 UDPContext *s = h->priv_data;
00423 return s->local_port;
00424 }
00425
00431 static int udp_get_file_handle(URLContext *h)
00432 {
00433 UDPContext *s = h->priv_data;
00434 return s->udp_fd;
00435 }
00436
00437 #if HAVE_PTHREAD_CANCEL
00438 static void *circular_buffer_task( void *_URLContext)
00439 {
00440 URLContext *h = _URLContext;
00441 UDPContext *s = h->priv_data;
00442 int old_cancelstate;
00443
00444 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
00445 ff_socket_nonblock(s->udp_fd, 0);
00446 pthread_mutex_lock(&s->mutex);
00447 while(1) {
00448 int len;
00449
00450 pthread_mutex_unlock(&s->mutex);
00451
00452
00453
00454 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
00455 len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
00456 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
00457 pthread_mutex_lock(&s->mutex);
00458 if (len < 0) {
00459 if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
00460 s->circular_buffer_error = ff_neterrno();
00461 goto end;
00462 }
00463 continue;
00464 }
00465 AV_WL32(s->tmp, len);
00466
00467 if(av_fifo_space(s->fifo) < len + 4) {
00468
00469 if (s->overrun_nonfatal) {
00470 av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
00471 "Surviving due to overrun_nonfatal option\n");
00472 continue;
00473 } else {
00474 av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
00475 "To avoid, increase fifo_size URL option. "
00476 "To survive in such case, use overrun_nonfatal option\n");
00477 s->circular_buffer_error = AVERROR(EIO);
00478 goto end;
00479 }
00480 }
00481 av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
00482 pthread_cond_signal(&s->cond);
00483 }
00484
00485 end:
00486 pthread_cond_signal(&s->cond);
00487 pthread_mutex_unlock(&s->mutex);
00488 return NULL;
00489 }
00490 #endif
00491
00492
00493
00494 static int udp_open(URLContext *h, const char *uri, int flags)
00495 {
00496 char hostname[1024], localaddr[1024] = "";
00497 int port, udp_fd = -1, tmp, bind_ret = -1;
00498 UDPContext *s = h->priv_data;
00499 int is_output;
00500 const char *p;
00501 char buf[256];
00502 struct sockaddr_storage my_addr;
00503 int len;
00504 int reuse_specified = 0;
00505 int i, include = 0, num_sources = 0;
00506 char *sources[32];
00507
00508 h->is_streamed = 1;
00509
00510 is_output = !(flags & AVIO_FLAG_READ);
00511 if (!s->buffer_size)
00512 s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
00513
00514 p = strchr(uri, '?');
00515 if (p) {
00516 if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
00517 char *endptr = NULL;
00518 s->reuse_socket = strtol(buf, &endptr, 10);
00519
00520 if (buf == endptr)
00521 s->reuse_socket = 1;
00522 reuse_specified = 1;
00523 }
00524 if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
00525 char *endptr = NULL;
00526 s->overrun_nonfatal = strtol(buf, &endptr, 10);
00527
00528 if (buf == endptr)
00529 s->overrun_nonfatal = 1;
00530 if (!HAVE_PTHREAD_CANCEL)
00531 av_log(h, AV_LOG_WARNING,
00532 "'overrun_nonfatal' option was set but it is not supported "
00533 "on this build (pthread support is required)\n");
00534 }
00535 if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
00536 s->ttl = strtol(buf, NULL, 10);
00537 }
00538 if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
00539 s->local_port = strtol(buf, NULL, 10);
00540 }
00541 if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
00542 s->packet_size = strtol(buf, NULL, 10);
00543 }
00544 if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
00545 s->buffer_size = strtol(buf, NULL, 10);
00546 }
00547 if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
00548 s->is_connected = strtol(buf, NULL, 10);
00549 }
00550 if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
00551 s->circular_buffer_size = strtol(buf, NULL, 10);
00552 if (!HAVE_PTHREAD_CANCEL)
00553 av_log(h, AV_LOG_WARNING,
00554 "'circular_buffer_size' option was set but it is not supported "
00555 "on this build (pthread support is required)\n");
00556 }
00557 if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
00558 av_strlcpy(localaddr, buf, sizeof(localaddr));
00559 }
00560 if (av_find_info_tag(buf, sizeof(buf), "sources", p))
00561 include = 1;
00562 if (include || av_find_info_tag(buf, sizeof(buf), "block", p)) {
00563 char *source_start;
00564
00565 source_start = buf;
00566 while (1) {
00567 char *next = strchr(source_start, ',');
00568 if (next)
00569 *next = '\0';
00570 sources[num_sources] = av_strdup(source_start);
00571 if (!sources[num_sources])
00572 goto fail;
00573 source_start = next + 1;
00574 num_sources++;
00575 if (num_sources >= FF_ARRAY_ELEMS(sources) || !next)
00576 break;
00577 }
00578 }
00579 if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
00580 s->timeout = strtol(buf, NULL, 10);
00581 }
00582
00583 s->circular_buffer_size *= 188;
00584 h->max_packet_size = s->packet_size;
00585 h->rw_timeout = s->timeout;
00586
00587
00588 av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
00589
00590
00591 if (hostname[0] == '\0' || hostname[0] == '?') {
00592
00593 if (!(flags & AVIO_FLAG_READ))
00594 goto fail;
00595 } else {
00596 if (ff_udp_set_remote_url(h, uri) < 0)
00597 goto fail;
00598 }
00599
00600 if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ))
00601 s->local_port = port;
00602 udp_fd = udp_socket_create(s, &my_addr, &len, localaddr[0] ? localaddr : s->local_addr);
00603 if (udp_fd < 0)
00604 goto fail;
00605
00606
00607
00608
00609 if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
00610 s->reuse_socket = 1;
00611 if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
00612 goto fail;
00613 }
00614
00615
00616
00617
00618
00619 if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
00620 bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
00621 }
00622
00623
00624
00625 if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
00626 log_net_error(h, AV_LOG_ERROR, "bind failed");
00627 goto fail;
00628 }
00629
00630 len = sizeof(my_addr);
00631 getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
00632 s->local_port = udp_port(&my_addr, len);
00633
00634 if (s->is_multicast) {
00635 if (h->flags & AVIO_FLAG_WRITE) {
00636
00637 if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
00638 goto fail;
00639 }
00640 if (h->flags & AVIO_FLAG_READ) {
00641
00642 if (num_sources == 0 || !include) {
00643 if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0)
00644 goto fail;
00645
00646 if (num_sources) {
00647 if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 0) < 0)
00648 goto fail;
00649 }
00650 } else if (include && num_sources) {
00651 if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 1) < 0)
00652 goto fail;
00653 } else {
00654 av_log(NULL, AV_LOG_ERROR, "invalid udp settings: inclusive multicast but no sources given\n");
00655 goto fail;
00656 }
00657 }
00658 }
00659
00660 if (is_output) {
00661
00662 tmp = s->buffer_size;
00663 if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
00664 log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
00665 goto fail;
00666 }
00667 } else {
00668
00669
00670 tmp = s->buffer_size;
00671 if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
00672 log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
00673 }
00674
00675 ff_socket_nonblock(udp_fd, 1);
00676 }
00677 if (s->is_connected) {
00678 if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
00679 log_net_error(h, AV_LOG_ERROR, "connect");
00680 goto fail;
00681 }
00682 }
00683
00684 for (i = 0; i < num_sources; i++)
00685 av_freep(&sources[i]);
00686
00687 s->udp_fd = udp_fd;
00688
00689 #if HAVE_PTHREAD_CANCEL
00690 if (!is_output && s->circular_buffer_size) {
00691 int ret;
00692
00693
00694 s->fifo = av_fifo_alloc(s->circular_buffer_size);
00695 ret = pthread_mutex_init(&s->mutex, NULL);
00696 if (ret != 0) {
00697 av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
00698 goto fail;
00699 }
00700 ret = pthread_cond_init(&s->cond, NULL);
00701 if (ret != 0) {
00702 av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
00703 goto cond_fail;
00704 }
00705 ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
00706 if (ret != 0) {
00707 av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
00708 goto thread_fail;
00709 }
00710 s->thread_started = 1;
00711 }
00712 #endif
00713
00714 return 0;
00715 #if HAVE_PTHREAD_CANCEL
00716 thread_fail:
00717 pthread_cond_destroy(&s->cond);
00718 cond_fail:
00719 pthread_mutex_destroy(&s->mutex);
00720 #endif
00721 fail:
00722 if (udp_fd >= 0)
00723 closesocket(udp_fd);
00724 av_fifo_free(s->fifo);
00725 for (i = 0; i < num_sources; i++)
00726 av_freep(&sources[i]);
00727 return AVERROR(EIO);
00728 }
00729
00730 static int udp_read(URLContext *h, uint8_t *buf, int size)
00731 {
00732 UDPContext *s = h->priv_data;
00733 int ret;
00734 int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
00735
00736 #if HAVE_PTHREAD_CANCEL
00737 if (s->fifo) {
00738 pthread_mutex_lock(&s->mutex);
00739 do {
00740 avail = av_fifo_size(s->fifo);
00741 if (avail) {
00742 uint8_t tmp[4];
00743
00744 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
00745 avail= AV_RL32(tmp);
00746 if(avail > size){
00747 av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
00748 avail= size;
00749 }
00750
00751 av_fifo_generic_read(s->fifo, buf, avail, NULL);
00752 av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
00753 pthread_mutex_unlock(&s->mutex);
00754 return avail;
00755 } else if(s->circular_buffer_error){
00756 int err = s->circular_buffer_error;
00757 pthread_mutex_unlock(&s->mutex);
00758 return err;
00759 } else if(nonblock) {
00760 pthread_mutex_unlock(&s->mutex);
00761 return AVERROR(EAGAIN);
00762 }
00763 else {
00764
00765
00766 int64_t t = av_gettime() + 100000;
00767 struct timespec tv = { .tv_sec = t / 1000000,
00768 .tv_nsec = (t % 1000000) * 1000 };
00769 if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0)
00770 return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
00771 nonblock = 1;
00772 }
00773 } while( 1);
00774 }
00775 #endif
00776
00777 if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
00778 ret = ff_network_wait_fd(s->udp_fd, 0);
00779 if (ret < 0)
00780 return ret;
00781 }
00782 ret = recv(s->udp_fd, buf, size, 0);
00783
00784 return ret < 0 ? ff_neterrno() : ret;
00785 }
00786
00787 static int udp_write(URLContext *h, const uint8_t *buf, int size)
00788 {
00789 UDPContext *s = h->priv_data;
00790 int ret;
00791
00792 if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
00793 ret = ff_network_wait_fd(s->udp_fd, 1);
00794 if (ret < 0)
00795 return ret;
00796 }
00797
00798 if (!s->is_connected) {
00799 ret = sendto (s->udp_fd, buf, size, 0,
00800 (struct sockaddr *) &s->dest_addr,
00801 s->dest_addr_len);
00802 } else
00803 ret = send(s->udp_fd, buf, size, 0);
00804
00805 return ret < 0 ? ff_neterrno() : ret;
00806 }
00807
00808 static int udp_close(URLContext *h)
00809 {
00810 UDPContext *s = h->priv_data;
00811 int ret;
00812
00813 if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
00814 udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
00815 closesocket(s->udp_fd);
00816 #if HAVE_PTHREAD_CANCEL
00817 if (s->thread_started) {
00818 pthread_cancel(s->circular_buffer_thread);
00819 ret = pthread_join(s->circular_buffer_thread, NULL);
00820 if (ret != 0)
00821 av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
00822 pthread_mutex_destroy(&s->mutex);
00823 pthread_cond_destroy(&s->cond);
00824 }
00825 #endif
00826 av_fifo_free(s->fifo);
00827 return 0;
00828 }
00829
00830 URLProtocol ff_udp_protocol = {
00831 .name = "udp",
00832 .url_open = udp_open,
00833 .url_read = udp_read,
00834 .url_write = udp_write,
00835 .url_close = udp_close,
00836 .url_get_file_handle = udp_get_file_handle,
00837 .priv_data_size = sizeof(UDPContext),
00838 .priv_data_class = &udp_context_class,
00839 .flags = URL_PROTOCOL_FLAG_NETWORK,
00840 };