Skip to content

Commit

Permalink
refactor: streamline send_window_update and tmux_stream_write functio…
Browse files Browse the repository at this point in the history
…ns for clarity and efficiency

Signed-off-by: Dengfeng Liu <liudf0716@gmail.com>
  • Loading branch information
liudf0716 committed Nov 13, 2024
1 parent a2c4ea0 commit 08f0455
Showing 1 changed file with 60 additions and 60 deletions.
120 changes: 60 additions & 60 deletions tcpmux.c
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,14 @@ static uint16_t get_send_flags(struct tmux_stream *stream) {
*
* @note Window size cannot exceed MAX_STREAM_WINDOW_SIZE
*/
void send_window_update(struct bufferevent *bout, struct tmux_stream *stream,
uint32_t length) {
uint32_t max = MAX_STREAM_WINDOW_SIZE;
uint32_t delta = (max - length) - stream->recv_window;

void send_window_update(struct bufferevent *bout, struct tmux_stream *stream, uint32_t length) {
uint32_t max_window = MAX_STREAM_WINDOW_SIZE;
uint32_t delta = max_window - length - stream->recv_window;
uint16_t flags = get_send_flags(stream);

if (delta < max / 2 && flags == 0)
if (delta < max_window / 2 && flags == 0) {
return;
}

stream->recv_window += delta;
tcp_mux_send_win_update(bout, flags, stream->id, delta);
Expand Down Expand Up @@ -961,56 +960,55 @@ uint32_t tx_ring_buffer_write(struct bufferevent *bev, struct ring_buffer *ring,
*/
uint32_t tmux_stream_write(struct bufferevent *bev, uint8_t *data,
uint32_t length, struct tmux_stream *stream) {
switch (stream->state) {
case LOCAL_CLOSE:
case CLOSED:
case RESET:
// Check if the stream is in a closed state
if (stream->state == LOCAL_CLOSE || stream->state == CLOSED || stream->state == RESET) {
debug(LOG_INFO, "stream %d state is closed", stream->id);
return 0;
default:
break;
}

struct ring_buffer *tx_ring = &stream->tx_ring;
uint32_t left = WBUF_SIZE - tx_ring->sz;
if (stream->send_window == 0) {
debug(LOG_INFO, "stream %d send_window is zero, length %d left %d",
stream->id, length, left);
uint32_t available_window = stream->send_window;
uint32_t buffered_size = tx_ring->sz;
uint32_t total_data_size = buffered_size + length;

// If send window is zero, buffer the data
if (available_window == 0) {
debug(LOG_INFO, "stream %d send_window is zero, buffering data", stream->id);
tx_ring_buffer_append(tx_ring, data, length);
return 0;
}

uint16_t flags = get_send_flags(stream);
uint32_t max = length;
struct bufferevent *bout = get_main_control()->connect_bev;
if (stream->send_window < tx_ring->sz) {
debug(LOG_INFO, " send_window %u less than tx_ring size %u",
stream->send_window, tx_ring->sz);
max = stream->send_window;
tcp_mux_send_data(bout, flags, stream->id, max);
tx_ring_buffer_write(bev, tx_ring, max);
tx_ring_buffer_append(tx_ring, data, length);
} else if (stream->send_window < tx_ring->sz + length) {
debug(LOG_INFO, " send_window %u less than %u", stream->send_window,
tx_ring->sz + length);
max = stream->send_window;
tcp_mux_send_data(bout, flags, stream->id, max);
if (tx_ring->sz > 0)
tx_ring_buffer_write(bev, tx_ring, tx_ring->sz);
bufferevent_write(bev, data, max - tx_ring->sz);
tx_ring_buffer_append(tx_ring, data + max - tx_ring->sz,
length + tx_ring->sz - max);
} else {
max = tx_ring->sz + length;
tcp_mux_send_data(bout, flags, stream->id, max);
if (tx_ring->sz > 0)
tx_ring_buffer_write(bev, tx_ring, tx_ring->sz);
bufferevent_write(bev, data, length);

// Determine how much data we can send
uint32_t max_send = (available_window < total_data_size) ? available_window : total_data_size;

// Send data header
tcp_mux_send_data(bout, flags, stream->id, max_send);

// Send data from tx_ring buffer if any
if (buffered_size > 0) {
uint32_t send_from_buffer = (max_send < buffered_size) ? max_send : buffered_size;
tx_ring_buffer_write(bev, tx_ring, send_from_buffer);
max_send -= send_from_buffer;
}

// Send new data if there is remaining window
if (max_send > 0) {
bufferevent_write(bev, data, max_send);
}

stream->send_window -= max;
// Buffer any remaining new data
if (total_data_size > available_window) {
uint32_t remaining_data = total_data_size - available_window;
tx_ring_buffer_append(tx_ring, data + (length - remaining_data), remaining_data);
}

// Update send window
stream->send_window -= (total_data_size - tx_ring->sz);

return max;
return (length - tx_ring->sz);
}

/**
Expand All @@ -1029,29 +1027,31 @@ uint32_t tmux_stream_write(struct bufferevent *bev, uint8_t *data,
* - 1 if stream entered LOCAL_CLOSE state but final closure is pending
*/
int tmux_stream_close(struct bufferevent *bout, struct tmux_stream *stream) {
uint8_t flag = 0;
uint8_t should_close = 0;

switch (stream->state) {
case SYN_SEND:
case SYN_RECEIVED:
case ESTABLISHED:
stream->state = LOCAL_CLOSE;
break;
case LOCAL_CLOSE:
case REMOTE_CLOSE:
flag = 1;
stream->state = CLOSED;
break;
case CLOSED:
case RESET:
default:
return 0;
case SYN_SEND:
case SYN_RECEIVED:
case ESTABLISHED:
stream->state = LOCAL_CLOSE;
break;
case LOCAL_CLOSE:
case REMOTE_CLOSE:
should_close = 1;
stream->state = CLOSED;
break;
case CLOSED:
case RESET:
default:
return 0;
}

uint16_t flags = get_send_flags(stream);
flags |= FIN;
uint16_t flags = get_send_flags(stream) | FIN;
tcp_mux_send_win_update(bout, flags, stream->id, 0);
if (!flag)

if (!should_close) {
return 1;
}

debug(LOG_DEBUG, "del proxy client %d", stream->id);
del_proxy_client_by_stream_id(stream->id);
Expand Down

0 comments on commit 08f0455

Please sign in to comment.