Network performance optimizations (#771)

This commit is contained in:
Cameron Gutman
2023-01-16 17:17:04 -06:00
committed by GitHub
parent 42f6634e85
commit fa14b6ead7
7 changed files with 549 additions and 2 deletions

View File

@@ -2,6 +2,7 @@
#include <dlfcn.h>
#include <fcntl.h>
#include <ifaddrs.h>
#include <netinet/udp.h>
#include <pwd.h>
#include <unistd.h>
@@ -14,6 +15,7 @@
#include "src/main.h"
#include "src/platform/common.h"
#include <boost/asio/ip/address.hpp>
#include <boost/process.hpp>
#ifdef __GNUC__
@@ -175,6 +177,215 @@ bool restart() {
return false;
}
bool send_batch(batched_send_info_t &send_info) {
auto sockfd = (int)send_info.native_socket;
// Convert the target address into a sockaddr
struct sockaddr_in saddr_v4 = {};
struct sockaddr_in6 saddr_v6 = {};
struct sockaddr *addr;
socklen_t addr_len;
if(send_info.target_address.is_v6()) {
auto address_v6 = send_info.target_address.to_v6();
saddr_v6.sin6_family = AF_INET6;
saddr_v6.sin6_port = htons(send_info.target_port);
saddr_v6.sin6_scope_id = address_v6.scope_id();
auto addr_bytes = address_v6.to_bytes();
memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr));
addr = (struct sockaddr *)&saddr_v6;
addr_len = sizeof(saddr_v6);
}
else {
auto address_v4 = send_info.target_address.to_v4();
saddr_v4.sin_family = AF_INET;
saddr_v4.sin_port = htons(send_info.target_port);
auto addr_bytes = address_v4.to_bytes();
memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr));
addr = (struct sockaddr *)&saddr_v4;
addr_len = sizeof(saddr_v4);
}
#ifdef UDP_SEGMENT
{
struct msghdr msg = {};
struct iovec iov = {};
union {
char buf[CMSG_SPACE(sizeof(uint16_t))];
struct cmsghdr alignment;
} cmbuf;
// UDP GSO on Linux currently only supports sending 64K or 64 segments at a time
size_t seg_index = 0;
const size_t seg_max = 65536 / 1500;
while(seg_index < send_info.block_count) {
iov.iov_base = (void *)&send_info.buffer[seg_index * send_info.block_size];
iov.iov_len = send_info.block_size * std::min(send_info.block_count - seg_index, seg_max);
msg.msg_name = addr;
msg.msg_namelen = addr_len;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
// We should not use GSO if the data is <= one full block size
if(iov.iov_len > send_info.block_size) {
msg.msg_control = cmbuf.buf;
msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
// Enable GSO to perform segmentation of our buffer for us
auto cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t *)CMSG_DATA(cm)) = send_info.block_size;
}
else {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
}
// This will fail if GSO is not available, so we will fall back to non-GSO if
// it's the first sendmsg() call. On subsequent calls, we will treat errors as
// actual failures and return to the caller.
auto bytes_sent = sendmsg(sockfd, &msg, 0);
if(bytes_sent < 0) {
// If there's no send buffer space, wait for some to be available
if(errno == EAGAIN) {
struct pollfd pfd;
pfd.fd = sockfd;
pfd.events = POLLOUT;
if(poll(&pfd, 1, -1) != 1) {
BOOST_LOG(warning) << "poll() failed: "sv << errno;
break;
}
// Try to send again
continue;
}
break;
}
seg_index += bytes_sent / send_info.block_size;
}
// If we sent something, return the status and don't fall back to the non-GSO path.
if(seg_index != 0) {
return seg_index >= send_info.block_count;
}
}
#endif
{
// If GSO is not supported, use sendmmsg() instead.
struct mmsghdr msgs[send_info.block_count];
struct iovec iovs[send_info.block_count];
for(size_t i = 0; i < send_info.block_count; i++) {
iovs[i] = {};
iovs[i].iov_base = (void *)&send_info.buffer[i * send_info.block_size];
iovs[i].iov_len = send_info.block_size;
msgs[i] = {};
msgs[i].msg_hdr.msg_name = addr;
msgs[i].msg_hdr.msg_namelen = addr_len;
msgs[i].msg_hdr.msg_iov = &iovs[i];
msgs[i].msg_hdr.msg_iovlen = 1;
}
// Call sendmmsg() until all messages are sent
size_t blocks_sent = 0;
while(blocks_sent < send_info.block_count) {
int msgs_sent = sendmmsg(sockfd, &msgs[blocks_sent], send_info.block_count - blocks_sent, 0);
if(msgs_sent < 0) {
// If there's no send buffer space, wait for some to be available
if(errno == EAGAIN) {
struct pollfd pfd;
pfd.fd = sockfd;
pfd.events = POLLOUT;
if(poll(&pfd, 1, -1) != 1) {
BOOST_LOG(warning) << "poll() failed: "sv << errno;
break;
}
// Try to send again
continue;
}
BOOST_LOG(warning) << "sendmmsg() failed: "sv << errno;
return false;
}
blocks_sent += msgs_sent;
}
return true;
}
}
class qos_t : public deinit_t {
public:
qos_t(int sockfd, int level, int option) : sockfd(sockfd), level(level), option(option) {}
virtual ~qos_t() {
int reset_val = -1;
if(setsockopt(sockfd, level, option, &reset_val, sizeof(reset_val)) < 0) {
BOOST_LOG(warning) << "Failed to reset IP TOS: "sv << errno;
}
}
private:
int sockfd;
int level;
int option;
};
std::unique_ptr<deinit_t> enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type) {
int sockfd = (int)native_socket;
int level;
int option;
if(address.is_v6()) {
level = SOL_IPV6;
option = IPV6_TCLASS;
}
else {
level = SOL_IP;
option = IP_TOS;
}
// The specific DSCP values here are chosen to be consistent with Windows
int dscp;
switch(data_type) {
case qos_data_type_e::video:
dscp = 40;
break;
case qos_data_type_e::audio:
dscp = 56;
break;
default:
BOOST_LOG(error) << "Unknown traffic type: "sv << (int)data_type;
return nullptr;
}
// Shift to put the DSCP value in the correct position in the TOS field
dscp <<= 2;
if(setsockopt(sockfd, level, option, &dscp, sizeof(dscp)) < 0) {
return nullptr;
}
return std::make_unique<qos_t>(sockfd, level, option);
}
namespace source {
enum source_e : std::size_t {
#ifdef SUNSHINE_BUILD_CUDA