From ebb6a7c9a929cd9cf39f022a55d7d9ca3d881403 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sat, 26 Aug 2023 16:37:04 -0500 Subject: [PATCH] Specify the source address for outbound audio and video traffic (#1569) --- src/platform/common.h | 13 ++ src/platform/linux/misc.cpp | 225 ++++++++++++++++++++++++++++------ src/platform/macos/misc.mm | 129 +++++++++++++++++++ src/platform/windows/misc.cpp | 136 ++++++++++++++++++-- src/stream.cpp | 43 ++++++- 5 files changed, 492 insertions(+), 54 deletions(-) diff --git a/src/platform/common.h b/src/platform/common.h index c80cdced..651e4d13 100644 --- a/src/platform/common.h +++ b/src/platform/common.h @@ -586,10 +586,23 @@ namespace platf { std::uintptr_t native_socket; boost::asio::ip::address &target_address; uint16_t target_port; + boost::asio::ip::address &source_address; }; bool send_batch(batched_send_info_t &send_info); + struct send_info_t { + const char *buffer; + size_t size; + + std::uintptr_t native_socket; + boost::asio::ip::address &target_address; + uint16_t target_port; + boost::asio::ip::address &source_address; + }; + bool + send(send_info_t &send_info); + enum class qos_data_type_e : int { audio, video diff --git a/src/platform/linux/misc.cpp b/src/platform/linux/misc.cpp index 70b6f3ba..6c8e391f 100644 --- a/src/platform/linux/misc.cpp +++ b/src/platform/linux/misc.cpp @@ -2,6 +2,12 @@ * @file src/misc.cpp * @brief todo */ + +// Required for in6_pktinfo with glibc headers +#ifndef _GNU_SOURCE + #define _GNU_SOURCE 1 +#endif + // standard includes #include @@ -243,49 +249,102 @@ namespace platf { lifetime::exit_sunshine(0, true); } + struct sockaddr_in + to_sockaddr(boost::asio::ip::address_v4 address, uint16_t port) { + struct sockaddr_in saddr_v4 = {}; + + saddr_v4.sin_family = AF_INET; + saddr_v4.sin_port = htons(port); + + auto addr_bytes = address.to_bytes(); + memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr)); + + return saddr_v4; + } + + struct sockaddr_in6 + to_sockaddr(boost::asio::ip::address_v6 address, uint16_t port) { + struct sockaddr_in6 saddr_v6 = {}; + + saddr_v6.sin6_family = AF_INET6; + saddr_v6.sin6_port = htons(port); + saddr_v6.sin6_scope_id = address.scope_id(); + + auto addr_bytes = address.to_bytes(); + memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr)); + + return saddr_v6; + } + bool send_batch(batched_send_info_t &send_info) { auto sockfd = (int) send_info.native_socket; + struct msghdr msg = {}; // Convert the target address into a sockaddr - struct sockaddr_in saddr_v4 = {}; - struct sockaddr_in6 saddr_v6 = {}; - struct sockaddr *addr; - socklen_t addr_len; + struct sockaddr_in taddr_v4 = {}; + struct sockaddr_in6 taddr_v6 = {}; if (send_info.target_address.is_v6()) { - auto address_v6 = send_info.target_address.to_v6(); + taddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port); - saddr_v6.sin6_family = AF_INET6; - saddr_v6.sin6_port = htons(send_info.target_port); - saddr_v6.sin6_scope_id = address_v6.scope_id(); - - auto addr_bytes = address_v6.to_bytes(); - memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr)); - - addr = (struct sockaddr *) &saddr_v6; - addr_len = sizeof(saddr_v6); + msg.msg_name = (struct sockaddr *) &taddr_v6; + msg.msg_namelen = sizeof(taddr_v6); } else { - auto address_v4 = send_info.target_address.to_v4(); + taddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port); - saddr_v4.sin_family = AF_INET; - saddr_v4.sin_port = htons(send_info.target_port); + msg.msg_name = (struct sockaddr *) &taddr_v4; + msg.msg_namelen = sizeof(taddr_v4); + } - auto addr_bytes = address_v4.to_bytes(); - memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr)); + union { + char buf[CMSG_SPACE(sizeof(uint16_t)) + + std::max(CMSG_SPACE(sizeof(struct in_pktinfo)), CMSG_SPACE(sizeof(struct in6_pktinfo)))]; + struct cmsghdr alignment; + } cmbuf; + socklen_t cmbuflen = 0; - addr = (struct sockaddr *) &saddr_v4; - addr_len = sizeof(saddr_v4); + msg.msg_control = cmbuf.buf; + msg.msg_controllen = sizeof(cmbuf.buf); + + // The PKTINFO option will always be first, then we will conditionally + // append the UDP_SEGMENT option next if applicable. + auto pktinfo_cm = CMSG_FIRSTHDR(&msg); + if (send_info.source_address.is_v6()) { + struct in6_pktinfo pktInfo; + + struct sockaddr_in6 saddr_v6 = to_sockaddr(send_info.source_address.to_v6(), 0); + pktInfo.ipi6_addr = saddr_v6.sin6_addr; + pktInfo.ipi6_ifindex = 0; + + cmbuflen += CMSG_SPACE(sizeof(pktInfo)); + + pktinfo_cm->cmsg_level = IPPROTO_IPV6; + pktinfo_cm->cmsg_type = IPV6_PKTINFO; + pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo)); + memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); + } + else { + struct in_pktinfo pktInfo; + + struct sockaddr_in saddr_v4 = to_sockaddr(send_info.source_address.to_v4(), 0); + pktInfo.ipi_spec_dst = saddr_v4.sin_addr; + pktInfo.ipi_ifindex = 0; + + cmbuflen += CMSG_SPACE(sizeof(pktInfo)); + + pktinfo_cm->cmsg_level = IPPROTO_IP; + pktinfo_cm->cmsg_type = IP_PKTINFO; + pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo)); + memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); } #ifdef UDP_SEGMENT { - struct msghdr msg = {}; struct iovec iov = {}; - union { - char buf[CMSG_SPACE(sizeof(uint16_t))]; - struct cmsghdr alignment; - } cmbuf; + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; // UDP GSO on Linux currently only supports sending 64K or 64 segments at a time size_t seg_index = 0; @@ -294,26 +353,19 @@ namespace platf { iov.iov_base = (void *) &send_info.buffer[seg_index * send_info.block_size]; iov.iov_len = send_info.block_size * std::min(send_info.block_count - seg_index, seg_max); - msg.msg_name = addr; - msg.msg_namelen = addr_len; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - // We should not use GSO if the data is <= one full block size if (iov.iov_len > send_info.block_size) { - msg.msg_control = cmbuf.buf; - msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); + msg.msg_controllen = cmbuflen + CMSG_SPACE(sizeof(uint16_t)); // Enable GSO to perform segmentation of our buffer for us - auto cm = CMSG_FIRSTHDR(&msg); + auto cm = CMSG_NXTHDR(&msg, pktinfo_cm); cm->cmsg_level = SOL_UDP; cm->cmsg_type = UDP_SEGMENT; cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); *((uint16_t *) CMSG_DATA(cm)) = send_info.block_size; } else { - msg.msg_control = nullptr; - msg.msg_controllen = 0; + msg.msg_controllen = cmbuflen; } // This will fail if GSO is not available, so we will fall back to non-GSO if @@ -360,10 +412,12 @@ namespace platf { iovs[i].iov_len = send_info.block_size; msgs[i] = {}; - msgs[i].msg_hdr.msg_name = addr; - msgs[i].msg_hdr.msg_namelen = addr_len; + msgs[i].msg_hdr.msg_name = msg.msg_name; + msgs[i].msg_hdr.msg_namelen = msg.msg_namelen; msgs[i].msg_hdr.msg_iov = &iovs[i]; msgs[i].msg_hdr.msg_iovlen = 1; + msgs[i].msg_hdr.msg_control = cmbuf.buf; + msgs[i].msg_hdr.msg_controllen = cmbuflen; } // Call sendmmsg() until all messages are sent @@ -398,6 +452,101 @@ namespace platf { } } + bool + send(send_info_t &send_info) { + auto sockfd = (int) send_info.native_socket; + struct msghdr msg = {}; + + // Convert the target address into a sockaddr + struct sockaddr_in taddr_v4 = {}; + struct sockaddr_in6 taddr_v6 = {}; + if (send_info.target_address.is_v6()) { + taddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port); + + msg.msg_name = (struct sockaddr *) &taddr_v6; + msg.msg_namelen = sizeof(taddr_v6); + } + else { + taddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port); + + msg.msg_name = (struct sockaddr *) &taddr_v4; + msg.msg_namelen = sizeof(taddr_v4); + } + + union { + char buf[std::max(CMSG_SPACE(sizeof(struct in_pktinfo)), CMSG_SPACE(sizeof(struct in6_pktinfo)))]; + struct cmsghdr alignment; + } cmbuf; + socklen_t cmbuflen = 0; + + msg.msg_control = cmbuf.buf; + msg.msg_controllen = sizeof(cmbuf.buf); + + auto pktinfo_cm = CMSG_FIRSTHDR(&msg); + if (send_info.source_address.is_v6()) { + struct in6_pktinfo pktInfo; + + struct sockaddr_in6 saddr_v6 = to_sockaddr(send_info.source_address.to_v6(), 0); + pktInfo.ipi6_addr = saddr_v6.sin6_addr; + pktInfo.ipi6_ifindex = 0; + + cmbuflen += CMSG_SPACE(sizeof(pktInfo)); + + pktinfo_cm->cmsg_level = IPPROTO_IPV6; + pktinfo_cm->cmsg_type = IPV6_PKTINFO; + pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo)); + memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); + } + else { + struct in_pktinfo pktInfo; + + struct sockaddr_in saddr_v4 = to_sockaddr(send_info.source_address.to_v4(), 0); + pktInfo.ipi_spec_dst = saddr_v4.sin_addr; + pktInfo.ipi_ifindex = 0; + + cmbuflen += CMSG_SPACE(sizeof(pktInfo)); + + pktinfo_cm->cmsg_level = IPPROTO_IP; + pktinfo_cm->cmsg_type = IP_PKTINFO; + pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo)); + memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); + } + + struct iovec iov = {}; + iov.iov_base = (void *) send_info.buffer; + iov.iov_len = send_info.size; + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + msg.msg_controllen = cmbuflen; + + auto bytes_sent = sendmsg(sockfd, &msg, 0); + + // If there's no send buffer space, wait for some to be available + while (bytes_sent < 0 && errno == EAGAIN) { + struct pollfd pfd; + + pfd.fd = sockfd; + pfd.events = POLLOUT; + + if (poll(&pfd, 1, -1) != 1) { + BOOST_LOG(warning) << "poll() failed: "sv << errno; + break; + } + + // Try to send again + bytes_sent = sendmsg(sockfd, &msg, 0); + } + + if (bytes_sent < 0) { + BOOST_LOG(warning) << "sendmsg() failed: "sv << errno; + return false; + } + + return true; + } + class qos_t: public deinit_t { public: qos_t(int sockfd, int level, int option): diff --git a/src/platform/macos/misc.mm b/src/platform/macos/misc.mm index 0f285a89..5281cc86 100644 --- a/src/platform/macos/misc.mm +++ b/src/platform/macos/misc.mm @@ -2,6 +2,12 @@ * @file src/platform/macos/misc.mm * @brief todo */ + +// Required for IPV6_PKTINFO with Darwin headers +#ifndef __APPLE_USE_RFC_3542 + #define __APPLE_USE_RFC_3542 1 +#endif + #include #include #include @@ -15,6 +21,7 @@ #include "src/main.h" #include "src/platform/common.h" +#include #include using namespace std::literals; @@ -245,12 +252,134 @@ namespace platf { lifetime::exit_sunshine(0, true); } + struct sockaddr_in + to_sockaddr(boost::asio::ip::address_v4 address, uint16_t port) { + struct sockaddr_in saddr_v4 = {}; + + saddr_v4.sin_family = AF_INET; + saddr_v4.sin_port = htons(port); + + auto addr_bytes = address.to_bytes(); + memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr)); + + return saddr_v4; + } + + struct sockaddr_in6 + to_sockaddr(boost::asio::ip::address_v6 address, uint16_t port) { + struct sockaddr_in6 saddr_v6 = {}; + + saddr_v6.sin6_family = AF_INET6; + saddr_v6.sin6_port = htons(port); + saddr_v6.sin6_scope_id = address.scope_id(); + + auto addr_bytes = address.to_bytes(); + memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr)); + + return saddr_v6; + } + bool send_batch(batched_send_info_t &send_info) { // Fall back to unbatched send calls return false; } + bool + send(send_info_t &send_info) { + auto sockfd = (int) send_info.native_socket; + struct msghdr msg = {}; + + // Convert the target address into a sockaddr + struct sockaddr_in taddr_v4 = {}; + struct sockaddr_in6 taddr_v6 = {}; + if (send_info.target_address.is_v6()) { + taddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port); + + msg.msg_name = (struct sockaddr *) &taddr_v6; + msg.msg_namelen = sizeof(taddr_v6); + } + else { + taddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port); + + msg.msg_name = (struct sockaddr *) &taddr_v4; + msg.msg_namelen = sizeof(taddr_v4); + } + + union { + char buf[std::max(CMSG_SPACE(sizeof(struct in_pktinfo)), CMSG_SPACE(sizeof(struct in6_pktinfo)))]; + struct cmsghdr alignment; + } cmbuf; + socklen_t cmbuflen = 0; + + msg.msg_control = cmbuf.buf; + msg.msg_controllen = sizeof(cmbuf.buf); + + auto pktinfo_cm = CMSG_FIRSTHDR(&msg); + if (send_info.source_address.is_v6()) { + struct in6_pktinfo pktInfo; + + struct sockaddr_in6 saddr_v6 = to_sockaddr(send_info.source_address.to_v6(), 0); + pktInfo.ipi6_addr = saddr_v6.sin6_addr; + pktInfo.ipi6_ifindex = 0; + + cmbuflen += CMSG_SPACE(sizeof(pktInfo)); + + pktinfo_cm->cmsg_level = IPPROTO_IPV6; + pktinfo_cm->cmsg_type = IPV6_PKTINFO; + pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo)); + memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); + } + else { + struct in_pktinfo pktInfo; + + struct sockaddr_in saddr_v4 = to_sockaddr(send_info.source_address.to_v4(), 0); + pktInfo.ipi_spec_dst = saddr_v4.sin_addr; + pktInfo.ipi_ifindex = 0; + + cmbuflen += CMSG_SPACE(sizeof(pktInfo)); + + pktinfo_cm->cmsg_level = IPPROTO_IP; + pktinfo_cm->cmsg_type = IP_PKTINFO; + pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo)); + memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); + } + + struct iovec iov = {}; + iov.iov_base = (void *) send_info.buffer; + iov.iov_len = send_info.size; + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + msg.msg_controllen = cmbuflen; + + auto bytes_sent = sendmsg(sockfd, &msg, 0); + + // If there's no send buffer space, wait for some to be available + while (bytes_sent < 0 && errno == EAGAIN) { + struct pollfd pfd; + + pfd.fd = sockfd; + pfd.events = POLLOUT; + + if (poll(&pfd, 1, -1) != 1) { + BOOST_LOG(warning) << "poll() failed: "sv << errno; + break; + } + + // Try to send again + bytes_sent = sendmsg(sockfd, &msg, 0); + } + + if (bytes_sent < 0) { + BOOST_LOG(warning) << "sendmsg() failed: "sv << errno; + return false; + } + + return true; + } + std::unique_ptr enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type) { // Unimplemented diff --git a/src/platform/windows/misc.cpp b/src/platform/windows/misc.cpp index 86147816..455d5681 100644 --- a/src/platform/windows/misc.cpp +++ b/src/platform/windows/misc.cpp @@ -938,19 +938,19 @@ namespace platf { WSAMSG msg; // Convert the target address into a SOCKADDR - SOCKADDR_IN saddr_v4; - SOCKADDR_IN6 saddr_v6; + SOCKADDR_IN taddr_v4; + SOCKADDR_IN6 taddr_v6; if (send_info.target_address.is_v6()) { - saddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port); + taddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port); - msg.name = (PSOCKADDR) &saddr_v6; - msg.namelen = sizeof(saddr_v6); + msg.name = (PSOCKADDR) &taddr_v6; + msg.namelen = sizeof(taddr_v6); } else { - saddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port); + taddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port); - msg.name = (PSOCKADDR) &saddr_v4; - msg.namelen = sizeof(saddr_v4); + msg.name = (PSOCKADDR) &taddr_v4; + msg.namelen = sizeof(taddr_v4); } WSABUF buf; @@ -961,25 +961,137 @@ namespace platf { msg.dwBufferCount = 1; msg.dwFlags = 0; - char cmbuf[WSA_CMSG_SPACE(sizeof(DWORD))]; + // At most, one DWORD option and one PKTINFO option + char cmbuf[WSA_CMSG_SPACE(sizeof(DWORD)) + + std::max(WSA_CMSG_SPACE(sizeof(IN6_PKTINFO)), WSA_CMSG_SPACE(sizeof(IN_PKTINFO)))] = {}; + ULONG cmbuflen = 0; + msg.Control.buf = cmbuf; - msg.Control.len = 0; + msg.Control.len = sizeof(cmbuf); + + auto cm = WSA_CMSG_FIRSTHDR(&msg); + if (send_info.source_address.is_v6()) { + IN6_PKTINFO pktInfo; + + SOCKADDR_IN6 saddr_v6 = to_sockaddr(send_info.source_address.to_v6(), 0); + pktInfo.ipi6_addr = saddr_v6.sin6_addr; + pktInfo.ipi6_ifindex = 0; + + cmbuflen += WSA_CMSG_SPACE(sizeof(pktInfo)); + + cm->cmsg_level = IPPROTO_IPV6; + cm->cmsg_type = IPV6_PKTINFO; + cm->cmsg_len = WSA_CMSG_LEN(sizeof(pktInfo)); + memcpy(WSA_CMSG_DATA(cm), &pktInfo, sizeof(pktInfo)); + } + else { + IN_PKTINFO pktInfo; + + SOCKADDR_IN saddr_v4 = to_sockaddr(send_info.source_address.to_v4(), 0); + pktInfo.ipi_addr = saddr_v4.sin_addr; + pktInfo.ipi_ifindex = 0; + + cmbuflen += WSA_CMSG_SPACE(sizeof(pktInfo)); + + cm->cmsg_level = IPPROTO_IP; + cm->cmsg_type = IP_PKTINFO; + cm->cmsg_len = WSA_CMSG_LEN(sizeof(pktInfo)); + memcpy(WSA_CMSG_DATA(cm), &pktInfo, sizeof(pktInfo)); + } if (send_info.block_count > 1) { - msg.Control.len += WSA_CMSG_SPACE(sizeof(DWORD)); + cmbuflen += WSA_CMSG_SPACE(sizeof(DWORD)); - auto cm = WSA_CMSG_FIRSTHDR(&msg); + cm = WSA_CMSG_NXTHDR(&msg, cm); cm->cmsg_level = IPPROTO_UDP; cm->cmsg_type = UDP_SEND_MSG_SIZE; cm->cmsg_len = WSA_CMSG_LEN(sizeof(DWORD)); *((DWORD *) WSA_CMSG_DATA(cm)) = send_info.block_size; } + msg.Control.len = cmbuflen; + // If USO is not supported, this will fail and the caller will fall back to unbatched sends. DWORD bytes_sent; return WSASendMsg((SOCKET) send_info.native_socket, &msg, 1, &bytes_sent, nullptr, nullptr) != SOCKET_ERROR; } + bool + send(send_info_t &send_info) { + WSAMSG msg; + + // Convert the target address into a SOCKADDR + SOCKADDR_IN taddr_v4; + SOCKADDR_IN6 taddr_v6; + if (send_info.target_address.is_v6()) { + taddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port); + + msg.name = (PSOCKADDR) &taddr_v6; + msg.namelen = sizeof(taddr_v6); + } + else { + taddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port); + + msg.name = (PSOCKADDR) &taddr_v4; + msg.namelen = sizeof(taddr_v4); + } + + WSABUF buf; + buf.buf = (char *) send_info.buffer; + buf.len = send_info.size; + + msg.lpBuffers = &buf; + msg.dwBufferCount = 1; + msg.dwFlags = 0; + + char cmbuf[std::max(WSA_CMSG_SPACE(sizeof(IN6_PKTINFO)), WSA_CMSG_SPACE(sizeof(IN_PKTINFO)))] = {}; + ULONG cmbuflen = 0; + + msg.Control.buf = cmbuf; + msg.Control.len = sizeof(cmbuf); + + auto cm = WSA_CMSG_FIRSTHDR(&msg); + if (send_info.source_address.is_v6()) { + IN6_PKTINFO pktInfo; + + SOCKADDR_IN6 saddr_v6 = to_sockaddr(send_info.source_address.to_v6(), 0); + pktInfo.ipi6_addr = saddr_v6.sin6_addr; + pktInfo.ipi6_ifindex = 0; + + cmbuflen += WSA_CMSG_SPACE(sizeof(pktInfo)); + + cm->cmsg_level = IPPROTO_IPV6; + cm->cmsg_type = IPV6_PKTINFO; + cm->cmsg_len = WSA_CMSG_LEN(sizeof(pktInfo)); + memcpy(WSA_CMSG_DATA(cm), &pktInfo, sizeof(pktInfo)); + } + else { + IN_PKTINFO pktInfo; + + SOCKADDR_IN saddr_v4 = to_sockaddr(send_info.source_address.to_v4(), 0); + pktInfo.ipi_addr = saddr_v4.sin_addr; + pktInfo.ipi_ifindex = 0; + + cmbuflen += WSA_CMSG_SPACE(sizeof(pktInfo)); + + cm->cmsg_level = IPPROTO_IP; + cm->cmsg_type = IP_PKTINFO; + cm->cmsg_len = WSA_CMSG_LEN(sizeof(pktInfo)); + memcpy(WSA_CMSG_DATA(cm), &pktInfo, sizeof(pktInfo)); + } + + msg.Control.len = cmbuflen; + + DWORD bytes_sent; + if (WSASendMsg((SOCKET) send_info.native_socket, &msg, 1, &bytes_sent, nullptr, nullptr) == SOCKET_ERROR) { + auto winerr = WSAGetLastError(); + BOOST_LOG(warning) << "WSASendMsg() failed: "sv << winerr; + return false; + } + + return true; + } + class qos_t: public deinit_t { public: qos_t(QOS_FLOWID flow_id): diff --git a/src/stream.cpp b/src/stream.cpp index 355cb7e0..036f635a 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -355,6 +355,8 @@ namespace stream { safe::shared_t::ptr_t broadcast_ref; + boost::asio::ip::address localAddress; + struct { int lowseq; udp::endpoint peer; @@ -466,6 +468,12 @@ namespace stream { session_p->control.peer = peer; session_port = port; + // Use the local address from the control connection as the source address + // for other communications to the client. This is necessary to ensure + // proper routing on multi-homed hosts. + auto local_address = platf::from_sockaddr((sockaddr *) &peer->localAddress.address); + session_p->localAddress = boost::asio::ip::make_address(local_address); + return session_p; } @@ -1283,6 +1291,7 @@ namespace stream { (uintptr_t) sock.native_handle(), peer_address, session->video.peer.port(), + session->localAddress, }; // Use a batched send if it's supported on this platform @@ -1290,7 +1299,16 @@ namespace stream { // Batched send is not available, so send each packet individually BOOST_LOG(verbose) << "Falling back to unbatched send"sv; for (auto x = 0; x < shards.size(); ++x) { - sock.send_to(asio::buffer(shards[x]), session->video.peer); + auto send_info = platf::send_info_t { + shards[x].data(), + shards[x].size(), + (uintptr_t) sock.native_handle(), + peer_address, + session->video.peer.port(), + session->localAddress, + }; + + platf::send(send_info); } } @@ -1371,9 +1389,17 @@ namespace stream { auto &shards_p = session->audio.shards_p; std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]); + auto peer_address = session->audio.peer.address(); try { - sock.send_to(asio::buffer((char *) audio_packet.get(), sizeof(audio_packet_raw_t) + bytes), session->audio.peer); - + auto send_info = platf::send_info_t { + (const char *) audio_packet.get(), + sizeof(audio_packet_raw_t) + bytes, + (uintptr_t) sock.native_handle(), + peer_address, + session->audio.peer.port(), + session->localAddress, + }; + platf::send(send_info); BOOST_LOG(verbose) << "Audio ["sv << sequenceNumber << "] :: send..."sv; auto &fec_packet = session->audio.fec_packet; @@ -1391,7 +1417,16 @@ namespace stream { fec_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1); fec_packet->fecHeader.fecShardIndex = x; memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes); - sock.send_to(asio::buffer((char *) fec_packet.get(), sizeof(audio_fec_packet_raw_t) + bytes), session->audio.peer); + + auto send_info = platf::send_info_t { + (const char *) fec_packet.get(), + sizeof(audio_fec_packet_raw_t) + bytes, + (uintptr_t) sock.native_handle(), + peer_address, + session->audio.peer.port(), + session->localAddress, + }; + platf::send(send_info); BOOST_LOG(verbose) << "Audio FEC ["sv << (sequenceNumber & ~(RTPA_DATA_SHARDS - 1)) << ' ' << x << "] :: send..."sv; } }