Skip to content

Commit

Permalink
Functionality for removals after specified offset [part 1]
Browse files Browse the repository at this point in the history
Summary: When we send a reliable reset, we don't want to reset the entire `writeBuffer`, `retransmissionBuffer`, `lossBuffer`, and `pendingWrites`. We only want to reset them up to the `reliableSize`. I'm making helper functions to do this.

Reviewed By: jbeshay

Differential Revision: D64907982

fbshipit-source-id: ad9256638f5e0cb5f7c038e2c9d12253a7776b2d
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Nov 20, 2024
1 parent 4ee2544 commit fb2c430
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 0 deletions.
56 changes: 56 additions & 0 deletions quic/state/StreamData.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#pragma once

#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <quic/QuicConstants.h>
#include <quic/codec/Types.h>
#include <quic/common/SmallCollections.h>
Expand Down Expand Up @@ -266,6 +267,61 @@ struct QuicStreamLike {
}
}
}

void removeFromLossBufAfterOffset(uint64_t removalOffset) {
while (!lossBuffer.empty()) {
auto& lastElement = lossBuffer.back();
if (lastElement.offset > removalOffset) {
lossBuffer.pop_back();
} else if (
lastElement.offset + lastElement.data.chainLength() > removalOffset) {
lastElement.data = lastElement.data.splitAtMost(
size_t(removalOffset - lastElement.offset + 1));
return;
} else {
return;
}
}
}

void removeFromRetransmissionBufAfterOffset(uint64_t removalOffset) {
folly::F14FastSet<uint64_t> offsetsToRemove;

for (auto& [offset, buf] : retransmissionBuffer) {
if (offset > removalOffset) {
offsetsToRemove.insert(offset);
} else if (offset + buf->data.chainLength() > removalOffset) {
buf->data = buf->data.splitAtMost(size_t(removalOffset - offset + 1));
}
}

for (auto offset : offsetsToRemove) {
retransmissionBuffer.erase(offset);
}
}

void removeFromWriteBufAfterOffset(uint64_t removalOffset) {
if (writeBuffer.empty() ||
writeBufferStartOffset + writeBuffer.chainLength() - 1 <=
removalOffset) {
return;
}

removalOffset = std::max(removalOffset, writeBufferStartOffset - 1);
writeBuffer = writeBuffer.splitAtMost(
size_t(removalOffset - writeBufferStartOffset + 1));
}

void removeFromPendingWritesAfterOffset(uint64_t removalOffset) {
if (pendingWrites.empty() ||
currentWriteOffset + pendingWrites.chainLength() - 1 <= removalOffset) {
return;
}

removalOffset = std::max(removalOffset, currentWriteOffset - 1);
pendingWrites = pendingWrites.splitAtMost(
size_t(removalOffset - currentWriteOffset + 1));
}
};

struct QuicConnectionStateBase;
Expand Down
11 changes: 11 additions & 0 deletions quic/state/test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,14 @@ cpp_benchmark(
"//quic/state:quic_priority_queue",
],
)

cpp_unittest(
name = "stream_data_test",
srcs = [
"StreamDataTest.cpp",
],
deps = [
"fbsource//third-party/googletest:gmock",
"//quic/state:quic_state_machine",
],
)
284 changes: 284 additions & 0 deletions quic/state/test/StreamDataTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <quic/state/StreamData.h>

using namespace quic;
using namespace testing;

namespace quic::test {

Buf createBuffer(uint32_t len) {
auto buf = folly::IOBuf::create(len);
buf->append(len);
return buf;
}

std::unique_ptr<WriteStreamBuffer>
createWriteStreamBuffer(uint32_t offset, Buf& buf, bool eof) {
ChainedByteRangeHead cbrh(buf);
return std::make_unique<WriteStreamBuffer>(std::move(cbrh), offset, eof);
}

void addDataToBufQueue(BufQueue& bufQueue, uint32_t len) {
auto buf = folly::IOBuf::create(len);
buf->append(len);
bufQueue.append(std::move(buf));
}

TEST(StreamDataTest, LossBufferRemovalAll) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.insertIntoLossBuffer(createWriteStreamBuffer(1, buf1, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(5, buf2, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(17, buf3, false));

state.removeFromLossBufAfterOffset(0);
EXPECT_EQ(state.lossBuffer.size(), 0);
}

TEST(StreamDataTest, LossBufferRemovalExactMatch) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.insertIntoLossBuffer(createWriteStreamBuffer(1, buf1, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(5, buf2, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(17, buf3, false));

state.removeFromLossBufAfterOffset(4);
EXPECT_EQ(state.lossBuffer.size(), 1);
EXPECT_EQ(state.lossBuffer[0].offset, 1);
EXPECT_EQ(state.lossBuffer[0].data.chainLength(), 2);
}

TEST(StreamDataTest, LossBufferRemovalPartialMatch) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.insertIntoLossBuffer(createWriteStreamBuffer(1, buf1, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(5, buf2, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(17, buf3, false));

state.removeFromLossBufAfterOffset(5);
EXPECT_EQ(state.lossBuffer.size(), 2);

EXPECT_EQ(state.lossBuffer[0].offset, 1);
EXPECT_EQ(state.lossBuffer[0].data.chainLength(), 2);

EXPECT_EQ(state.lossBuffer[1].offset, 5);
EXPECT_EQ(state.lossBuffer[1].data.chainLength(), 1);
}

TEST(StreamDataTest, LossBufferRemovalNoMatch) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.insertIntoLossBuffer(createWriteStreamBuffer(1, buf1, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(5, buf2, false));
state.insertIntoLossBuffer(createWriteStreamBuffer(17, buf3, false));

state.removeFromLossBufAfterOffset(20);
EXPECT_EQ(state.lossBuffer.size(), 3);

EXPECT_EQ(state.lossBuffer[0].offset, 1);
EXPECT_EQ(state.lossBuffer[0].data.chainLength(), 2);

EXPECT_EQ(state.lossBuffer[1].offset, 5);
EXPECT_EQ(state.lossBuffer[1].data.chainLength(), 8);

EXPECT_EQ(state.lossBuffer[2].offset, 17);
EXPECT_EQ(state.lossBuffer[2].data.chainLength(), 3);
}

TEST(StreamDataTest, RetxBufferRemovalAll) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.retransmissionBuffer.emplace(
1, createWriteStreamBuffer(1, buf1, false));
state.retransmissionBuffer.emplace(
5, createWriteStreamBuffer(5, buf2, false));
state.retransmissionBuffer.emplace(
17, createWriteStreamBuffer(17, buf3, false));

state.removeFromRetransmissionBufAfterOffset(0);
EXPECT_EQ(state.retransmissionBuffer.size(), 0);
}

TEST(StreamDataTest, RetxBufferRemovalExactMatch) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.retransmissionBuffer.emplace(
1, createWriteStreamBuffer(1, buf1, false));
state.retransmissionBuffer.emplace(
5, createWriteStreamBuffer(5, buf2, false));
state.retransmissionBuffer.emplace(
17, createWriteStreamBuffer(17, buf3, false));

state.removeFromRetransmissionBufAfterOffset(16);
EXPECT_EQ(state.retransmissionBuffer.size(), 2);

EXPECT_EQ(state.retransmissionBuffer[1]->offset, 1);
EXPECT_EQ(state.retransmissionBuffer[1]->data.chainLength(), 2);

EXPECT_EQ(state.retransmissionBuffer[5]->offset, 5);
EXPECT_EQ(state.retransmissionBuffer[5]->data.chainLength(), 8);
}

TEST(StreamDataTest, RetxBufferRemovalPartialMatch) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.retransmissionBuffer.emplace(
1, createWriteStreamBuffer(1, buf1, false));
state.retransmissionBuffer.emplace(
5, createWriteStreamBuffer(5, buf2, false));
state.retransmissionBuffer.emplace(
17, createWriteStreamBuffer(17, buf3, false));

state.removeFromRetransmissionBufAfterOffset(5);
EXPECT_EQ(state.retransmissionBuffer.size(), 2);

EXPECT_EQ(state.retransmissionBuffer[1]->offset, 1);
EXPECT_EQ(state.retransmissionBuffer[1]->data.chainLength(), 2);

EXPECT_EQ(state.retransmissionBuffer[5]->offset, 5);
EXPECT_EQ(state.retransmissionBuffer[5]->data.chainLength(), 1);
}

TEST(StreamDataTest, RetxBufferRemovalNoMatch) {
QuicStreamLike state;
// [1, 2] [5, 12] [17, 19]
auto buf1 = createBuffer(2);
auto buf2 = createBuffer(8);
auto buf3 = createBuffer(3);
state.retransmissionBuffer.emplace(
1, createWriteStreamBuffer(1, buf1, false));
state.retransmissionBuffer.emplace(
5, createWriteStreamBuffer(5, buf2, false));
state.retransmissionBuffer.emplace(
17, createWriteStreamBuffer(17, buf3, false));

state.removeFromRetransmissionBufAfterOffset(19);
EXPECT_EQ(state.retransmissionBuffer.size(), 3);

EXPECT_EQ(state.retransmissionBuffer[1]->offset, 1);
EXPECT_EQ(state.retransmissionBuffer[1]->data.chainLength(), 2);

EXPECT_EQ(state.retransmissionBuffer[5]->offset, 5);
EXPECT_EQ(state.retransmissionBuffer[5]->data.chainLength(), 8);

EXPECT_EQ(state.retransmissionBuffer[17]->offset, 17);
EXPECT_EQ(state.retransmissionBuffer[17]->data.chainLength(), 3);
}

TEST(StreamDataTest, WriteBufferRemovalAll) {
QuicStreamLike state;
state.writeBufferStartOffset = 5;

// [5, 16]
addDataToBufQueue(state.writeBuffer, 3);
addDataToBufQueue(state.writeBuffer, 2);
addDataToBufQueue(state.writeBuffer, 7);

state.removeFromWriteBufAfterOffset(0);
EXPECT_EQ(state.writeBuffer.chainLength(), 0);
}

TEST(StreamDataTest, WriteBufferRemoval) {
QuicStreamLike state;
state.writeBufferStartOffset = 5;

// [5, 16]
addDataToBufQueue(state.writeBuffer, 3);
addDataToBufQueue(state.writeBuffer, 2);
addDataToBufQueue(state.writeBuffer, 7);

state.removeFromWriteBufAfterOffset(5);
EXPECT_EQ(state.writeBuffer.chainLength(), 1);
}

TEST(StreamDataTest, WriteBufferRemovalNoChange) {
QuicStreamLike state;
state.writeBufferStartOffset = 5;

// [5, 16]
addDataToBufQueue(state.writeBuffer, 3);
addDataToBufQueue(state.writeBuffer, 2);
addDataToBufQueue(state.writeBuffer, 7);

state.removeFromWriteBufAfterOffset(16);
EXPECT_EQ(state.writeBuffer.chainLength(), 12);
}

TEST(StreamDataTest, PendingWritesRemovalAll) {
QuicStreamLike state;
state.currentWriteOffset = 5;

// [5, 12]
Buf buf1 = folly::IOBuf::create(3);
buf1->append(3);
Buf buf2 = folly::IOBuf::create(5);
buf2->append(5);
buf1->appendChain(std::move(buf2));

state.pendingWrites = ChainedByteRangeHead(buf1);
state.removeFromPendingWritesAfterOffset(0);
EXPECT_EQ(state.pendingWrites.chainLength(), 0);
}

TEST(StreamDataTest, PendingWritesRemoval) {
QuicStreamLike state;
state.currentWriteOffset = 5;

// [5, 12]
Buf buf1 = folly::IOBuf::create(3);
buf1->append(3);
Buf buf2 = folly::IOBuf::create(5);
buf2->append(5);
buf1->appendChain(std::move(buf2));

state.pendingWrites = ChainedByteRangeHead(buf1);
state.removeFromPendingWritesAfterOffset(11);
EXPECT_EQ(state.pendingWrites.chainLength(), 7);
}

TEST(StreamDataTest, PendingWritesRemovalNoChange) {
QuicStreamLike state;
state.currentWriteOffset = 5;

// [5, 12]
Buf buf1 = folly::IOBuf::create(3);
buf1->append(3);
Buf buf2 = folly::IOBuf::create(5);
buf2->append(5);
buf1->appendChain(std::move(buf2));

state.pendingWrites = ChainedByteRangeHead(buf1);
state.removeFromPendingWritesAfterOffset(12);
EXPECT_EQ(state.pendingWrites.chainLength(), 8);
}

} // namespace quic::test

0 comments on commit fb2c430

Please sign in to comment.