From 4bcf29661ce02b6b6906e3dce3674e83bffe6160 Mon Sep 17 00:00:00 2001 From: Gallardo994 Date: Tue, 30 Jan 2024 03:34:35 +0400 Subject: [PATCH] Remove all dependencies from receiver, delete receiver_buffer --- imkcpp/include/imkcpp.hpp | 20 ++++---- imkcpp/include/receiver.hpp | 54 ++++++++++++++------ imkcpp/include/receiver_buffer.hpp | 54 -------------------- imkcpp/include/segment_tracker.hpp | 24 --------- imkcpp/include/sender.hpp | 11 ++-- tests/CMakeLists.txt | 1 - tests/ReceiverBuffer_Tests.cpp | 81 ------------------------------ 7 files changed, 54 insertions(+), 191 deletions(-) delete mode 100644 imkcpp/include/receiver_buffer.hpp delete mode 100644 tests/ReceiverBuffer_Tests.cpp diff --git a/imkcpp/include/imkcpp.hpp b/imkcpp/include/imkcpp.hpp index 3aca1f7..e0ba286 100644 --- a/imkcpp/include/imkcpp.hpp +++ b/imkcpp/include/imkcpp.hpp @@ -37,12 +37,10 @@ namespace imkcpp { SegmentTracker segment_tracker{}; RtoCalculator rto_calculator{}; CongestionController congestion_controller{}; - ReceiverBuffer receiver_buffer{}; - SenderBuffer sender_buffer{}; WindowProber window_prober{}; + Receiver receiver{}; - Receiver receiver{segment_tracker, receiver_buffer}; - + SenderBuffer sender_buffer{}; AckController ack_controller{sender_buffer, segment_tracker}; Sender sender{shared_ctx, congestion_controller, rto_calculator, flusher, sender_buffer, segment_tracker}; @@ -58,7 +56,7 @@ namespace imkcpp { header.cmd = commands::IKCP_CMD_ACK; header.frg = 0; header.wnd = unused_receive_window; - header.una = this->segment_tracker.get_rcv_nxt(); + header.una = this->receiver.get_rcv_nxt(); header.sn = 0; header.ts = 0; header.len = 0; @@ -109,7 +107,7 @@ namespace imkcpp { this->congestion_controller.set_receive_window(rcvwnd); this->ack_controller.reserve(rcvwnd); - this->receiver_buffer.set_queue_limit(rcvwnd); + this->receiver.set_queue_limit(rcvwnd); } /// Enables or disables congestion window. @@ -167,19 +165,18 @@ namespace imkcpp { switch (header.cmd) { case commands::IKCP_CMD_PUSH: { - if (!this->congestion_controller.fits_receive_window(this->segment_tracker.get_rcv_nxt(), header.sn)) { + if (!this->congestion_controller.fits_receive_window(this->receiver.get_rcv_nxt(), header.sn)) { drop_push(); break; } this->ack_controller.schedule_ack(header.sn, header.ts); - if (this->segment_tracker.should_receive(header.sn)) { + if (this->receiver.should_receive(header.sn)) { SegmentData segment_data; segment_data.decode_from(data, offset, header.len); - this->receiver_buffer.emplace_segment(header, segment_data); - this->receiver.move_receive_buffer_to_queue(); + this->receiver.emplace_segment(header, segment_data); } else { drop_push(); } @@ -357,7 +354,8 @@ namespace imkcpp { flush_probes(); // Useful data - this->sender.flush_data_segments(flush_result, callback, current, unused_receive_window); + const u32 rcv_nxt = this->receiver.get_rcv_nxt(); + this->sender.flush_data_segments(flush_result, callback, current, unused_receive_window, rcv_nxt); // Flush remaining flush_result.total_bytes_sent += this->flusher.flush_if_not_empty(callback); diff --git a/imkcpp/include/receiver.hpp b/imkcpp/include/receiver.hpp index f85da90..e354c41 100644 --- a/imkcpp/include/receiver.hpp +++ b/imkcpp/include/receiver.hpp @@ -6,23 +6,20 @@ #include "types.hpp" #include "errors.hpp" #include "segment.hpp" -#include "receiver_buffer.hpp" #include "results.hpp" -#include "segment_tracker.hpp" namespace imkcpp { // TODO: Benchmark against std::vector instead of std::deque class Receiver final { - SegmentTracker& segment_tracker; - ReceiverBuffer& receiver_buffer; + std::deque rcv_buf{}; std::deque rcv_queue{}; // TODO: Does not need to be Segment as we don't use metadata + u32 queue_limit = 0; - public: + u32 rcv_nxt = 0; - explicit Receiver(SegmentTracker& segment_tracker, ReceiverBuffer& receiver_buffer) : - segment_tracker(segment_tracker), - receiver_buffer(receiver_buffer) {} + public: + explicit Receiver() = default; [[nodiscard]] tl::expected peek_size() const { if (this->rcv_queue.empty()) { @@ -95,24 +92,51 @@ namespace imkcpp { return result; } - void move_receive_buffer_to_queue() { - const u32 queue_limit = this->receiver_buffer.get_queue_limit(); + void emplace_segment(const SegmentHeader& header, SegmentData& data) { + u32 sn = header.sn; + + const auto rit = std::find_if(this->rcv_buf.rbegin(), this->rcv_buf.rend(), [sn](const Segment& seg) { + return seg.header.sn < sn; + }); + + const auto it = rit.base(); - while (!this->receiver_buffer.empty()) { - Segment& seg = this->receiver_buffer.front(); - if (seg.header.sn != this->segment_tracker.get_rcv_nxt() || this->rcv_queue.size() >= queue_limit) { + if (it != this->rcv_buf.end() && it->header.sn == sn) { + return; + } + + this->rcv_buf.emplace(it, header, data); + this->move_receive_buffer_to_queue(); + } + + void move_receive_buffer_to_queue() { + while (!this->rcv_buf.empty()) { + Segment& seg = this->rcv_buf.front(); + if (seg.header.sn != this->get_rcv_nxt() || this->rcv_queue.size() >= this->queue_limit) { break; } this->rcv_queue.push_back(std::move(seg)); - this->receiver_buffer.pop_front(); - this->segment_tracker.increment_rcv_nxt(); + this->rcv_buf.pop_front(); + this->rcv_nxt++; } } [[nodiscard]] size_t size() const { return this->rcv_queue.size(); } + + [[nodiscard]] u32 get_rcv_nxt() const { + return this->rcv_nxt; + } + + [[nodiscard]] bool should_receive(const u32 sn) const { + return sn >= this->rcv_nxt; + } + + void set_queue_limit(const u32 value) { + this->queue_limit = value; + } }; } \ No newline at end of file diff --git a/imkcpp/include/receiver_buffer.hpp b/imkcpp/include/receiver_buffer.hpp deleted file mode 100644 index a466988..0000000 --- a/imkcpp/include/receiver_buffer.hpp +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include -#include "types.hpp" -#include "segment.hpp" - -namespace imkcpp { - class ReceiverBuffer final { - - std::deque rcv_buf{}; // TODO: Does not need to be Segment as we don't use metadata - u32 queue_limit = 0; - - public: - void set_queue_limit(const u32 value) { - this->queue_limit = value; - } - - [[nodiscard]] u32 get_queue_limit() const { - return this->queue_limit; - } - - [[nodiscard]] bool empty() const { - return this->rcv_buf.empty(); - } - - [[nodiscard]] Segment& front() { - return this->rcv_buf.front(); - } - - [[nodiscard]] size_t size() const { - return this->rcv_buf.size(); - } - - void pop_front() { - this->rcv_buf.pop_front(); - } - - void emplace_segment(const SegmentHeader& header, SegmentData& data) { - u32 sn = header.sn; - - const auto rit = std::find_if(this->rcv_buf.rbegin(), this->rcv_buf.rend(), [sn](const Segment& seg) { - return seg.header.sn < sn; - }); - - const auto it = rit.base(); - - if (it != this->rcv_buf.end() && it->header.sn == sn) { - return; - } - - this->rcv_buf.emplace(it, header, data); - } - }; -} \ No newline at end of file diff --git a/imkcpp/include/segment_tracker.hpp b/imkcpp/include/segment_tracker.hpp index 46df1e8..0c849d0 100644 --- a/imkcpp/include/segment_tracker.hpp +++ b/imkcpp/include/segment_tracker.hpp @@ -3,7 +3,6 @@ #include "types.hpp" namespace imkcpp { - // TODO: Add logical methods like "can_receive" and "is_in_flight", etc class SegmentTracker final { /// Sequence number of the first unacknowledged segment. u32 snd_una = 0; @@ -11,12 +10,7 @@ namespace imkcpp { /// Sequence number of the next segment to be sent. u32 snd_nxt = 0; - /// Sequence number of the next segment to be received. - u32 rcv_nxt = 0; - public: - // snd_una - [[nodiscard]] u32 get_snd_una() const { return this->snd_una; } @@ -29,8 +23,6 @@ namespace imkcpp { this->snd_una = this->snd_nxt; } - // snd_nxt - [[nodiscard]] u32 get_snd_nxt() const { return this->snd_nxt; } @@ -39,26 +31,10 @@ namespace imkcpp { return this->snd_nxt++; } - // rcv_nxt - - [[nodiscard]] u32 get_rcv_nxt() const { - return this->rcv_nxt; - } - - void increment_rcv_nxt() { - this->rcv_nxt++; - } - - // Other - [[nodiscard]] u32 get_packets_in_flight_count() const { assert(this->snd_nxt >= this->snd_una); return this->snd_nxt - this->snd_una; } - - [[nodiscard]] bool should_receive(const u32 sn) const { - return sn >= this->rcv_nxt; - } }; } diff --git a/imkcpp/include/sender.hpp b/imkcpp/include/sender.hpp index 000ba42..0c27c41 100644 --- a/imkcpp/include/sender.hpp +++ b/imkcpp/include/sender.hpp @@ -13,6 +13,7 @@ #include "utility.hpp" #include "results.hpp" #include "flusher.hpp" +#include "segment_tracker.hpp" namespace imkcpp { template @@ -86,7 +87,7 @@ namespace imkcpp { } /// Flushes data segments from the send queue to the sender buffer. - void move_send_queue_to_buffer(const u32 cwnd, const u32 current, const i32 unused_receive_window) { + void move_send_queue_to_buffer(const u32 cwnd, const u32 current, const i32 unused_receive_window, const u32 rcv_nxt) { while (!this->snd_queue.empty() && this->segment_tracker.get_snd_nxt() < this->segment_tracker.get_snd_una() + cwnd) { Segment& newseg = this->snd_queue.front(); @@ -95,7 +96,7 @@ namespace imkcpp { newseg.header.wnd = unused_receive_window; newseg.header.ts = current; newseg.header.sn = this->segment_tracker.get_and_increment_snd_nxt(); - newseg.header.una = this->segment_tracker.get_rcv_nxt(); + newseg.header.una = rcv_nxt; newseg.metadata.resendts = current; newseg.metadata.rto = this->rto_calculator.get_rto(); @@ -131,9 +132,9 @@ namespace imkcpp { } /// Flushes data segments from the send queue to the output callback. - void flush_data_segments(FlushResult& flush_result, const output_callback_t& output, const u32 current, const i32 unused_receive_window) { + void flush_data_segments(FlushResult& flush_result, const output_callback_t& output, const u32 current, const i32 unused_receive_window, const u32 rcv_nxt) { const u32 cwnd = this->congestion_controller.calculate_congestion_window(); - this->move_send_queue_to_buffer(cwnd, current, unused_receive_window); + this->move_send_queue_to_buffer(cwnd, current, unused_receive_window, rcv_nxt); bool change = false; @@ -181,7 +182,7 @@ namespace imkcpp { const auto send_segment = [&](Segment& segment) -> void { segment.header.ts = current; segment.header.wnd = unused_receive_window; - segment.header.una = this->segment_tracker.get_rcv_nxt(); + segment.header.una = rcv_nxt; flush_result.total_bytes_sent += this->flusher.flush_if_does_not_fit(output, segment.data_size()); this->flusher.emplace(segment.header, segment.data); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 97b56d8..a07ae77 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -12,7 +12,6 @@ add_executable(imkcpp_tests Send_Tests.cpp RtoCalculator_Tests.cpp Flusher_Tests.cpp - ReceiverBuffer_Tests.cpp SenderBuffer_Tests.cpp CongestionController_Tests.cpp ) diff --git a/tests/ReceiverBuffer_Tests.cpp b/tests/ReceiverBuffer_Tests.cpp deleted file mode 100644 index 817b55c..0000000 --- a/tests/ReceiverBuffer_Tests.cpp +++ /dev/null @@ -1,81 +0,0 @@ -#include -#include "imkcpp.hpp" - -class ReceiverBufferTest : public ::testing::Test { -protected: - imkcpp::ReceiverBuffer buffer; -}; - -TEST_F(ReceiverBufferTest, QueueLimit) { - using namespace imkcpp; - - buffer.set_queue_limit(10); - ASSERT_EQ(buffer.get_queue_limit(), 10); - - for (u32 i = 0; i < 20; i++) { - SegmentData data{}; - buffer.emplace_segment(SegmentHeader{ .sn = i }, data); - } - - // ASSERT_EQ(buffer.size(), 10); // TODO: Does receiver have to enforce queue limit? -} - -TEST_F(ReceiverBufferTest, InitiallyEmpty) { - ASSERT_TRUE(buffer.empty()); -} - -TEST_F(ReceiverBufferTest, AddAndCheckFront) { - using namespace imkcpp; - - constexpr SegmentHeader header{ .sn = 1 }; - SegmentData data{}; - buffer.emplace_segment(header, data); - - ASSERT_FALSE(buffer.empty()); - ASSERT_EQ(buffer.front().header.sn, header.sn); -} - -TEST_F(ReceiverBufferTest, PopFront) { - using namespace imkcpp; - - constexpr SegmentHeader header{ .sn = 1 }; - SegmentData data{}; - buffer.emplace_segment(header, data); - - buffer.pop_front(); - ASSERT_TRUE(buffer.empty()); -} - -TEST_F(ReceiverBufferTest, SegmentOrder) { - using namespace imkcpp; - - constexpr SegmentHeader header1 { .sn = 1 }, header2{ .sn = 2 }, header3 { .sn = 3 }; - SegmentData data1{}, data2{}, data3{}; - - buffer.emplace_segment(header3, data3); - buffer.emplace_segment(header1, data1); - buffer.emplace_segment(header2, data2); - - ASSERT_EQ(buffer.front().header.sn, header1.sn); - buffer.pop_front(); - - ASSERT_EQ(buffer.front().header.sn, header2.sn); - buffer.pop_front(); - - ASSERT_EQ(buffer.front().header.sn, header3.sn); - buffer.pop_front(); - - ASSERT_TRUE(buffer.empty()); -} - -TEST_F(ReceiverBufferTest, NoDuplicateSegments) { - using namespace imkcpp; - - constexpr SegmentHeader header{ .sn = 1 }; - SegmentData data{}; - - buffer.emplace_segment(header, data); - buffer.emplace_segment(header, data); - - ASSERT_EQ(buffer.size(), 1); -} \ No newline at end of file