diff --git a/src/platform/common.h b/src/platform/common.h index be94673d..b9558484 100644 --- a/src/platform/common.h +++ b/src/platform/common.h @@ -23,6 +23,11 @@ struct AVHWFramesContext; // Forward declarations of boost classes to avoid having to include boost headers // here, which results in issues with Windows.h and WinSock2.h include order. namespace boost { +namespace asio { +namespace ip { +class address; +} // namespace ip +} // namespace asio namespace filesystem { class path; } @@ -335,6 +340,23 @@ void streaming_will_stop(); bool restart_supported(); bool restart(); +struct batched_send_info_t { + const char *buffer; + size_t block_size; + size_t block_count; + + std::uintptr_t native_socket; + boost::asio::ip::address &target_address; + uint16_t target_port; +}; +bool send_batch(batched_send_info_t &send_info); + +enum class qos_data_type_e : int { + audio, + video +}; +std::unique_ptr enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type); + input_t input(); void move_mouse(input_t &input, int deltaX, int deltaY); void abs_mouse(input_t &input, const touch_port_t &touch_port, float x, float y); diff --git a/src/platform/linux/misc.cpp b/src/platform/linux/misc.cpp index 35e68b96..be606228 100644 --- a/src/platform/linux/misc.cpp +++ b/src/platform/linux/misc.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -14,6 +15,7 @@ #include "src/main.h" #include "src/platform/common.h" +#include #include #ifdef __GNUC__ @@ -175,6 +177,215 @@ bool restart() { return false; } +bool send_batch(batched_send_info_t &send_info) { + auto sockfd = (int)send_info.native_socket; + + // Convert the target address into a sockaddr + struct sockaddr_in saddr_v4 = {}; + struct sockaddr_in6 saddr_v6 = {}; + struct sockaddr *addr; + socklen_t addr_len; + if(send_info.target_address.is_v6()) { + auto address_v6 = send_info.target_address.to_v6(); + + saddr_v6.sin6_family = AF_INET6; + saddr_v6.sin6_port = htons(send_info.target_port); + saddr_v6.sin6_scope_id = address_v6.scope_id(); + + auto addr_bytes = address_v6.to_bytes(); + memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr)); + + addr = (struct sockaddr *)&saddr_v6; + addr_len = sizeof(saddr_v6); + } + else { + auto address_v4 = send_info.target_address.to_v4(); + + saddr_v4.sin_family = AF_INET; + saddr_v4.sin_port = htons(send_info.target_port); + + auto addr_bytes = address_v4.to_bytes(); + memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr)); + + addr = (struct sockaddr *)&saddr_v4; + addr_len = sizeof(saddr_v4); + } + +#ifdef UDP_SEGMENT + { + struct msghdr msg = {}; + struct iovec iov = {}; + union { + char buf[CMSG_SPACE(sizeof(uint16_t))]; + struct cmsghdr alignment; + } cmbuf; + + // UDP GSO on Linux currently only supports sending 64K or 64 segments at a time + size_t seg_index = 0; + const size_t seg_max = 65536 / 1500; + while(seg_index < send_info.block_count) { + iov.iov_base = (void *)&send_info.buffer[seg_index * send_info.block_size]; + iov.iov_len = send_info.block_size * std::min(send_info.block_count - seg_index, seg_max); + + msg.msg_name = addr; + msg.msg_namelen = addr_len; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + // We should not use GSO if the data is <= one full block size + if(iov.iov_len > send_info.block_size) { + msg.msg_control = cmbuf.buf; + msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); + + // Enable GSO to perform segmentation of our buffer for us + auto cm = CMSG_FIRSTHDR(&msg); + cm->cmsg_level = SOL_UDP; + cm->cmsg_type = UDP_SEGMENT; + cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); + *((uint16_t *)CMSG_DATA(cm)) = send_info.block_size; + } + else { + msg.msg_control = nullptr; + msg.msg_controllen = 0; + } + + // This will fail if GSO is not available, so we will fall back to non-GSO if + // it's the first sendmsg() call. On subsequent calls, we will treat errors as + // actual failures and return to the caller. + auto bytes_sent = sendmsg(sockfd, &msg, 0); + if(bytes_sent < 0) { + // If there's no send buffer space, wait for some to be available + if(errno == EAGAIN) { + struct pollfd pfd; + + pfd.fd = sockfd; + pfd.events = POLLOUT; + + if(poll(&pfd, 1, -1) != 1) { + BOOST_LOG(warning) << "poll() failed: "sv << errno; + break; + } + + // Try to send again + continue; + } + + break; + } + + seg_index += bytes_sent / send_info.block_size; + } + + // If we sent something, return the status and don't fall back to the non-GSO path. + if(seg_index != 0) { + return seg_index >= send_info.block_count; + } + } +#endif + + { + // If GSO is not supported, use sendmmsg() instead. + struct mmsghdr msgs[send_info.block_count]; + struct iovec iovs[send_info.block_count]; + for(size_t i = 0; i < send_info.block_count; i++) { + iovs[i] = {}; + iovs[i].iov_base = (void *)&send_info.buffer[i * send_info.block_size]; + iovs[i].iov_len = send_info.block_size; + + msgs[i] = {}; + msgs[i].msg_hdr.msg_name = addr; + msgs[i].msg_hdr.msg_namelen = addr_len; + msgs[i].msg_hdr.msg_iov = &iovs[i]; + msgs[i].msg_hdr.msg_iovlen = 1; + } + + // Call sendmmsg() until all messages are sent + size_t blocks_sent = 0; + while(blocks_sent < send_info.block_count) { + int msgs_sent = sendmmsg(sockfd, &msgs[blocks_sent], send_info.block_count - blocks_sent, 0); + if(msgs_sent < 0) { + // If there's no send buffer space, wait for some to be available + if(errno == EAGAIN) { + struct pollfd pfd; + + pfd.fd = sockfd; + pfd.events = POLLOUT; + + if(poll(&pfd, 1, -1) != 1) { + BOOST_LOG(warning) << "poll() failed: "sv << errno; + break; + } + + // Try to send again + continue; + } + + BOOST_LOG(warning) << "sendmmsg() failed: "sv << errno; + return false; + } + + blocks_sent += msgs_sent; + } + + return true; + } +} + +class qos_t : public deinit_t { +public: + qos_t(int sockfd, int level, int option) : sockfd(sockfd), level(level), option(option) {} + + virtual ~qos_t() { + int reset_val = -1; + if(setsockopt(sockfd, level, option, &reset_val, sizeof(reset_val)) < 0) { + BOOST_LOG(warning) << "Failed to reset IP TOS: "sv << errno; + } + } + +private: + int sockfd; + int level; + int option; +}; + +std::unique_ptr enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type) { + int sockfd = (int)native_socket; + + int level; + int option; + if(address.is_v6()) { + level = SOL_IPV6; + option = IPV6_TCLASS; + } + else { + level = SOL_IP; + option = IP_TOS; + } + + // The specific DSCP values here are chosen to be consistent with Windows + int dscp; + switch(data_type) { + case qos_data_type_e::video: + dscp = 40; + break; + case qos_data_type_e::audio: + dscp = 56; + break; + default: + BOOST_LOG(error) << "Unknown traffic type: "sv << (int)data_type; + return nullptr; + } + + // Shift to put the DSCP value in the correct position in the TOS field + dscp <<= 2; + + if(setsockopt(sockfd, level, option, &dscp, sizeof(dscp)) < 0) { + return nullptr; + } + + return std::make_unique(sockfd, level, option); +} + namespace source { enum source_e : std::size_t { #ifdef SUNSHINE_BUILD_CUDA diff --git a/src/platform/macos/misc.cpp b/src/platform/macos/misc.cpp index eb99de78..199471e9 100644 --- a/src/platform/macos/misc.cpp +++ b/src/platform/macos/misc.cpp @@ -153,6 +153,18 @@ bool restart() { return false; } +bool send_batch(batched_send_info_t &send_info) { + // Fall back to unbatched send calls + return false; +} + +std::unique_ptr enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type) { + // Unimplemented + // + // NB: When implementing, remember to consider that some routes can drop DSCP-tagged packets completely! + return nullptr; +} + } // namespace platf namespace dyn { diff --git a/src/platform/windows/misc.cpp b/src/platform/windows/misc.cpp index 9b6867da..9c1307b1 100644 --- a/src/platform/windows/misc.cpp +++ b/src/platform/windows/misc.cpp @@ -4,6 +4,7 @@ #include #include +#include #include // prevent clang format from "optimizing" the header include order @@ -16,12 +17,27 @@ #include #include #include +#include // clang-format on #include "src/main.h" #include "src/platform/common.h" #include "src/utility.h" +// UDP_SEND_MSG_SIZE was added in the Windows 10 20H1 SDK +#ifndef UDP_SEND_MSG_SIZE +#define UDP_SEND_MSG_SIZE 2 +#endif + +// MinGW headers are missing qWAVE stuff +typedef UINT32 QOS_FLOWID, *PQOS_FLOWID; +#define QOS_NON_ADAPTIVE_FLOW 0x00000002 +#include + +#ifndef WLAN_API_MAKE_VERSION +#define WLAN_API_MAKE_VERSION(_major, _minor) (((DWORD)(_minor)) << 16 | (_major)) +#endif + namespace bp = boost::process; using namespace std::literals; @@ -31,6 +47,20 @@ using adapteraddrs_t = util::c_ptr; bool enabled_mouse_keys = false; MOUSEKEYS previous_mouse_keys_state; +HANDLE qos_handle = nullptr; + +decltype(QOSCreateHandle) *fn_QOSCreateHandle = nullptr; +decltype(QOSAddSocketToFlow) *fn_QOSAddSocketToFlow = nullptr; +decltype(QOSRemoveSocketFromFlow) *fn_QOSRemoveSocketFromFlow = nullptr; + +HANDLE wlan_handle = nullptr; + +decltype(WlanOpenHandle) *fn_WlanOpenHandle = nullptr; +decltype(WlanCloseHandle) *fn_WlanCloseHandle = nullptr; +decltype(WlanFreeMemory) *fn_WlanFreeMemory = nullptr; +decltype(WlanEnumInterfaces) *fn_WlanEnumInterfaces = nullptr; +decltype(WlanSetInterface) *fn_WlanSetInterface = nullptr; + std::filesystem::path appdata() { WCHAR sunshine_path[MAX_PATH]; GetModuleFileNameW(NULL, sunshine_path, _countof(sunshine_path)); @@ -507,6 +537,35 @@ void adjust_thread_priority(thread_priority_e priority) { } void streaming_will_start() { + static std::once_flag load_wlanapi_once_flag; + std::call_once(load_wlanapi_once_flag, []() { + // wlanapi.dll is not installed by default on Windows Server, so we load it dynamically + HMODULE wlanapi = LoadLibraryExA("wlanapi.dll", NULL, LOAD_LIBRARY_SEARCH_SYSTEM32); + if(!wlanapi) { + BOOST_LOG(debug) << "wlanapi.dll is not available on this OS"sv; + return; + } + + fn_WlanOpenHandle = (decltype(fn_WlanOpenHandle))GetProcAddress(wlanapi, "WlanOpenHandle"); + fn_WlanCloseHandle = (decltype(fn_WlanCloseHandle))GetProcAddress(wlanapi, "WlanCloseHandle"); + fn_WlanFreeMemory = (decltype(fn_WlanFreeMemory))GetProcAddress(wlanapi, "WlanFreeMemory"); + fn_WlanEnumInterfaces = (decltype(fn_WlanEnumInterfaces))GetProcAddress(wlanapi, "WlanEnumInterfaces"); + fn_WlanSetInterface = (decltype(fn_WlanSetInterface))GetProcAddress(wlanapi, "WlanSetInterface"); + + if(!fn_WlanOpenHandle || !fn_WlanCloseHandle || !fn_WlanFreeMemory || !fn_WlanEnumInterfaces || !fn_WlanSetInterface) { + BOOST_LOG(error) << "wlanapi.dll is missing exports?"sv; + + fn_WlanOpenHandle = nullptr; + fn_WlanCloseHandle = nullptr; + fn_WlanFreeMemory = nullptr; + fn_WlanEnumInterfaces = nullptr; + fn_WlanSetInterface = nullptr; + + FreeLibrary(wlanapi); + return; + } + }); + // Enable MMCSS scheduling for DWM DwmEnableMMCSS(true); @@ -516,6 +575,39 @@ void streaming_will_start() { // Promote ourselves to high priority class SetPriorityClass(GetCurrentProcess(), HIGH_PRIORITY_CLASS); + // Enable low latency mode on all connected WLAN NICs if wlanapi.dll is available + if(fn_WlanOpenHandle) { + DWORD negotiated_version; + + if(fn_WlanOpenHandle(WLAN_API_MAKE_VERSION(2, 0), nullptr, &negotiated_version, &wlan_handle) == ERROR_SUCCESS) { + PWLAN_INTERFACE_INFO_LIST wlan_interface_list; + + if(fn_WlanEnumInterfaces(wlan_handle, nullptr, &wlan_interface_list) == ERROR_SUCCESS) { + for(DWORD i = 0; i < wlan_interface_list->dwNumberOfItems; i++) { + if(wlan_interface_list->InterfaceInfo[i].isState == wlan_interface_state_connected) { + // Enable media streaming mode for 802.11 wireless interfaces to reduce latency and + // unneccessary background scanning operations that cause packet loss and jitter. + // + // https://docs.microsoft.com/en-us/windows-hardware/drivers/network/oid-wdi-set-connection-quality + // https://docs.microsoft.com/en-us/previous-versions/windows/hardware/wireless/native-802-11-media-streaming + BOOL value = TRUE; + auto error = fn_WlanSetInterface(wlan_handle, &wlan_interface_list->InterfaceInfo[i].InterfaceGuid, + wlan_intf_opcode_media_streaming_mode, sizeof(value), &value, nullptr); + if(error == ERROR_SUCCESS) { + BOOST_LOG(info) << "WLAN interface "sv << i << " is now in low latency mode"sv; + } + } + } + + fn_WlanFreeMemory(wlan_interface_list); + } + else { + fn_WlanCloseHandle(wlan_handle, nullptr); + wlan_handle = NULL; + } + } + } + // If there is no mouse connected, enable Mouse Keys to force the cursor to appear if(!GetSystemMetrics(SM_MOUSEPRESENT)) { BOOST_LOG(info) << "A mouse was not detected. Sunshine will enable Mouse Keys while streaming to force the mouse cursor to appear."; @@ -556,6 +648,12 @@ void streaming_will_stop() { // Disable MMCSS scheduling for DWM DwmEnableMMCSS(false); + // Closing our WLAN client handle will undo our optimizations + if(wlan_handle != nullptr) { + fn_WlanCloseHandle(wlan_handle, nullptr); + wlan_handle = nullptr; + } + // Restore Mouse Keys back to the previous settings if we turned it on if(enabled_mouse_keys) { enabled_mouse_keys = false; @@ -578,4 +676,166 @@ bool restart() { return true; } +SOCKADDR_IN to_sockaddr(boost::asio::ip::address_v4 address, uint16_t port) { + SOCKADDR_IN saddr_v4 = {}; + + saddr_v4.sin_family = AF_INET; + saddr_v4.sin_port = htons(port); + + auto addr_bytes = address.to_bytes(); + memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr)); + + return saddr_v4; +} + +SOCKADDR_IN6 to_sockaddr(boost::asio::ip::address_v6 address, uint16_t port) { + SOCKADDR_IN6 saddr_v6 = {}; + + saddr_v6.sin6_family = AF_INET6; + saddr_v6.sin6_port = htons(port); + saddr_v6.sin6_scope_id = address.scope_id(); + + auto addr_bytes = address.to_bytes(); + memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr)); + + return saddr_v6; +} + +// Use UDP segmentation offload if it is supported by the OS. If the NIC is capable, this will use +// hardware acceleration to reduce CPU usage. Support for USO was introduced in Windows 10 20H1. +bool send_batch(batched_send_info_t &send_info) { + WSAMSG msg; + + // Convert the target address into a SOCKADDR + SOCKADDR_IN saddr_v4; + SOCKADDR_IN6 saddr_v6; + if(send_info.target_address.is_v6()) { + saddr_v6 = to_sockaddr(send_info.target_address.to_v6(), send_info.target_port); + + msg.name = (PSOCKADDR)&saddr_v6; + msg.namelen = sizeof(saddr_v6); + } + else { + saddr_v4 = to_sockaddr(send_info.target_address.to_v4(), send_info.target_port); + + msg.name = (PSOCKADDR)&saddr_v4; + msg.namelen = sizeof(saddr_v4); + } + + WSABUF buf; + buf.buf = (char *)send_info.buffer; + buf.len = send_info.block_size * send_info.block_count; + + msg.lpBuffers = &buf; + msg.dwBufferCount = 1; + msg.dwFlags = 0; + + char cmbuf[WSA_CMSG_SPACE(sizeof(DWORD))]; + msg.Control.buf = cmbuf; + msg.Control.len = 0; + + if(send_info.block_count > 1) { + msg.Control.len += WSA_CMSG_SPACE(sizeof(DWORD)); + + auto cm = WSA_CMSG_FIRSTHDR(&msg); + cm->cmsg_level = IPPROTO_UDP; + cm->cmsg_type = UDP_SEND_MSG_SIZE; + cm->cmsg_len = WSA_CMSG_LEN(sizeof(DWORD)); + *((DWORD *)WSA_CMSG_DATA(cm)) = send_info.block_size; + } + + // If USO is not supported, this will fail and the caller will fall back to unbatched sends. + DWORD bytes_sent; + return WSASendMsg((SOCKET)send_info.native_socket, &msg, 1, &bytes_sent, nullptr, nullptr) != SOCKET_ERROR; +} + +class qos_t : public deinit_t { +public: + qos_t(QOS_FLOWID flow_id) : flow_id(flow_id) {} + + virtual ~qos_t() { + if(!fn_QOSRemoveSocketFromFlow(qos_handle, (SOCKET)NULL, flow_id, 0)) { + auto winerr = GetLastError(); + BOOST_LOG(warning) << "QOSRemoveSocketFromFlow() failed: "sv << winerr; + } + } + +private: + QOS_FLOWID flow_id; +}; + +std::unique_ptr enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type) { + SOCKADDR_IN saddr_v4; + SOCKADDR_IN6 saddr_v6; + PSOCKADDR dest_addr; + + static std::once_flag load_qwave_once_flag; + std::call_once(load_qwave_once_flag, []() { + // qWAVE is not installed by default on Windows Server, so we load it dynamically + HMODULE qwave = LoadLibraryExA("qwave.dll", NULL, LOAD_LIBRARY_SEARCH_SYSTEM32); + if(!qwave) { + BOOST_LOG(debug) << "qwave.dll is not available on this OS"sv; + return; + } + + fn_QOSCreateHandle = (decltype(fn_QOSCreateHandle))GetProcAddress(qwave, "QOSCreateHandle"); + fn_QOSAddSocketToFlow = (decltype(fn_QOSAddSocketToFlow))GetProcAddress(qwave, "QOSAddSocketToFlow"); + fn_QOSRemoveSocketFromFlow = (decltype(fn_QOSRemoveSocketFromFlow))GetProcAddress(qwave, "QOSRemoveSocketFromFlow"); + + if(!fn_QOSCreateHandle || !fn_QOSAddSocketToFlow || !fn_QOSRemoveSocketFromFlow) { + BOOST_LOG(error) << "qwave.dll is missing exports?"sv; + + fn_QOSCreateHandle = nullptr; + fn_QOSAddSocketToFlow = nullptr; + fn_QOSRemoveSocketFromFlow = nullptr; + + FreeLibrary(qwave); + return; + } + + QOS_VERSION qos_version { 1, 0 }; + if(!fn_QOSCreateHandle(&qos_version, &qos_handle)) { + auto winerr = GetLastError(); + BOOST_LOG(warning) << "QOSCreateHandle() failed: "sv << winerr; + return; + } + }); + + // If qWAVE is unavailable, just return + if(!fn_QOSAddSocketToFlow || !qos_handle) { + return nullptr; + } + + if(address.is_v6()) { + saddr_v6 = to_sockaddr(address.to_v6(), port); + dest_addr = (PSOCKADDR)&saddr_v6; + } + else { + saddr_v4 = to_sockaddr(address.to_v4(), port); + dest_addr = (PSOCKADDR)&saddr_v4; + } + + QOS_TRAFFIC_TYPE traffic_type; + switch(data_type) { + case qos_data_type_e::audio: + traffic_type = QOSTrafficTypeVoice; + break; + case qos_data_type_e::video: + traffic_type = QOSTrafficTypeAudioVideo; + break; + default: + BOOST_LOG(error) << "Unknown traffic type: "sv << (int)data_type; + return nullptr; + } + + QOS_FLOWID flow_id = 0; + if(!fn_QOSAddSocketToFlow(qos_handle, (SOCKET)native_socket, dest_addr, traffic_type, QOS_NON_ADAPTIVE_FLOW, &flow_id)) { + auto winerr = GetLastError(); + BOOST_LOG(warning) << "QOSAddSocketToFlow() failed: "sv << winerr; + return nullptr; + } + + return std::make_unique(flow_id); +} + } // namespace platf \ No newline at end of file diff --git a/src/rtsp.cpp b/src/rtsp.cpp index 9f2a7bbd..791d6888 100644 --- a/src/rtsp.cpp +++ b/src/rtsp.cpp @@ -613,6 +613,8 @@ void cmd_announce(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { args.try_emplace("x-nv-general.useReliableUdp"sv, "1"sv); args.try_emplace("x-nv-vqos[0].fec.minRequiredFecPackets"sv, "0"sv); args.try_emplace("x-nv-general.featureFlags"sv, "135"sv); + args.try_emplace("x-nv-vqos[0].qosTrafficType"sv, "5"sv); + args.try_emplace("x-nv-aqos.qosTrafficType"sv, "4"sv); config_t config; @@ -629,6 +631,8 @@ void cmd_announce(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { config.packetsize = util::from_view(args.at("x-nv-video[0].packetSize"sv)); config.minRequiredFecPackets = util::from_view(args.at("x-nv-vqos[0].fec.minRequiredFecPackets"sv)); config.featureFlags = util::from_view(args.at("x-nv-general.featureFlags"sv)); + config.audioQosType = util::from_view(args.at("x-nv-aqos.qosTrafficType"sv)); + config.videoQosType = util::from_view(args.at("x-nv-vqos[0].qosTrafficType"sv)); config.monitor.height = util::from_view(args.at("x-nv-video[0].clientViewportHt"sv)); config.monitor.width = util::from_view(args.at("x-nv-video[0].clientViewportWd"sv)); diff --git a/src/stream.cpp b/src/stream.cpp index 78667aa1..67ad8a07 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -287,6 +287,7 @@ struct session_t { int lowseq; udp::endpoint peer; safe::mail_raw_t::event_t idr_events; + std::unique_ptr qos; } video; struct { @@ -302,6 +303,7 @@ struct session_t { util::buffer_t shards_p; audio_fec_packet_t fec_packet; + std::unique_ptr qos; } audio; struct { @@ -762,7 +764,10 @@ void controlBroadcastThread(control_server_t *server) { if(session->state.load(std::memory_order_acquire) == session::state_e::STOPPING) { pos = server->_map_addr_session->erase(pos); - enet_peer_disconnect_now(session->control.peer, 0); + if(session->control.peer) { + enet_peer_disconnect_now(session->control.peer, 0); + } + session->controlEnd.raise(true); continue; } @@ -1036,8 +1041,25 @@ void videoBroadcastThread(udp::socket &sock) { inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex; inspect->packet.frameIndex = av_packet->pts; + } - sock.send_to(asio::buffer(shards[x]), session->video.peer); + auto peer_address = session->video.peer.address(); + auto batch_info = platf::batched_send_info_t { + shards.shards.begin(), + shards.blocksize, + shards.nr_shards, + (uintptr_t)sock.native_handle(), + peer_address, + session->video.peer.port(), + }; + + // Use a batched send if it's supported on this platform + if(!platf::send_batch(batch_info)) { + // Batched send is not available, so send each packet individually + BOOST_LOG(verbose) << "Falling back to unbatched send"sv; + for(auto x = 0; x < shards.size(); ++x) { + sock.send_to(asio::buffer(shards[x]), session->video.peer); + } } if(av_packet->flags & AV_PKT_FLAG_KEY) { @@ -1318,6 +1340,13 @@ void videoThread(session_t *session) { return; } + // Enable QoS tagging on video traffic if requested by the client + if(session->config.videoQosType) { + auto address = session->video.peer.address(); + session->video.qos = std::move(platf::enable_socket_qos(ref->video_sock.native_handle(), address, + session->video.peer.port(), platf::qos_data_type_e::video)); + } + BOOST_LOG(debug) << "Start capturing Video"sv; video::capture(session->mail, session->config.monitor, session); } @@ -1335,6 +1364,13 @@ void audioThread(session_t *session) { return; } + // Enable QoS tagging on audio traffic if requested by the client + if(session->config.audioQosType) { + auto address = session->audio.peer.address(); + session->audio.qos = std::move(platf::enable_socket_qos(ref->audio_sock.native_handle(), address, + session->audio.peer.port(), platf::qos_data_type_e::audio)); + } + BOOST_LOG(debug) << "Start capturing Audio"sv; audio::capture(session->mail, session->config.audio, session); } diff --git a/src/stream.h b/src/stream.h index 8e054aa9..02b30413 100644 --- a/src/stream.h +++ b/src/stream.h @@ -23,6 +23,8 @@ struct config_t { int minRequiredFecPackets; int featureFlags; int controlProtocolType; + int audioQosType; + int videoQosType; std::optional gcmap; };