Specify the source address for outbound audio and video traffic (#1569)

This commit is contained in:
Cameron Gutman
2023-08-26 16:37:04 -05:00
committed by GitHub
parent b344af2d88
commit ebb6a7c9a9
5 changed files with 492 additions and 54 deletions

View File

@@ -2,6 +2,12 @@
* @file src/misc.cpp
* @brief todo
*/
// Required for in6_pktinfo with glibc headers
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1
#endif
// standard includes
#include <fstream>
@@ -243,49 +249,102 @@ namespace platf {
lifetime::exit_sunshine(0, true);
}
struct sockaddr_in
to_sockaddr(boost::asio::ip::address_v4 address, uint16_t port) {
struct sockaddr_in saddr_v4 = {};
saddr_v4.sin_family = AF_INET;
saddr_v4.sin_port = htons(port);
auto addr_bytes = address.to_bytes();
memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr));
return saddr_v4;
}
struct sockaddr_in6
to_sockaddr(boost::asio::ip::address_v6 address, uint16_t port) {
struct sockaddr_in6 saddr_v6 = {};
saddr_v6.sin6_family = AF_INET6;
saddr_v6.sin6_port = htons(port);
saddr_v6.sin6_scope_id = address.scope_id();
auto addr_bytes = address.to_bytes();
memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr));
return saddr_v6;
}
bool
send_batch(batched_send_info_t &send_info) {
auto sockfd = (int) send_info.native_socket;
struct msghdr msg = {};
// Convert the target address into a sockaddr
struct sockaddr_in saddr_v4 = {};
struct sockaddr_in6 saddr_v6 = {};
struct sockaddr *addr;
socklen_t addr_len;
struct sockaddr_in taddr_v4 = {};
struct sockaddr_in6 taddr_v6 = {};
if (send_info.target_address.is_v6()) {
auto address_v6 = send_info.target_address.to_v6();
taddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port);
saddr_v6.sin6_family = AF_INET6;
saddr_v6.sin6_port = htons(send_info.target_port);
saddr_v6.sin6_scope_id = address_v6.scope_id();
auto addr_bytes = address_v6.to_bytes();
memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr));
addr = (struct sockaddr *) &saddr_v6;
addr_len = sizeof(saddr_v6);
msg.msg_name = (struct sockaddr *) &taddr_v6;
msg.msg_namelen = sizeof(taddr_v6);
}
else {
auto address_v4 = send_info.target_address.to_v4();
taddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port);
saddr_v4.sin_family = AF_INET;
saddr_v4.sin_port = htons(send_info.target_port);
msg.msg_name = (struct sockaddr *) &taddr_v4;
msg.msg_namelen = sizeof(taddr_v4);
}
auto addr_bytes = address_v4.to_bytes();
memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr));
union {
char buf[CMSG_SPACE(sizeof(uint16_t)) +
std::max(CMSG_SPACE(sizeof(struct in_pktinfo)), CMSG_SPACE(sizeof(struct in6_pktinfo)))];
struct cmsghdr alignment;
} cmbuf;
socklen_t cmbuflen = 0;
addr = (struct sockaddr *) &saddr_v4;
addr_len = sizeof(saddr_v4);
msg.msg_control = cmbuf.buf;
msg.msg_controllen = sizeof(cmbuf.buf);
// The PKTINFO option will always be first, then we will conditionally
// append the UDP_SEGMENT option next if applicable.
auto pktinfo_cm = CMSG_FIRSTHDR(&msg);
if (send_info.source_address.is_v6()) {
struct in6_pktinfo pktInfo;
struct sockaddr_in6 saddr_v6 = to_sockaddr(send_info.source_address.to_v6(), 0);
pktInfo.ipi6_addr = saddr_v6.sin6_addr;
pktInfo.ipi6_ifindex = 0;
cmbuflen += CMSG_SPACE(sizeof(pktInfo));
pktinfo_cm->cmsg_level = IPPROTO_IPV6;
pktinfo_cm->cmsg_type = IPV6_PKTINFO;
pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo));
memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo));
}
else {
struct in_pktinfo pktInfo;
struct sockaddr_in saddr_v4 = to_sockaddr(send_info.source_address.to_v4(), 0);
pktInfo.ipi_spec_dst = saddr_v4.sin_addr;
pktInfo.ipi_ifindex = 0;
cmbuflen += CMSG_SPACE(sizeof(pktInfo));
pktinfo_cm->cmsg_level = IPPROTO_IP;
pktinfo_cm->cmsg_type = IP_PKTINFO;
pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo));
memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo));
}
#ifdef UDP_SEGMENT
{
struct msghdr msg = {};
struct iovec iov = {};
union {
char buf[CMSG_SPACE(sizeof(uint16_t))];
struct cmsghdr alignment;
} cmbuf;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
// UDP GSO on Linux currently only supports sending 64K or 64 segments at a time
size_t seg_index = 0;
@@ -294,26 +353,19 @@ namespace platf {
iov.iov_base = (void *) &send_info.buffer[seg_index * send_info.block_size];
iov.iov_len = send_info.block_size * std::min(send_info.block_count - seg_index, seg_max);
msg.msg_name = addr;
msg.msg_namelen = addr_len;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
// We should not use GSO if the data is <= one full block size
if (iov.iov_len > send_info.block_size) {
msg.msg_control = cmbuf.buf;
msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
msg.msg_controllen = cmbuflen + CMSG_SPACE(sizeof(uint16_t));
// Enable GSO to perform segmentation of our buffer for us
auto cm = CMSG_FIRSTHDR(&msg);
auto cm = CMSG_NXTHDR(&msg, pktinfo_cm);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t *) CMSG_DATA(cm)) = send_info.block_size;
}
else {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
msg.msg_controllen = cmbuflen;
}
// This will fail if GSO is not available, so we will fall back to non-GSO if
@@ -360,10 +412,12 @@ namespace platf {
iovs[i].iov_len = send_info.block_size;
msgs[i] = {};
msgs[i].msg_hdr.msg_name = addr;
msgs[i].msg_hdr.msg_namelen = addr_len;
msgs[i].msg_hdr.msg_name = msg.msg_name;
msgs[i].msg_hdr.msg_namelen = msg.msg_namelen;
msgs[i].msg_hdr.msg_iov = &iovs[i];
msgs[i].msg_hdr.msg_iovlen = 1;
msgs[i].msg_hdr.msg_control = cmbuf.buf;
msgs[i].msg_hdr.msg_controllen = cmbuflen;
}
// Call sendmmsg() until all messages are sent
@@ -398,6 +452,101 @@ namespace platf {
}
}
bool
send(send_info_t &send_info) {
auto sockfd = (int) send_info.native_socket;
struct msghdr msg = {};
// Convert the target address into a sockaddr
struct sockaddr_in taddr_v4 = {};
struct sockaddr_in6 taddr_v6 = {};
if (send_info.target_address.is_v6()) {
taddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port);
msg.msg_name = (struct sockaddr *) &taddr_v6;
msg.msg_namelen = sizeof(taddr_v6);
}
else {
taddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port);
msg.msg_name = (struct sockaddr *) &taddr_v4;
msg.msg_namelen = sizeof(taddr_v4);
}
union {
char buf[std::max(CMSG_SPACE(sizeof(struct in_pktinfo)), CMSG_SPACE(sizeof(struct in6_pktinfo)))];
struct cmsghdr alignment;
} cmbuf;
socklen_t cmbuflen = 0;
msg.msg_control = cmbuf.buf;
msg.msg_controllen = sizeof(cmbuf.buf);
auto pktinfo_cm = CMSG_FIRSTHDR(&msg);
if (send_info.source_address.is_v6()) {
struct in6_pktinfo pktInfo;
struct sockaddr_in6 saddr_v6 = to_sockaddr(send_info.source_address.to_v6(), 0);
pktInfo.ipi6_addr = saddr_v6.sin6_addr;
pktInfo.ipi6_ifindex = 0;
cmbuflen += CMSG_SPACE(sizeof(pktInfo));
pktinfo_cm->cmsg_level = IPPROTO_IPV6;
pktinfo_cm->cmsg_type = IPV6_PKTINFO;
pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo));
memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo));
}
else {
struct in_pktinfo pktInfo;
struct sockaddr_in saddr_v4 = to_sockaddr(send_info.source_address.to_v4(), 0);
pktInfo.ipi_spec_dst = saddr_v4.sin_addr;
pktInfo.ipi_ifindex = 0;
cmbuflen += CMSG_SPACE(sizeof(pktInfo));
pktinfo_cm->cmsg_level = IPPROTO_IP;
pktinfo_cm->cmsg_type = IP_PKTINFO;
pktinfo_cm->cmsg_len = CMSG_LEN(sizeof(pktInfo));
memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo));
}
struct iovec iov = {};
iov.iov_base = (void *) send_info.buffer;
iov.iov_len = send_info.size;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_controllen = cmbuflen;
auto bytes_sent = sendmsg(sockfd, &msg, 0);
// If there's no send buffer space, wait for some to be available
while (bytes_sent < 0 && errno == EAGAIN) {
struct pollfd pfd;
pfd.fd = sockfd;
pfd.events = POLLOUT;
if (poll(&pfd, 1, -1) != 1) {
BOOST_LOG(warning) << "poll() failed: "sv << errno;
break;
}
// Try to send again
bytes_sent = sendmsg(sockfd, &msg, 0);
}
if (bytes_sent < 0) {
BOOST_LOG(warning) << "sendmsg() failed: "sv << errno;
return false;
}
return true;
}
class qos_t: public deinit_t {
public:
qos_t(int sockfd, int level, int option):