Implement S/G IO for non-batched sends and eliminate more data copies (#2867)

This commit is contained in:
Cameron Gutman
2024-07-17 21:34:56 -05:00
committed by GitHub
parent b93756a804
commit 81c6e61594
5 changed files with 76 additions and 63 deletions

View File

@@ -136,12 +136,7 @@ namespace stream {
std::uint8_t tag[16];
};
struct audio_packet_raw_t {
uint8_t *
payload() {
return (uint8_t *) (this + 1);
}
struct audio_packet_t {
RTP_PACKET rtp;
};
@@ -219,12 +214,7 @@ namespace stream {
// encrypted control_header_v2 and payload data follow
} *control_encrypted_p;
struct audio_fec_packet_raw_t {
uint8_t *
payload() {
return (uint8_t *) (this + 1);
}
struct audio_fec_packet_t {
RTP_PACKET rtp;
AUDIO_FEC_HEADER fecHeader;
};
@@ -238,8 +228,6 @@ namespace stream {
constexpr std::size_t MAX_AUDIO_PACKET_SIZE = 1400;
using video_packet_t = util::c_ptr<video_packet_raw_t>;
using audio_packet_t = util::c_ptr<audio_packet_raw_t>;
using audio_fec_packet_t = util::c_ptr<audio_fec_packet_raw_t>;
using audio_aes_t = std::array<char, round_to_pkcs7_padded(MAX_AUDIO_PACKET_SIZE)>;
using av_session_id_t = std::variant<asio::ip::address, std::string>; // IP address or SS-Ping-Payload from RTSP handshake
@@ -249,14 +237,14 @@ namespace stream {
// return bytes written on success
// return -1 on error
static inline int
encode_audio(bool encrypted, const audio::buffer_t &plaintext, audio_packet_t &destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) {
encode_audio(bool encrypted, const audio::buffer_t &plaintext, uint8_t *destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) {
// If encryption isn't enabled
if (!encrypted) {
std::copy(std::begin(plaintext), std::end(plaintext), destination->payload());
std::copy(std::begin(plaintext), std::end(plaintext), destination);
return plaintext.size();
}
return cbc.encrypt(std::string_view { (char *) std::begin(plaintext), plaintext.size() }, destination->payload(), &iv);
return cbc.encrypt(std::string_view { (char *) std::begin(plaintext), plaintext.size() }, destination, &iv);
}
static inline void
@@ -755,6 +743,7 @@ namespace stream {
std::vector<uint8_t>
replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) {
std::vector<uint8_t> replaced;
replaced.reserve(original.size() + _new.size() - old.size());
auto begin = std::begin(original);
auto end = std::end(original);
@@ -1531,6 +1520,8 @@ namespace stream {
BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
for (auto y = 0; y < current_batch_size; y++) {
auto send_info = platf::send_info_t {
nullptr,
0,
shards.prefix(next_shard_to_send + y),
shards.prefixsize + shards.blocksize,
(uintptr_t) sock.native_handle(),
@@ -1584,9 +1575,7 @@ namespace stream {
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
auto packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);
audio_packet_t audio_packet { (audio_packet_raw_t *) malloc(sizeof(audio_packet_raw_t) + max_block_size) };
audio_packet_t audio_packet;
fec::rs_t rs { reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS) };
crypto::aes_t iv(16);
@@ -1598,9 +1587,9 @@ namespace stream {
const unsigned char parity[] = { 0x77, 0x40, 0x38, 0x0e, 0xc7, 0xa7, 0x0d, 0x6c };
memcpy(rs.get()->p, parity, sizeof(parity));
audio_packet->rtp.header = 0x80;
audio_packet->rtp.packetType = 97;
audio_packet->rtp.ssrc = 0;
audio_packet.rtp.header = 0x80;
audio_packet.rtp.packetType = 97;
audio_packet.rtp.ssrc = 0;
// Audio traffic is sent on this thread
platf::adjust_thread_priority(platf::thread_priority_e::high);
@@ -1618,26 +1607,28 @@ namespace stream {
*(std::uint32_t *) iv.data() = util::endian::big<std::uint32_t>(session->audio.avRiKeyId + sequenceNumber);
auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data, audio_packet, iv, session->audio.cipher);
auto &shards_p = session->audio.shards_p;
auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data,
shards_p[sequenceNumber % RTPA_DATA_SHARDS], iv, session->audio.cipher);
if (bytes < 0) {
BOOST_LOG(error) << "Couldn't encode audio packet"sv;
break;
}
audio_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber);
audio_packet->rtp.timestamp = util::endian::big(timestamp);
audio_packet.rtp.sequenceNumber = util::endian::big(sequenceNumber);
audio_packet.rtp.timestamp = util::endian::big(timestamp);
session->audio.sequenceNumber++;
session->audio.timestamp += session->config.audio.packetDuration;
auto &shards_p = session->audio.shards_p;
std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]);
auto peer_address = session->audio.peer.address();
try {
auto send_info = platf::send_info_t {
(const char *) audio_packet.get(),
sizeof(audio_packet_raw_t) + bytes,
(const char *) &audio_packet,
sizeof(audio_packet),
(const char *) shards_p[sequenceNumber % RTPA_DATA_SHARDS],
(size_t) bytes,
(uintptr_t) sock.native_handle(),
peer_address,
session->audio.peer.port(),
@@ -1649,8 +1640,8 @@ namespace stream {
auto &fec_packet = session->audio.fec_packet;
// initialize the FEC header at the beginning of the FEC block
if (sequenceNumber % RTPA_DATA_SHARDS == 0) {
fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp);
fec_packet.fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
fec_packet.fecHeader.baseTimestamp = util::endian::big(timestamp);
}
// generate parity shards at the end of the FEC block
@@ -1658,13 +1649,14 @@ namespace stream {
reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes);
for (auto x = 0; x < RTPA_FEC_SHARDS; ++x) {
fec_packet->rtp.sequenceNumber = util::endian::big<std::uint16_t>(sequenceNumber + x + 1);
fec_packet->fecHeader.fecShardIndex = x;
memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes);
fec_packet.rtp.sequenceNumber = util::endian::big<std::uint16_t>(sequenceNumber + x + 1);
fec_packet.fecHeader.fecShardIndex = x;
auto send_info = platf::send_info_t {
(const char *) fec_packet.get(),
sizeof(audio_fec_packet_raw_t) + bytes,
(const char *) &fec_packet,
sizeof(fec_packet),
(const char *) shards_p[RTPA_DATA_SHARDS + x],
(size_t) bytes,
(uintptr_t) sock.native_handle(),
peer_address,
session->audio.peer.port(),
@@ -2030,15 +2022,13 @@ namespace stream {
session->audio.shards = std::move(shards);
session->audio.shards_p = std::move(shards_p);
session->audio.fec_packet.reset((audio_fec_packet_raw_t *) malloc(sizeof(audio_fec_packet_raw_t) + max_block_size));
session->audio.fec_packet.rtp.header = 0x80;
session->audio.fec_packet.rtp.packetType = 127;
session->audio.fec_packet.rtp.timestamp = 0;
session->audio.fec_packet.rtp.ssrc = 0;
session->audio.fec_packet->rtp.header = 0x80;
session->audio.fec_packet->rtp.packetType = 127;
session->audio.fec_packet->rtp.timestamp = 0;
session->audio.fec_packet->rtp.ssrc = 0;
session->audio.fec_packet->fecHeader.payloadType = 97;
session->audio.fec_packet->fecHeader.ssrc = 0;
session->audio.fec_packet.fecHeader.payloadType = 97;
session->audio.fec_packet.fecHeader.ssrc = 0;
session->audio.cipher = crypto::cipher::cbc_t {
launch_session.gcm_key, true