Skip to content

Commit

Permalink
Remove all dependencies from receiver, delete receiver_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Gallardo994 committed Jan 29, 2024
1 parent 1a13397 commit 4bcf296
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 191 deletions.
20 changes: 9 additions & 11 deletions imkcpp/include/imkcpp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ namespace imkcpp {
SegmentTracker segment_tracker{};
RtoCalculator rto_calculator{};
CongestionController<MTU> congestion_controller{};
ReceiverBuffer receiver_buffer{};
SenderBuffer sender_buffer{};
WindowProber window_prober{};
Receiver receiver{};

Receiver receiver{segment_tracker, receiver_buffer};

SenderBuffer sender_buffer{};
AckController ack_controller{sender_buffer, segment_tracker};
Sender<MTU> sender{shared_ctx, congestion_controller, rto_calculator, flusher, sender_buffer, segment_tracker};

Expand All @@ -58,7 +56,7 @@ namespace imkcpp {
header.cmd = commands::IKCP_CMD_ACK;
header.frg = 0;
header.wnd = unused_receive_window;
header.una = this->segment_tracker.get_rcv_nxt();
header.una = this->receiver.get_rcv_nxt();
header.sn = 0;
header.ts = 0;
header.len = 0;
Expand Down Expand Up @@ -109,7 +107,7 @@ namespace imkcpp {

this->congestion_controller.set_receive_window(rcvwnd);
this->ack_controller.reserve(rcvwnd);
this->receiver_buffer.set_queue_limit(rcvwnd);
this->receiver.set_queue_limit(rcvwnd);
}

/// Enables or disables congestion window.
Expand Down Expand Up @@ -167,19 +165,18 @@ namespace imkcpp {

switch (header.cmd) {
case commands::IKCP_CMD_PUSH: {
if (!this->congestion_controller.fits_receive_window(this->segment_tracker.get_rcv_nxt(), header.sn)) {
if (!this->congestion_controller.fits_receive_window(this->receiver.get_rcv_nxt(), header.sn)) {
drop_push();
break;
}

this->ack_controller.schedule_ack(header.sn, header.ts);

if (this->segment_tracker.should_receive(header.sn)) {
if (this->receiver.should_receive(header.sn)) {
SegmentData segment_data;
segment_data.decode_from(data, offset, header.len);

this->receiver_buffer.emplace_segment(header, segment_data);
this->receiver.move_receive_buffer_to_queue();
this->receiver.emplace_segment(header, segment_data);
} else {
drop_push();
}
Expand Down Expand Up @@ -357,7 +354,8 @@ namespace imkcpp {
flush_probes();

// Useful data
this->sender.flush_data_segments(flush_result, callback, current, unused_receive_window);
const u32 rcv_nxt = this->receiver.get_rcv_nxt();
this->sender.flush_data_segments(flush_result, callback, current, unused_receive_window, rcv_nxt);

// Flush remaining
flush_result.total_bytes_sent += this->flusher.flush_if_not_empty(callback);
Expand Down
54 changes: 39 additions & 15 deletions imkcpp/include/receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,20 @@
#include "types.hpp"
#include "errors.hpp"
#include "segment.hpp"
#include "receiver_buffer.hpp"
#include "results.hpp"
#include "segment_tracker.hpp"

namespace imkcpp {
// TODO: Benchmark against std::vector instead of std::deque
class Receiver final {
SegmentTracker& segment_tracker;
ReceiverBuffer& receiver_buffer;
std::deque<Segment> rcv_buf{};

std::deque<Segment> rcv_queue{}; // TODO: Does not need to be Segment as we don't use metadata
u32 queue_limit = 0;

public:
u32 rcv_nxt = 0;

explicit Receiver(SegmentTracker& segment_tracker, ReceiverBuffer& receiver_buffer) :
segment_tracker(segment_tracker),
receiver_buffer(receiver_buffer) {}
public:
explicit Receiver() = default;

[[nodiscard]] tl::expected<size_t, error> peek_size() const {
if (this->rcv_queue.empty()) {
Expand Down Expand Up @@ -95,24 +92,51 @@ namespace imkcpp {
return result;
}

void move_receive_buffer_to_queue() {
const u32 queue_limit = this->receiver_buffer.get_queue_limit();
void emplace_segment(const SegmentHeader& header, SegmentData& data) {
u32 sn = header.sn;

const auto rit = std::find_if(this->rcv_buf.rbegin(), this->rcv_buf.rend(), [sn](const Segment& seg) {
return seg.header.sn < sn;
});

const auto it = rit.base();

while (!this->receiver_buffer.empty()) {
Segment& seg = this->receiver_buffer.front();
if (seg.header.sn != this->segment_tracker.get_rcv_nxt() || this->rcv_queue.size() >= queue_limit) {
if (it != this->rcv_buf.end() && it->header.sn == sn) {
return;
}

this->rcv_buf.emplace(it, header, data);
this->move_receive_buffer_to_queue();
}

void move_receive_buffer_to_queue() {
while (!this->rcv_buf.empty()) {
Segment& seg = this->rcv_buf.front();
if (seg.header.sn != this->get_rcv_nxt() || this->rcv_queue.size() >= this->queue_limit) {
break;
}

this->rcv_queue.push_back(std::move(seg));

this->receiver_buffer.pop_front();
this->segment_tracker.increment_rcv_nxt();
this->rcv_buf.pop_front();
this->rcv_nxt++;
}
}

[[nodiscard]] size_t size() const {
return this->rcv_queue.size();
}

[[nodiscard]] u32 get_rcv_nxt() const {
return this->rcv_nxt;
}

[[nodiscard]] bool should_receive(const u32 sn) const {
return sn >= this->rcv_nxt;
}

void set_queue_limit(const u32 value) {
this->queue_limit = value;
}
};
}
54 changes: 0 additions & 54 deletions imkcpp/include/receiver_buffer.hpp

This file was deleted.

24 changes: 0 additions & 24 deletions imkcpp/include/segment_tracker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,14 @@
#include "types.hpp"

namespace imkcpp {
// TODO: Add logical methods like "can_receive" and "is_in_flight", etc
class SegmentTracker final {
/// Sequence number of the first unacknowledged segment.
u32 snd_una = 0;

/// Sequence number of the next segment to be sent.
u32 snd_nxt = 0;

/// Sequence number of the next segment to be received.
u32 rcv_nxt = 0;

public:
// snd_una

[[nodiscard]] u32 get_snd_una() const {
return this->snd_una;
}
Expand All @@ -29,8 +23,6 @@ namespace imkcpp {
this->snd_una = this->snd_nxt;
}

// snd_nxt

[[nodiscard]] u32 get_snd_nxt() const {
return this->snd_nxt;
}
Expand All @@ -39,26 +31,10 @@ namespace imkcpp {
return this->snd_nxt++;
}

// rcv_nxt

[[nodiscard]] u32 get_rcv_nxt() const {
return this->rcv_nxt;
}

void increment_rcv_nxt() {
this->rcv_nxt++;
}

// Other

[[nodiscard]] u32 get_packets_in_flight_count() const {
assert(this->snd_nxt >= this->snd_una);

return this->snd_nxt - this->snd_una;
}

[[nodiscard]] bool should_receive(const u32 sn) const {
return sn >= this->rcv_nxt;
}
};
}
11 changes: 6 additions & 5 deletions imkcpp/include/sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "utility.hpp"
#include "results.hpp"
#include "flusher.hpp"
#include "segment_tracker.hpp"

namespace imkcpp {
template <size_t MTU>
Expand Down Expand Up @@ -86,7 +87,7 @@ namespace imkcpp {
}

/// Flushes data segments from the send queue to the sender buffer.
void move_send_queue_to_buffer(const u32 cwnd, const u32 current, const i32 unused_receive_window) {
void move_send_queue_to_buffer(const u32 cwnd, const u32 current, const i32 unused_receive_window, const u32 rcv_nxt) {
while (!this->snd_queue.empty() && this->segment_tracker.get_snd_nxt() < this->segment_tracker.get_snd_una() + cwnd) {
Segment& newseg = this->snd_queue.front();

Expand All @@ -95,7 +96,7 @@ namespace imkcpp {
newseg.header.wnd = unused_receive_window;
newseg.header.ts = current;
newseg.header.sn = this->segment_tracker.get_and_increment_snd_nxt();
newseg.header.una = this->segment_tracker.get_rcv_nxt();
newseg.header.una = rcv_nxt;

newseg.metadata.resendts = current;
newseg.metadata.rto = this->rto_calculator.get_rto();
Expand Down Expand Up @@ -131,9 +132,9 @@ namespace imkcpp {
}

/// Flushes data segments from the send queue to the output callback.
void flush_data_segments(FlushResult& flush_result, const output_callback_t& output, const u32 current, const i32 unused_receive_window) {
void flush_data_segments(FlushResult& flush_result, const output_callback_t& output, const u32 current, const i32 unused_receive_window, const u32 rcv_nxt) {
const u32 cwnd = this->congestion_controller.calculate_congestion_window();
this->move_send_queue_to_buffer(cwnd, current, unused_receive_window);
this->move_send_queue_to_buffer(cwnd, current, unused_receive_window, rcv_nxt);

bool change = false;

Expand Down Expand Up @@ -181,7 +182,7 @@ namespace imkcpp {
const auto send_segment = [&](Segment& segment) -> void {
segment.header.ts = current;
segment.header.wnd = unused_receive_window;
segment.header.una = this->segment_tracker.get_rcv_nxt();
segment.header.una = rcv_nxt;

flush_result.total_bytes_sent += this->flusher.flush_if_does_not_fit(output, segment.data_size());
this->flusher.emplace(segment.header, segment.data);
Expand Down
1 change: 0 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ add_executable(imkcpp_tests
Send_Tests.cpp
RtoCalculator_Tests.cpp
Flusher_Tests.cpp
ReceiverBuffer_Tests.cpp
SenderBuffer_Tests.cpp
CongestionController_Tests.cpp
)
Expand Down
Loading

0 comments on commit 4bcf296

Please sign in to comment.